redis quicklist

作者: 烨哥 | 来源:发表于2018-03-15 14:55 被阅读44次

    简介

    A generic doubly linked quicklist implementation
    A doubly linked list of ziplists
    根据源代码里面给我出的解释就是这个是一个ziplist组成的双向链表。
    每个节点使用ziplist来保存数据。
    根据咱们的编程经验来说。一个链表使用另外一种链表来作为节点,那么本质上来说,quicklist里面保存着一个一个小的ziplist。
    但是为啥不直接用ziplist喃。本身ziplist就是一个双向链表。并且额外的信息还特别的少。
    其实原因很简单。根据我们上一节讲的ziplist来看,ziplist在我们程序里面来看将会是一块连续的内存块。它使用内存偏移来保存next从而节约了next指针。这样造成了我们每一次的删除插入操作都会进行remalloc,从而分配一块新的内存块。当我们的ziplist特别大的时候。没有这么大空闲的内存块给我们的时候。操作系统其实会抽象出一块连续的内存块给我。在底层来说他其实是一个链表链接成为的内存。不过在我们程序使用来说。他还是一块连续的内存。这样的话会造成内存碎片,并且在操作的时候因为内存不连续等原因造成效率问题。或者因为转移到大内存块等进行数据迁移。从而损失性能。
    所以quicklist是对ziplist进行一次封装,使用小块的ziplist来既保证了少使用内存,也保证了性能。

    数据结构

    /* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
     * We use bit fields keep the quicklistNode at 32 bytes.
     * count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
     * encoding: 2 bits, RAW=1, LZF=2.
     * container: 2 bits, NONE=1, ZIPLIST=2.
     * recompress: 1 bit, bool, true if node is temporarry decompressed for usage.
     * attempted_compress: 1 bit, boolean, used for verifying during testing.
     * extra: 12 bits, free for future use; pads out the remainder of 32 bits */
    typedef struct quicklistNode {
        struct quicklistNode *prev; //上一个node节点
        struct quicklistNode *next; //下一个node
        unsigned char *zl;            //保存的数据 压缩前ziplist 压缩后压缩的数据
        unsigned int sz;             /* ziplist size in bytes */
        unsigned int count : 16;     /* count of items in ziplist */
        unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
        unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
        unsigned int recompress : 1; /* was this node previous compressed? */
        unsigned int attempted_compress : 1; /* node can't compress; too small */
        unsigned int extra : 10; /* more bits to steal for future usage */
    } quicklistNode;
    
    /* quicklistLZF is a 4+N byte struct holding 'sz' followed by 'compressed'.
     * 'sz' is byte length of 'compressed' field.
     * 'compressed' is LZF data with total (compressed) length 'sz'
     * NOTE: uncompressed length is stored in quicklistNode->sz.
     * When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */
    typedef struct quicklistLZF {
        unsigned int sz; /* LZF size in bytes*/
        char compressed[];
    } quicklistLZF;
    
    /* quicklist is a 32 byte struct (on 64-bit systems) describing a quicklist.
     * 'count' is the number of total entries.
     * 'len' is the number of quicklist nodes.
     * 'compress' is: -1 if compression disabled, otherwise it's the number
     *                of quicklistNodes to leave uncompressed at ends of quicklist.
     * 'fill' is the user-requested (or default) fill factor. */
    typedef struct quicklist {
        quicklistNode *head; //头结点
        quicklistNode *tail; //尾节点
        unsigned long count;        /* total count of all entries in all ziplists */
        unsigned int len;           /* number of quicklistNodes */
        int fill : 16;              /* fill factor for individual nodes *///负数代表级别,正数代表个数
        unsigned int compress : 16; /* depth of end nodes not to compress;0=off *///压缩级别
    } quicklist;
    

    说起来这quickList就是一个标准的双向链表的配置,有head 有tail node里面有prev 有next 其他的参数都长度或者一些特别标识。总体来说,我们的quicklist会很简单。
    看到这里大家基本都可以猜到插入查找删除等操作了

    插入

    由于咱们的quicklist是一个标准的双向链表,所以他的插入操作的原理会和普通的双向链表插入很像,不过喃由于他使用了其他链表来作为节点。所以喃,我们操作就是找到我们插入的节点。然后进行ziplist的插入就ok了。

    /* Optimization levels for size-based filling */
    static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
    
    /* Maximum size in bytes of any multi-element ziplist.
     * Larger values will live in their own isolated ziplists. */
    #define SIZE_SAFETY_LIMIT 8192
    
    REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
                                               const int fill, const size_t sz) {
        if (unlikely(!node))
            return 0;
    
        int ziplist_overhead;
        /* size of previous offset */
        if (sz < 254)    //根据我们ziplist的经验 对prvlen进行编码 小于254一位 大于等于254 五位
            ziplist_overhead = 1;
        else
            ziplist_overhead = 5;
    
        /* size of forward offset */
        if (sz < 64)        //编码字节的size  根据我们对len的编码
            ziplist_overhead += 1;
        else if (likely(sz < 16384))
            ziplist_overhead += 2;
        else
            ziplist_overhead += 5;
    
        //不过这个大小只是一个大概的猜想。因为我们知道我们ziplist里面还有很多特殊的操作。这个算出来的是一个最大的大小。总体来说只会比这个小,不会比这个大的
        /* new_sz overestimates if 'sz' encodes to an integer type */
        unsigned int new_sz = node->sz + sz + ziplist_overhead; //计算出一个大概新插入后的大小
        if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
            return 1;
        else if (!sizeMeetsSafetyLimit(new_sz))//判断是不是个安全的大小
            return 0;
        else if ((int)node->count < fill)//这才来判断个数
            return 1;
        else
            return 0;
    }
    
    REDIS_STATIC int
    _quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz,
                                                   const int fill) {
        if (fill >= 0)//大于等于0 是个数
            return 0;
    
        size_t offset = (-fill) - 1;//小于等于0 级别
        if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) {
            if (sz <= optimization_level[offset]) {
                return 1;
            } else {
                return 0;
            }
        } else {
            return 0;
        }
    }
    
    REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
                                            quicklistNode *old_node,
                                            quicklistNode *new_node, int after) {
        if (after) { //插入方式
            new_node->prev = old_node;//设置节点关系
            if (old_node) { //如果原来的节点有咯
                new_node->next = old_node->next;//设置next
                if (old_node->next)
                    old_node->next->prev = new_node;
                old_node->next = new_node;
            }
            if (quicklist->tail == old_node)
                quicklist->tail = new_node;
        } else {
            new_node->next = old_node;
            if (old_node) {
                new_node->prev = old_node->prev;
                if (old_node->prev)
                    old_node->prev->next = new_node;
                old_node->prev = new_node;
            }
            if (quicklist->head == old_node)
                quicklist->head = new_node;
        }
        /* If this insert creates the only element so far, initialize head/tail. */
        if (quicklist->len == 0) {//第一次加入 设置成头和尾
            quicklist->head = quicklist->tail = new_node;
        }
    
        if (old_node)//加满了之后进行压缩操作
            quicklistCompress(quicklist, old_node);
    
        quicklist->len++;
    }
    /* Add new entry to head node of quicklist.
     *
     * Returns 0 if used existing head.
     * Returns 1 if new head created. */
    int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
        quicklistNode *orig_head = quicklist->head;//当前的head
        if (likely(
                _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {//看下head让不让你插入
            quicklist->head->zl =
                ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);//让你插入就插
            quicklistNodeUpdateSz(quicklist->head);//更新长度
        } else {
            quicklistNode *node = quicklistCreateNode();//不让插新建一个node
            node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);//在新的ziplist插入
    
            quicklistNodeUpdateSz(node);//更新sz
            _quicklistInsertNodeBefore(quicklist, quicklist->head, node);//插入了新节点,维护quicklist的node关系,如果有压缩啥的进行压缩
        }
        quicklist->count++;//cout++
        quicklist->head->count++;//cout++
        return (orig_head != quicklist->head);
    }
    

    相对于这种标准的双向链表来说,插入就变得非常简单了。首先是确定插入头或者尾,插入的时候先判断该节点能否插入。能否插入喃是有几个标准的。第一个是不是设置了fill fill为负数的时候代表级别,每个级别ziplist能容纳的大小喃在上面有。如果没有设置的话。就看有没大于安全的标准大小。没有的话再看fill是不是正数,正数代表ziplist能够容纳的个数。这样判断完成能不能够进行该节点的插入。不能的话就创建一个新的节点,把数据插入进去。然后维护一下quicklist的双向链表关系。对老节点看下是否进行压缩操作等。

    查找操作

    list的查找操作主要是对index的
    我们的quicklist的节点是由一个一个的ziplist构成的每个ziplist都有大小。所以我们就只需要先根据我们每个node的个数,从而找到对应的ziplist,调用ziplist的index就能成功找到。

    typedef struct quicklistEntry {
        const quicklist *quicklist;//那个quicklist
        quicklistNode *node; //哪一个node
        unsigned char *zi; //找到在ziplist的位置
        unsigned char *value;//str value
        long long longval; //int value
        unsigned int sz; //svalue len
        int offset;//对于ziplist的index
    } quicklistEntry;
    
    
    /* Populate 'entry' with the element at the specified zero-based index
     * where 0 is the head, 1 is the element next to head
     * and so on. Negative integers are used in order to count
     * from the tail, -1 is the last element, -2 the penultimate
     * and so on. If the index is out of range 0 is returned.
     *
     * Returns 1 if element found
     * Returns 0 if element not found */
    int quicklistIndex(const quicklist *quicklist, const long long idx,
                       quicklistEntry *entry) {
        quicklistNode *n;
        unsigned long long accum = 0;
        unsigned long long index;
        int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */ //向前还是像后
    
        initEntry(entry);
        entry->quicklist = quicklist;//设置list
    
        if (!forward) {//方向
            index = (-idx) - 1;
            n = quicklist->tail;
        } else {
            index = idx;
            n = quicklist->head;
        }
    
        if (index >= quicklist->count)//有米有这么多个
            return 0;
    
        while (likely(n)) { //找到对应的位置
            if ((accum + n->count) > index) {
                break;
            } else {
                D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
                  accum);
                accum += n->count;
                n = forward ? n->next : n->prev;//下一个node
            }
        }
    
        if (!n)//没找到
            return 0;
    
        D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
          accum, index, index - accum, (-index) - 1 + accum);
    
        entry->node = n;//设置节点
        if (forward) {//
            /* forward = normal head-to-tail offset. */
            entry->offset = index - accum;//位置 即ziplist的index
        } else {
            /* reverse = need negative offset for tail-to-head, so undo
             * the result of the original if (index < 0) above. */
            entry->offset = (-index) - 1 + accum;
        }
    
        quicklistDecompressNodeForUse(entry->node);//需要解压就解压
        entry->zi = ziplistIndex(entry->node->zl, entry->offset);//找到节点块
        ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);//获取节点的值 反编码
        /* The caller will use our result, so we don't re-compress here.
         * The caller can recompress or delete the node as needed. */
        return 1;
    }
    

    删除

    /* Delete a range of elements from the quicklist.
     *
     * elements may span across multiple quicklistNodes, so we
     * have to be careful about tracking where we start and end.
     *
     * Returns 1 if entries were deleted, 0 if nothing was deleted. */
    int quicklistDelRange(quicklist *quicklist, const long start,
                          const long count) {
        if (count <= 0)
            return 0;
    
        unsigned long extent = count; /* range is inclusive of start position *///默认你删的个数是对的
    
        if (start >= 0 && extent > (quicklist->count - start)) {//但是喃没有这么多给你删
            /* if requesting delete more elements than exist, limit to list size. */
            extent = quicklist->count - start;
        } else if (start < 0 && extent > (unsigned long)(-start)) {
            /* else, if at negative offset, limit max size to rest of list. */
            extent = -start; /* c.f. LREM -29 29; just delete until end. */
        }
    
        quicklistEntry entry;
        if (!quicklistIndex(quicklist, start, &entry))//先找一下位置
            return 0;
    
        D("Quicklist delete request for start %ld, count %ld, extent: %ld", start,
          count, extent);
        quicklistNode *node = entry.node;//拿到要删的第一个node
    
        /* iterate over next nodes until everything is deleted. */
        while (extent) {
            quicklistNode *next = node->next;//下一个节点喃 都是往一个方向删除哈
    
            unsigned long del;
            int delete_entire_node = 0;
            if (entry.offset == 0 && extent >= node->count) {//他是第一个节点 并且要删的节点数大于这个node保存的节点数 就把这个node一起干掉
                /* If we are deleting more than the count of this node, we
                 * can just delete the entire node without ziplist math. */
                delete_entire_node = 1;
                del = node->count;
            } else if (entry.offset >= 0 && extent >= node->count) {//要删的个数大于 并且是正向来的
                /* If deleting more nodes after this one, calculate delete based
                 * on size of current node. */
                del = node->count - entry.offset;
            } else if (entry.offset < 0) {//如果是反向的话
                /* If offset is negative, we are in the first run of this loop
                 * and we are deleting the entire range
                 * from this start offset to end of list.  Since the Negative
                 * offset is the number of elements until the tail of the list,
                 * just use it directly as the deletion count. */
                del = -entry.offset;
    
                /* If the positive offset is greater than the remaining extent,
                 * we only delete the remaining extent, not the entire offset.
                 */
                if (del > extent)//看下能删多少个
                    del = extent;
            } else {
                /* else, we are deleting less than the extent of this node, so
                 * use extent directly. */
                del = extent;
            }
    
            D("[%ld]: asking to del: %ld because offset: %d; (ENTIRE NODE: %d), "
              "node count: %u",
              extent, del, entry.offset, delete_entire_node, node->count);
    
            if (delete_entire_node) {//删节点
                __quicklistDelNode(quicklist, node);
            } else {
                quicklistDecompressNodeForUse(node);//解压
                node->zl = ziplistDeleteRange(node->zl, entry.offset, del);//ziplist删range
                quicklistNodeUpdateSz(node);//更新大小
                node->count -= del;//更新个数
                quicklist->count -= del;//更新个数
                quicklistDeleteIfEmpty(quicklist, node);//看下节点是不是空了
                if (node)
                    quicklistRecompressOnly(quicklist, node);//压缩回去
            }
    
            extent -= del;//减除已经删除了的
    
            node = next;//继续进行删除
    
            entry.offset = 0;
        }
        return 1;
    }
    

    这里给出的是删除一个range
    我们的操作也很简单。
    1.先把要删除的个数确定下来。可能会存在删除的个数过多,其实没有那么多的情况。
    2.找到要开始删除的节点。删除的情况其实特别的就是获取当前的offset为0,要删除的个数大于等于当前节点的数据个数。这样就把它全部删除。否则的话就判断当前节点需要删除几个

    总结

    quciklist把ziplist进行了封装,保证了ziplist的最大大小。从而压缩了内存。进一步的提高了效率。

    magicdeMacBook-Pro:~ magic$ redis-cli
    127.0.0.1:6379> rpush xislist 'xistest'
    (integer) 2
    127.0.0.1:6379> object encoding xislist
    "quicklist"
    127.0.0.1:6379>

    其实我们可以发现。我们的list就是使用的quicklist.
    参照方法。我们就可以知道我们的,命令的效率。

    相关文章

      网友评论

        本文标题:redis quicklist

        本文链接:https://www.haomeiwen.com/subject/xpbfqftx.html