Redis数据结构

Redis使用了较丰富的数据结构,这也使得流行度已经完全超过了memcached,主要是有以下几种:

  • 字符串;
  • hash;
  • 集合;
  • 有序集合;
  • 列表;
  • bitmap;
  • geo;
  • stream;
  • hyperloglog;

下面是我画的总结图:

本身Redis是一个大dict,就是key->value的键值对。不管是key,还是value在Redis中,都是以Redis对象来存在的,看一下一个Redis对象的数据结构:



#define LRU_BITS 24
typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
    int refcount;
    void *ptr;
} robj;

可以看到这是一个struct结构体,type表示的是何种类型,类型主要包括:

#define OBJ_STRING 0    /* String object. */
#define OBJ_LIST 1      /* List object. */
#define OBJ_SET 2       /* Set object. */
#define OBJ_ZSET 3      /* Sorted set object. */
#define OBJ_HASH 4      /* Hash object. */

而下面的encoding就是具体以何种编码方式进行编码存储。比如对于一个string,如果是一个数值,就会以long的方式存储。非数值的可以以SDS或者embstr的方式存储。这样做的目的是Redis希望能找到更节省内存的方式存储。

主要的编码方式看以下宏定义:

#define OBJ_ENCODING_RAW 0     /* Raw representation */
#define OBJ_ENCODING_INT 1     /* Encoded as integer */
#define OBJ_ENCODING_HT 2      /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8  /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */

lru这个之前已讨论过了,主要是用来做内存淘汰的。

ptr就是指向实际数据的void指针,可以是任意类型,sds,int,列表等等。

看到上面可以发现,不管是字符号,hash,列表还是集合,都有统一的一个结构,即RedisObject。

在对象里可以区分出是哪些类型,此外,encoding表示了不同的编码存储方式。这指出了里层的一个数据结构。具体的请见上面的宏定义。

下面是Redis字符串具体编码的一个过程,可以看一下:

robj *tryObjectEncoding(robj *o) {
    long value;
    sds s = o->ptr;
    size_t len;

    /* Make sure this is a string object, the only type we encode
     * in this function. Other types use encoded memory efficient
     * representations but are handled by the commands implementing
     * the type. */
    serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);

    /* We try some specialized encoding only for objects that are
     * RAW or EMBSTR encoded, in other words objects that are still
     * in represented by an actually array of chars. */
    if (!sdsEncodedObject(o)) return o;

    /* It's not safe to encode shared objects: shared objects can be shared
     * everywhere in the "object space" of Redis and may end in places where
     * they are not handled. We handle them only as values in the keyspace. */
     if (o->refcount > 1) return o;

    /* Check if we can represent this string as a long integer.
     * Note that we are sure that a string larger than 20 chars is not
     * representable as a 32 nor 64 bit integer. */
    len = sdslen(s);
    if (len <= 20 && string2l(s,len,&value)) {
        /* This object is encodable as a long. Try to use a shared object.
         * Note that we avoid using shared integers when maxmemory is used
         * because every object needs to have a private LRU field for the LRU
         * algorithm to work well. */
        if ((server.maxmemory == 0 ||
            !(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
            value >= 0 &&
            value < OBJ_SHARED_INTEGERS)
        {
            decrRefCount(o);
            incrRefCount(shared.integers[value]);
            return shared.integers[value];
        } else {
            if (o->encoding == OBJ_ENCODING_RAW) {
                sdsfree(o->ptr);
                o->encoding = OBJ_ENCODING_INT;
                o->ptr = (void*) value;
                return o;
            } else if (o->encoding == OBJ_ENCODING_EMBSTR) {
                decrRefCount(o);
                return createStringObjectFromLongLongForValue(value);
            }
        }
    }

    /* If the string is small and is still RAW encoded,
     * try the EMBSTR encoding which is more efficient.
     * In this representation the object and the SDS string are allocated
     * in the same chunk of memory to save space and cache misses. */
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
        robj *emb;

        if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
        emb = createEmbeddedStringObject(s,sdslen(s));
        decrRefCount(o);
        return emb;
    }

    /* We can't encode the object...
     *
     * Do the last try, and at least optimize the SDS string inside
     * the string object to require little space, in case there
     * is more than 10% of free space at the end of the SDS string.
     *
     * We do that only for relatively large strings as this branch
     * is only entered if the length of the string is greater than
     * OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */
    trimStringObjectIfNeeded(o);

    /* Return the original object. */
    return o;
}

简单的流程就是先检查时不是sds结构,如果不是直接返回,如果引用计数大于1也返回,如果一个字符串是数值类型,那么先判断设没设置内存大小,如果没有设置,可以使用共享内存。如果设置了,就不可使用了,因为设置内存需要进行内存淘汰,每个对象都有自己的lru值。

字符串

字符串并不是简单的使用C语言字符串,而是使用了SDS的结构,全称Simple Dynamic String。先看下SDS到底是什么样的结构。

它主要由header+字符串+填充字符(字符串长度小于最大容量时)+"\0"

这个header是个struct类型,紧邻SDS字符串,在字符串的低位置。之所以这样是使得通过字符串指针为只可以轻松获取到header,header存储了当前字符串长度,最大容量以及flags标志位,这个flag表示的是这个header具体使用的是哪种类型,header类型包括:

#define SDS_TYPE_5  0
#define SDS_TYPE_8  1
#define SDS_TYPE_16 2
#define SDS_TYPE_32 3
#define SDS_TYPE_64 4

不同类型表示的用多少字节长度来表示,上面的图形很清晰

struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};

Redis使用这种数据结构来存储字符串的好处都有啥啊?

首先是它使用header存储字符串长度,那么在获取字符串长度的时间复杂度是O(1),而C语言字符串的时间复杂度就是O(N)。那么对于一个给定的SDS字符串,怎么找到header呢?因为header和字符串数组是紧邻着的,header在低位置。所以根据字符串指针可以得到header。看看下面源码:

static inline size_t sdslen(const sds s) {
    unsigned char flags = s[-1];
    switch(flags&SDS_TYPE_MASK) {
        case SDS_TYPE_5:
            return SDS_TYPE_5_LEN(flags);
        case SDS_TYPE_8:
            return SDS_HDR(8,s)->len;
        case SDS_TYPE_16:
            return SDS_HDR(16,s)->len;
        case SDS_TYPE_32:
            return SDS_HDR(32,s)->len;
        case SDS_TYPE_64:
            return SDS_HDR(64,s)->len;
    }
    return 0;
}

flags位置是 s[-1],而这个SDS_HDR就表示header的地址指针。

#define SDS_HDR(T,s) ((struct sdshdr##T *)((s)-(sizeof(struct sdshdr##T))))

其次,因为SDS会预留一部分空间,所以当进行字符串拼接时,如果空间够就直接用,如果不够,再重新分配内存,所以这在一定程度上可以减少内存空间的分配。此外,当内存空间不足时,会自动分配内存,所以也不会出现空间溢出的问题。

字典

字典在Redis应用颇多,主要是为了提高增删改查效率。本身key value这个就是一个大dict,此外,hash结构中,多个field value 也是字典结构。

看下字典结构的定义:

typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

一个字典里会有两个dictht,之所以有俩是用于rehash,当未进行rehash时,只有ht[0]是有效的,ht[1]没有数据。每个dictht的数据存储采用哈希表的形式。当出现地址冲突的时候,采用链式法解决,这点和HashMap比较相似,在冲突元素较少时,hashMap会使用链表解决冲突,只是多时会使用红黑树。

此外,进行重hash的方式两者也有不同,Redis采用的是渐进式hash,即重hash不是一次完成的,而是每次只做部分操作,即将整个hash操作分散到各个查询插入修改的操作中。Redis主要考虑的还是性能。

哈希表中的每个元素都是一个dictEntry:

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

下面是下面参考资料里的一个示意图,很清晰表明字典的结构:

看一下数据查找的一个过程:

dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    uint64_t h, idx, table;

    if (dictSize(d) == 0) return NULL; /* dict is empty */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

其实很简单,如果正在rehash操作,要等待该步骤完成。然后根据目标key获取hash值。然后分别在两张哈希表中进行寻找。找到对应表中的位置,然后进行链表遍历,找到对应元素。

再看一下创建过程,就是初始化各个变量以及两个哈希表。rehashidx初始值是-1:

dict *dictCreate(dictType *type,
        void *privDataPtr)
{
    dict *d = zmalloc(sizeof(*d));

    _dictInit(d,type,privDataPtr);
    return d;
}

/* Initialize the hash table */
int _dictInit(dict *d, dictType *type,
        void *privDataPtr)
{
    _dictReset(&d->ht[0]);
    _dictReset(&d->ht[1]);
    d->type = type;
    d->privdata = privDataPtr;
    d->rehashidx = -1;
    d->iterators = 0;
    return DICT_OK;
}


static void _dictReset(dictht *ht)
{
    ht->table = NULL;
    ht->size = 0;
    ht->sizemask = 0;
    ht->used = 0;
}

接着看插入操作:

/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
{
    dictEntry *entry = dictAddRaw(d,key,NULL);

    if (!entry) return DICT_ERR;
    dictSetVal(d, entry, val);
    return DICT_OK;
}

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

插入的简单流程就是:首先也是看是否正在rehash,如果是,就执行一次rehash操作。检查目标key是否已经存在,如果存在就返回。随后,如果是正在rehash,就直接插入到ht[1],第二张哈希表中。

找到目的索引位置后,如果是有冲突元素,即链表,则插入到链表头,原因就在于最新插入的可能越有可能被访问,此外,不必在遍历到链表,时间复杂度是O(N)。

随后设置dictEntry的key和value值。

看完了上面的创建,插入,查找等过程之后,删除就非常简单了。

static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
    uint64_t h, idx;
    dictEntry *he, *prevHe;
    int table;

    if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;

    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);

    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        prevHe = NULL;
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                /* Unlink the element from the list */
                if (prevHe)
                    prevHe->next = he->next;
                else
                    d->ht[table].table[idx] = he->next;
                if (!nofree) {
                    dictFreeKey(d, he);
                    dictFreeVal(d, he);
                    zfree(he);
                }
                d->ht[table].used--;
                return he;
            }
            prevHe = he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return NULL; /* not found */
}

删除的一个简单流程:首先同样判断是否进行rehash,有就执行一次rehash操作。然后分别遍历两张哈希表。根据hashkey找到目标位置,然后开始遍历链表,找到了直接删除,这个地方其实就是链表的操作了。

说完了Redis字典的增删改查,可以看到每一个步骤当中都有rehash的判断和执行过程。

这个rehash操作在Redis中还是比较重要的,看一下其实现原理:

int dictRehash(dict *d, int n) {
    //rehashidx最多连续移动 10倍的长度,
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

   //循环遍历ht[0],执行n步骤,且不为空,
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        //直到获取第一个rehash值不为空
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        //获取到对应的dictEntry
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        //这一个bucket就是一个链表,在同一个索引位置上的元素,一次rehash步骤会一起移过去。
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}

说了rehash的过程,那什么时候进行rehash呢?我们知道HashMap中定义了装载因子是0.75(元素个数/总容量),装载因子设置过大,就非常有可能因日冲突,导致单个索引下出现链表或者红黑数结构,不利于查找;装载因子过小,可能会造成空间的浪费。

然而Redis的装载因子设置为1,此外在判断是不是需要rehash的规则是:

1、服务器目前没有在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 1;

2、服务器目前正在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 5;

源码:

/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    // 如果正在进行渐进式扩容,则返回OK
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    // 如果哈希表ht[0]的大小为0,则初始化字典
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    /*
     * 如果哈希表ht[0]中保存的key个数与哈希表大小的比例已经达到1:1,即保存的节点数已经大于哈希表大小
     * 且redis服务当前允许执行rehash,或者保存的节点数与哈希表大小的比例超过了安全阈值(默认值为5)
     * 则将哈希表大小扩容为原来的两倍
     */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

之所以设置大于等于1,首先是Redis不希望经常进行rehash操作,因为rehash本身也是耗时的,此外Redis有过期淘汰策略,会惰性或者定时删除一些key,没必要总是执行rehash操作。

有扩容就会有缩容,当key和容量比例小于10%的时候,就会进行缩容。

压缩列表

在Redis的列表和较短字符串和较少元素的Hash中都使用了压缩列表。主要目的就是为了节省内存。

它类似一个双向链表,又不是双向链表。它是在内存中连续分配的空间,就是一个list。其结构比较特殊:

* <zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>

zlbyte表示的是总的字节数;

zltail表示起始位置到最后位置的偏移字节数;

zllen表示的是entry的个数;

zlend就是一个标记位。

entry就是真正的数据了。它的结构如下:

<prevrawlen><len><data>

prerawlen记录的是前一个entry所占的总字节数,len表示的实际数据的长度,data就是真正的数据了。

通过Prerawlen可以很轻松的实现从后向前的遍历过程。

通过上面的结构可以以O(1)的时间复杂度进行Push和pop操作。具体的就不说了,太特么复杂了,不再花时间和精力去研究它了。

现在说一下Hash结构使用zipList的过程。当我们创建hash时,会创建如下对象:

robj *createHashObject(void) {
    unsigned char *zl = ziplistNew();
    robj *o = createObject(OBJ_HASH, zl);
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

当数据较多或者filed的value超过一定长度时,都会使用dict结构。之所以这样是因为数据过多时性能降低,因为其查找需要遍历。此外内存拷贝的成本也比较大。

在成员较少的情况下,hash,有序集合都会使用ziplist。在Redis3.2之前,列表在元素较少时直接使用ziplist。后来就都用了quicklist结构了。

什么场景下使用ziplist,可以通过配置控制:

hash-max-ziplist-entries 512
hash-max-ziplist-value 64
zset-max-ziplist-entries 128
zset-max-ziplist-value 64

quicklist

列表内部编码方式采用的就是quicklist结构。这个从大面来讲是一个双向链表,但又不是简单的双向链表。说他是双向链表使用因为有着双向链表基本的特性,可以向前后向后遍历,添加和删除的时间复杂度都是O(1)。说他不是简单的双向链表,是因为元素并不是普通的元素,而是由多个zipList组成,即都是由压缩表构成。

quicklist是双向链表和ziplist的两者结合,肯定是要使用两者的优点。双向链表的优点就是上面所说的,缺点就是容易导致内存碎片;ziplist在元素多的时候内存拷贝成本大,优点就是内存地址是连续的。

但如何掌握ziplist和双向链表的平衡点是重中之重。

看一下quicklist的提醒:

/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
 * 'count' is the number of total entries.
 * 'len' is the number of quicklist nodes.
 * 'compress' is: 0 if compression disabled, otherwise it's the number
 *                of quicklistNodes to leave uncompressed at ends of quicklist.
 * 'fill' is the user-requested (or default) fill factor.
 * 'bookmakrs are an optional feature that is used by realloc this struct,
 *      so that they don't consume memory when not used. */
typedef struct quicklist {
    quicklistNode *head;
    quicklistNode *tail;
    unsigned long count;        /* total count of all entries in all ziplists */
    unsigned long len;          /* number of quicklistNodes */
    int fill : QL_FILL_BITS;              /* fill factor for individual nodes */
    unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
    unsigned int bookmark_count: QL_BM_BITS;
    quicklistBookmark bookmarks[];
} quicklist;

他定义了首尾指针,以及所有ziplist的entry总数等等。

每一个quicklist节点定义:

/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
 * We use bit fields keep the quicklistNode at 32 bytes.
 * count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
 * encoding: 2 bits, RAW=1, LZF=2.
 * container: 2 bits, NONE=1, ZIPLIST=2.
 * recompress: 1 bit, bool, true if node is temporary decompressed for usage.
 * attempted_compress: 1 bit, boolean, used for verifying during testing.
 * extra: 10 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    unsigned char *zl;
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;


/* quicklistLZF is a 4+N byte struct holding 'sz' followed by 'compressed'.
 * 'sz' is byte length of 'compressed' field.
 * 'compressed' is LZF data with total (compressed) length 'sz'
 * NOTE: uncompressed length is stored in quicklistNode->sz.
 * When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */
typedef struct quicklistLZF {
    unsigned int sz; /* LZF size in bytes*/
    char compressed[];
} quicklistLZF;

quicklistNode定义了前后指针,以及当前要指向的ziplist列表。当然,如果ziplist被压缩了,就指向压缩的数据quicklistLZF。

看到这儿,应该可以发现,quicklist还是比较复杂的,下面的一张图大体展示了该结构:

intset

当一个集合中的数据都是整数且较少的时候,就使用intset的数据结构。

当有非数字加入或者元素个数较多的时候,就会使用hashtable编码方式。

下面是一个简单的测试:

10.38.163.219:6379> smembers testset1
(empty list or set)
10.38.163.219:6379> sadd testset1 32 3 1 5
(integer) 4
10.38.163.219:6379> object encoding testset1
"intset"
10.38.163.219:6379> smembers testset1
1) "1"
2) "3"
3) "5"
4) "32"
10.38.163.219:6379> object encoding testset1
"intset"



10.38.163.219:6379> object encoding testset2
"hashtable"
10.38.163.219:6379> spop testset2
"hshs"
10.38.163.219:6379> smembers testset2
1) "1"
10.38.163.219:6379> object encoding testset2
"hashtable"
10.38.163.219:6379> object encoding testset2
"hashtable"
10.38.163.219:6379> object encoding testset2
"hashtable"

从上面的例子也可以看到,它采用intset的结构时,其实它是一个有序的数组,这样也方便使用二分查找。

一个intset集合是由encoding+length+数据数组组成的。

encoding表示每个数据元素采用几个字节存储;

length表示数组的元素个数;

下面是一个intset数据例子(来自参考资料):

看一下它的查找过程:

static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    /* The value can never be found when the set is empty */
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        /* Check for the case where we know we cannot find the value,
         * but do know the insert position. */
        if (value > _intsetGet(is,max)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }

    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}

intset的查找过程就是一个标准的二分查找。

然后是插入过程:

intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    /* Upgrade encoding if necessary. If we need to upgrade, we know that
     * this value should be either appended (if > 0) or prepended (if < 0),
     * because it lies outside the range of existing values. */
    if (valenc > intrev32ifbe(is->encoding)) {
        /* This always succeeds, so we don't need to curry *success. */
        return intsetUpgradeAndAdd(is,value);
    } else {
        /* Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found. */
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }

        is = intsetResize(is,intrev32ifbe(is->length)+1);
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }

    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

插入之前是要检查对应元素是否已经存在,存在就不再执行。不存在的话,首先realloac内存,然后插入到对应位置上。看到这应该明白,为什么元素多的时候不再适用,因为每添加一次都要进行内存重分配。

内存重分配后,会将待插入位置之后的元素统一向尾部移动。最后执行插入操作。

skiplist

Redis最后一种结构就是跳跃表了,主要是用来存储有序集合。有序集合当元素较少时,是直接使用ziplist,否则就会改用skiplist和dict结构。

skiplist我就不写了,我觉得这篇文章写的非常好: http://zhangtielei.com/posts/blog-redis-skiplist.html

skiplist是之前Pugh论文提出的,感兴趣可以看看。直接来一张论文里的跳表图:

看下Redis的跳表数据结构:

把skiplist的数据结构放出来:

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

每一个节点都包含一个level数组,每个元素元素的是长度和指向的节点。

下面是完整的插入过程:

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

理想情况下,我们希望第一层是N个节点,第二层是N>>1个节点,第三层是N>>2个节点,此次类推,最多是log2n层。但是在实际使用中,很难做的。因此Redis其在插入过程中,新节点的层数的产生也是根据注明的Pugh的论文做的。设置p的概率是0.25,如果产生的0-1随机数小于0.25,就增加一层,直到大于0.25返回层数。

int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

JAVA的实现方式:

        int rnd = ThreadLocalRandom.nextSecondarySeed();
        if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
            int level = 1, max;
            while (((rnd >>>= 1) & 1) != 0)
                ++level;

如果将上面插入过程拆分一下,主要分了几大步骤:

  1. 首先要找到每一层元素应该插入的位置。存到update数组中。层数字段在zskiplist有字段;
  2. 随机产生待插入节点的层数;
  3. 如果产生的层数大于当前跳表的最大层数,那么之上的层数的待插入位置更新为header地址,即更新update数组。
  4. 开始执行插入过程。
  5. 更新span ,length等字段。

讲完了几种结构,每种数据类型采用的编码方式如下:

字符串:raw SDS,embstr SDS,整型;

hash: ziplist ,哈希表;

列表:quicklist;

集合:intset,哈希表;

有序集合:ziplist,skiplist。

下面是书上的一个完整编码汇总:

REDIS_STRING REDIS_ENCODING_INT 使用整数值实现的字符串对象。
REDIS_STRING REDIS_ENCODING_EMBSTR 使用 embstr 编码的简单动态字符串实现的字符串对象。
REDIS_STRING REDIS_ENCODING_RAW 使用简单动态字符串实现的字符串对象。
REDIS_LIST REDIS_ENCODING_ZIPLIST 使用压缩列表实现的列表对象。
REDIS_LIST REDIS_ENCODING_LINKEDLIST 使用双端链表实现的列表对象。
REDIS_HASH REDIS_ENCODING_ZIPLIST 使用压缩列表实现的哈希对象。
REDIS_HASH REDIS_ENCODING_HT 使用字典实现的哈希对象。
REDIS_SET REDIS_ENCODING_INTSET 使用整数集合实现的集合对象。
REDIS_SET REDIS_ENCODING_HT 使用字典实现的集合对象。
REDIS_ZSET REDIS_ENCODING_ZIPLIST 使用压缩列表实现的有序集合对象。
REDIS_ZSET REDIS_ENCODING_SKIPLIST 使用跳跃表和字典实现的有序集合对象。

不同的数据对象类型,甚至统一对象类型不同的场景下,可以使用不同的编码格式,Redis尽最大所能去节省内存,提升性能。非常牛鼻!!!

除了上面的各种类型对象和编码,Redis还有一个重要的概念就是共享对象。Redis为了节约内存,弄了一个范围在 0-9999的整形内存池,以供各个对象实现共享,从而减少内存的分配。

参考资料:

Redis内部数据结构详解(2)——sds

美团针对Redis Rehash机制的探索和实践

对象类型和编码

Redis数据编码方式详解

Redis进阶不得不了解的内存优化细节

从零手写缓存框架(14)redis渐进式rehash详解

跳跃表

--------EOF---------
微信分享/微信扫码阅读