t_list.c

作者: 生命就是个Bug | 来源:发表于2019-04-17 11:28 被阅读0次

    Redis的t_list.c是对list数据结构的实现。
    在Redis3.2之前,list数据结构基于ziplist和linkedlist实现。
    在Redis3.2之后,list数据结构基于quicklist实现。
    quicklist 又基于 ziplist 实现。

    server.h中定义了如下:

    /* Object encoding identifiers related to lists, quoted from server.h. */
    #define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
    #define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
    #define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
    

    redis.conf中存在如下配置:

    
    # Lists are also encoded in a special way to save a lot of space.
    # The number of entries allowed per internal list node can be specified
    # as a fixed maximum size or a maximum number of elements.
    # For a fixed maximum size, use -5 through -1, meaning:
    # -5: max size: 64 Kb  <-- not recommended for normal workloads
    # -4: max size: 32 Kb  <-- not recommended
    # -3: max size: 16 Kb  <-- probably not recommended
    # -2: max size: 8 Kb   <-- good
    # -1: max size: 4 Kb   <-- good
    # Positive numbers mean store up to _exactly_ that number of elements
    # per list node.
    # The highest performing option is usually -2 (8 Kb size) or -1 (4 Kb size),
    # but if your use case is unique, adjust the settings as necessary.
    list-max-ziplist-size -2
    
    # Lists may also be compressed.
    # Compress depth is the number of quicklist ziplist nodes from *each* side of
    # the list to *exclude* from compression.  The head and tail of the list
    # are always uncompressed for fast push/pop operations.  Settings are:
    # 0: disable all list compression
    # 1: depth 1 means "don't start compressing until after 1 node into the list,
    #    going from either the head or tail"
    #    So: [head]->node->node->...->node->[tail]
    #    [head], [tail] will always be uncompressed; inner nodes will compress.
    # 2: [head]->[next]->node->node->...->node->[prev]->[tail]
    #    2 here means: don't compress head or head->next or tail->prev or tail,
    #    but compress all nodes between them.
    # 3: [head]->[next]->[next]->node->node->...->node->[prev]->[prev]->[tail]
    # etc.
    list-compress-depth 0
    

    这两项配置分别控制quicklist中每个ziplist节点允许的大小或entry个数(list-max-ziplist-size),以及压缩深度(list-compress-depth)。

    1. 新增元素

    /* Shared implementation of LPUSH and RPUSH.
     *
     * c     - client issuing the command; c->argv[1] is the key and
     *         c->argv[2] .. c->argv[argc-1] are the values to push.
     * where - LIST_HEAD (LPUSH) or LIST_TAIL (RPUSH).
     *
     * Replies with the wrong-type error when the key holds a non-list value,
     * otherwise with the list length after pushing. */
    void pushGenericCommand(client *c, int where) {
        int added = 0;
        robj *list = lookupKeyWrite(c->db, c->argv[1]);

        /* The key exists but is not a list: refuse to touch it. */
        if (list != NULL && list->type != OBJ_LIST) {
            addReply(c, shared.wrongtypeerr);
            return;
        }

        for (int i = 2; i < c->argc; i++) {
            if (list == NULL) {
                /* Missing key: create a quicklist object, apply the
                 * configured list_max_ziplist_size / list_compress_depth
                 * options and add it to the database under the key. */
                list = createQuicklistObject();
                quicklistSetOptions(list->ptr, server.list_max_ziplist_size,
                                    server.list_compress_depth);
                dbAdd(c->db, c->argv[1], list);
            }
            listTypePush(list, c->argv[i], where);
            added++;
        }

        /* Reply with the resulting length, or 0 when no list exists. */
        if (list != NULL) {
            addReplyLongLong(c, listTypeLength(list));
        } else {
            addReplyLongLong(c, 0);
        }

        if (added) {
            char *event = "rpush";
            if (where == LIST_HEAD) event = "lpush";

            signalModifiedKey(c->db, c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_LIST, event, c->argv[1], c->db->id);
        }
        server.dirty += added;
    }
    
    /* LPUSH command entry point: delegates to pushGenericCommand with
     * LIST_HEAD as the push side. */
    void lpushCommand(client *c) {
        const int where = LIST_HEAD;
        pushGenericCommand(c, where);
    }
    
    /* RPUSH command entry point: delegates to pushGenericCommand with
     * LIST_TAIL as the push side. */
    void rpushCommand(client *c) {
        const int where = LIST_TAIL;
        pushGenericCommand(c, where);
    }
    
    
    /* Shared implementation of LPUSHX and RPUSHX.
     *
     * c     - client issuing the command; c->argv[1] is the key and the
     *         remaining arguments are the values to push.
     * where - LIST_HEAD (LPUSHX) or LIST_TAIL (RPUSHX).
     *
     * Unlike pushGenericCommand this never creates the list: when the lookup
     * returns NULL or the type check fails, the function returns without
     * pushing. Otherwise it replies with the list length after pushing. */
    void pushxGenericCommand(client *c, int where) {
        int added = 0;

        /* shared.czero is passed to the lookup as the reply to use when it
         * yields no object. */
        robj *list = lookupKeyWriteOrReply(c, c->argv[1], shared.czero);
        if (list == NULL) return;
        if (checkType(c, list, OBJ_LIST)) return;

        int i = 2;
        while (i < c->argc) {
            listTypePush(list, c->argv[i], where);
            added++;
            i++;
        }

        addReplyLongLong(c, listTypeLength(list));

        if (added) {
            /* The keyspace event deliberately uses the plain push names. */
            char *event = "rpush";
            if (where == LIST_HEAD) event = "lpush";
            signalModifiedKey(c->db, c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_LIST, event, c->argv[1], c->db->id);
        }
        server.dirty += added;
    }
    
    /* LPUSHX command entry point: prepend values to a list only if the list
     * exists. Delegates to pushxGenericCommand with LIST_HEAD. */
    void lpushxCommand(client *c) {
        const int where = LIST_HEAD;
        pushxGenericCommand(c, where);
    }
    
    /* RPUSHX command entry point: delegates to pushxGenericCommand with
     * LIST_TAIL. */
    void rpushxCommand(client *c) {
        const int where = LIST_TAIL;
        pushxGenericCommand(c, where);
    }
    
    /* LINSERT key BEFORE|AFTER pivot value
     *
     * Inserts c->argv[4] before or after the first element equal to the pivot
     * c->argv[3], scanning the list from index 0 toward the tail.
     *
     * Replies:
     *   - syntax error when argv[2] is neither "before" nor "after"
     *     (compared case-insensitively);
     *   - shared.cnegone when the pivot was not found;
     *   - the list length after a successful insert. */
    void linsertCommand(client *c) {
        int where;
        const char *side = c->argv[2]->ptr;

        /* Translate the BEFORE|AFTER keyword into an insertion side. */
        if (strcasecmp(side, "after") == 0) {
            where = LIST_TAIL;
        } else if (strcasecmp(side, "before") == 0) {
            where = LIST_HEAD;
        } else {
            addReply(c, shared.syntaxerr);
            return;
        }

        /* Look up the list; shared.czero is the reply handed to the lookup. */
        robj *list = lookupKeyWriteOrReply(c, c->argv[1], shared.czero);
        if (list == NULL) return;
        if (checkType(c, list, OBJ_LIST)) return;

        /* Walk the list and insert next to the first match only. */
        int found = 0;
        listTypeEntry entry;
        listTypeIterator *it = listTypeInitIterator(list, 0, LIST_TAIL);
        while (!found && listTypeNext(it, &entry)) {
            if (listTypeEqual(&entry, c->argv[3])) {
                listTypeInsert(&entry, c->argv[4], where);
                found = 1;
            }
        }
        listTypeReleaseIterator(it);

        if (!found) {
            /* Notify client of a failed insert */
            addReply(c, shared.cnegone);
            return;
        }

        signalModifiedKey(c->db, c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST, "linsert",
                            c->argv[1], c->db->id);
        server.dirty++;

        addReplyLongLong(c, listTypeLength(list));
    }
    

    2. 删除元素

    /* Shared implementation of LPOP and RPOP.
     *
     * c     - client issuing the command; c->argv[1] is the key.
     * where - LIST_HEAD (LPOP) or LIST_TAIL (RPOP).
     *
     * Replies with the popped element, or with shared.nullbulk when
     * listTypePop yields nothing. The key is deleted from the database once
     * the list becomes empty. */
    void popGenericCommand(client *c, int where) {
        robj *list = lookupKeyWriteOrReply(c, c->argv[1], shared.nullbulk);
        if (list == NULL) return;
        if (checkType(c, list, OBJ_LIST)) return;

        robj *popped = listTypePop(list, where);
        if (popped == NULL) {
            addReply(c, shared.nullbulk);
            return;
        }

        char *event = "rpop";
        if (where == LIST_HEAD) event = "lpop";

        /* Send the element first, then drop our reference to it. */
        addReplyBulk(c, popped);
        decrRefCount(popped);
        notifyKeyspaceEvent(NOTIFY_LIST, event, c->argv[1], c->db->id);

        /* An empty list must not stay in the keyspace. */
        if (listTypeLength(list) == 0) {
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del",
                                c->argv[1], c->db->id);
            dbDelete(c->db, c->argv[1]);
        }
        signalModifiedKey(c->db, c->argv[1]);
        server.dirty++;
    }
    
    /* LPOP command entry point: delegates to popGenericCommand with
     * LIST_HEAD. */
    void lpopCommand(client *c) {
        const int where = LIST_HEAD;
        popGenericCommand(c, where);
    }
    
    /* RPOP command entry point: delegates to popGenericCommand with
     * LIST_TAIL. */
    void rpopCommand(client *c) {
        const int where = LIST_TAIL;
        popGenericCommand(c, where);
    }
    
    /* LTRIM key start stop
     *
     * Keeps only the elements in the inclusive range [start, stop] and removes
     * everything else. Negative indexes count from the end of the list. An
     * empty resulting range removes every element, after which the key is
     * deleted. Replies with shared.ok once the trim is done. */
    void ltrimCommand(client *c) {
        long start, end;

        /* Parse both range bounds; a failed parse returns immediately. */
        if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
        if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

        /* Look up the key (shared.ok is the reply handed to the lookup) and
         * verify that it is a list. */
        robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.ok);
        if (o == NULL) return;
        if (checkType(c, o, OBJ_LIST)) return;

        long llen = listTypeLength(o);

        /* convert negative indexes */
        if (start < 0) start += llen;
        if (end < 0) end += llen;
        if (start < 0) start = 0;

        /* Default to the empty-range case: trim the whole list from the left.
         * Invariant: start >= 0, so the range is also empty when end < 0. */
        long ltrim = llen;
        long rtrim = 0;
        if (start <= end && start < llen) {
            if (end >= llen) end = llen - 1;
            ltrim = start;              /* elements to drop from the head */
            rtrim = llen - end - 1;     /* elements to drop from the tail */
        }

        /* Remove list elements to perform the trim */
        if (o->encoding != OBJ_ENCODING_QUICKLIST) {
            serverPanic("Unknown list encoding");
        } else {
            quicklistDelRange(o->ptr, 0, ltrim);
            quicklistDelRange(o->ptr, -rtrim, rtrim);
        }

        notifyKeyspaceEvent(NOTIFY_LIST, "ltrim", c->argv[1], c->db->id);
        if (listTypeLength(o) == 0) {
            dbDelete(c->db, c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
        }
        signalModifiedKey(c->db, c->argv[1]);
        server.dirty++;
        addReply(c, shared.ok);
    }
    
    /* LREM key count value
     *
     * Removes elements equal to value (c->argv[3]) from the list.
     *   count > 0: iterate from index 0 toward the tail, stop after count
     *              removals;
     *   count < 0: iterate from index -1 toward the head, stop after |count|
     *              removals;
     *   count = 0: remove every matching element.
     *
     * Replies with the number of removed elements. The key is deleted when the
     * list ends up empty. */
    void lremCommand(client *c) {
        robj *target = c->argv[3];
        long limit;
        long deleted = 0;

        if (getLongFromObjectOrReply(c, c->argv[2], &limit, NULL) != C_OK)
            return;

        /* shared.czero is the reply handed to the lookup. */
        robj *list = lookupKeyWriteOrReply(c, c->argv[1], shared.czero);
        if (list == NULL) return;
        if (checkType(c, list, OBJ_LIST)) return;

        /* The sign of the count selects the scan direction. */
        listTypeIterator *it;
        if (limit >= 0) {
            it = listTypeInitIterator(list, 0, LIST_TAIL);
        } else {
            limit = -limit;
            it = listTypeInitIterator(list, -1, LIST_HEAD);
        }

        listTypeEntry entry;
        while (listTypeNext(it, &entry)) {
            if (!listTypeEqual(&entry, target)) continue;

            listTypeDelete(it, &entry);
            server.dirty++;
            deleted++;
            /* A limit of 0 means "no limit". */
            if (limit != 0 && deleted == limit) break;
        }
        listTypeReleaseIterator(it);

        if (deleted) {
            signalModifiedKey(c->db, c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_LIST, "lrem", c->argv[1], c->db->id);
        }

        /* Drop the key once the list is empty. */
        if (listTypeLength(list) == 0) {
            dbDelete(c->db, c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
        }

        addReplyLongLong(c, deleted);
    }
    

    3. 查找元素

    /* LINDEX command: replies with the element stored at the index given in
     * c->argv[2], or with shared.nullbulk when quicklistIndex finds no entry
     * there. Panics on any list encoding other than quicklist. */
    void lindexCommand(client *c) {
        robj *list = lookupKeyReadOrReply(c, c->argv[1], shared.nullbulk);
        if (list == NULL) return;
        if (checkType(c, list, OBJ_LIST)) return;

        long index;
        if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
            return;

        if (list->encoding != OBJ_ENCODING_QUICKLIST) {
            serverPanic("Unknown list encoding");
            return;
        }

        quicklistEntry entry;
        if (!quicklistIndex(list->ptr, index, &entry)) {
            addReply(c, shared.nullbulk);
            return;
        }

        /* An entry carries either a byte buffer (value/sz) or an integer
         * (longval); build a temporary string object from whichever is set. */
        robj *reply = NULL;
        if (entry.value) {
            reply = createStringObject((char*)entry.value, entry.sz);
        } else {
            reply = createStringObjectFromLongLong(entry.longval);
        }
        addReplyBulk(c, reply);
        decrRefCount(reply);
    }
    
    /* LRANGE command: replies with the elements in the inclusive range
     * [start, end] taken from c->argv[2] and c->argv[3]. Negative indexes
     * count from the end of the list. An empty range yields
     * shared.emptymultibulk. */
    void lrangeCommand(client *c) {
        long start, end;

        /* Parse both range bounds; a failed parse returns immediately. */
        if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
        if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

        /* Look up the key and verify that it is a list. */
        robj *o = lookupKeyReadOrReply(c, c->argv[1], shared.emptymultibulk);
        if (o == NULL) return;
        if (checkType(c, o, OBJ_LIST)) return;

        long llen = listTypeLength(o);

        /* convert negative indexes */
        if (start < 0) start += llen;
        if (end < 0) end += llen;
        if (start < 0) start = 0;

        /* Invariant: start >= 0, so this test will be true when end < 0.
         * The range is empty when start > end or start >= length. */
        if (start > end || start >= llen) {
            addReply(c, shared.emptymultibulk);
            return;
        }
        if (end >= llen) end = llen - 1;
        long rangelen = (end - start) + 1;

        /* Return the result in form of a multi-bulk reply */
        addReplyMultiBulkLen(c, rangelen);

        if (o->encoding != OBJ_ENCODING_QUICKLIST) {
            serverPanic("List encoding is not QUICKLIST!");
            return;
        }

        /* Emit exactly rangelen elements, starting at index start and moving
         * toward the tail. */
        listTypeIterator *it = listTypeInitIterator(o, start, LIST_TAIL);
        for (long remaining = rangelen; remaining != 0; remaining--) {
            listTypeEntry entry;
            listTypeNext(it, &entry);

            quicklistEntry *qe = &entry.entry;
            if (qe->value) {
                addReplyBulkCBuffer(c, qe->value, qe->sz);
            } else {
                addReplyBulkLongLong(c, qe->longval);
            }
        }
        listTypeReleaseIterator(it);
    }
    

    4. 修改元素

    /* LSET key index value
     *
     * Replaces the element at the given index with c->argv[3]. Replies with
     * shared.ok on success, or with shared.outofrangeerr when
     * quicklistReplaceAtIndex reports that nothing was replaced. Panics on
     * any list encoding other than quicklist. */
    void lsetCommand(client *c) {
        /* shared.nokeyerr is the reply handed to the lookup. */
        robj *list = lookupKeyWriteOrReply(c, c->argv[1], shared.nokeyerr);
        if (list == NULL) return;
        if (checkType(c, list, OBJ_LIST)) return;

        robj *value = c->argv[3];
        long index;
        if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
            return;

        if (list->encoding != OBJ_ENCODING_QUICKLIST) {
            serverPanic("Unknown list encoding");
            return;
        }

        quicklist *ql = list->ptr;
        if (quicklistReplaceAtIndex(ql, index,
                                    value->ptr, sdslen(value->ptr))) {
            addReply(c, shared.ok);
            signalModifiedKey(c->db, c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_LIST, "lset", c->argv[1], c->db->id);
            server.dirty++;
        } else {
            addReply(c, shared.outofrangeerr);
        }
    }
    

    5. 移动元素及阻塞队列

    /* Push half of RPOPLPUSH: pushes value onto the head of the destination
     * list and sends it to the client.
     *
     * c      - client to reply to.
     * dstkey - key of the destination list.
     * dstobj - destination list object, or NULL when the key does not exist,
     *          in which case a new quicklist is created and added under dstkey.
     * value  - element to push and to send back to the client. */
    void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
        robj *target = dstobj;

        /* Create the list if the key does not exist */
        if (target == NULL) {
            target = createQuicklistObject();
            quicklistSetOptions(target->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db, dstkey, target);
        }

        signalModifiedKey(c->db, dstkey);
        listTypePush(target, value, LIST_HEAD);
        notifyKeyspaceEvent(NOTIFY_LIST, "lpush", dstkey, c->db->id);

        /* Always send the pushed value to the client. */
        addReplyBulk(c, value);
    }
    
    /* RPOPLPUSH source destination
     *
     * Pops the tail element of the source list (c->argv[1]) and hands it to
     * rpoplpushHandlePush together with the destination key (c->argv[2]).
     * Replies with shared.nullbulk when the source list is empty. The source
     * key is deleted once its list becomes empty. */
    void rpoplpushCommand(client *c) {
        robj *sobj, *value;
        /* Look up the source list and verify its type. */
        if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
            checkType(c,sobj,OBJ_LIST)) return;
    
        /* Empty source list: nothing to move. */
        if (listTypeLength(sobj) == 0) {
            /* This may only happen after loading very old RDB files. Recent
             * versions of Redis delete keys of empty lists. */
            addReply(c,shared.nullbulk);
        } else {
            /* Look up the destination; it may legitimately be missing. */
            robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
            robj *touchedkey = c->argv[1];
    
            /* An existing destination must be a list; this check runs before
             * anything is popped from the source. */
            if (dobj && checkType(c,dobj,OBJ_LIST)) return;
            /* Pop one element from the tail of the source list. */
            value = listTypePop(sobj,LIST_TAIL);
            /* We saved touched key, and protect it, since rpoplpushHandlePush
             * may change the client command argument vector (it does not
             * currently). */
            incrRefCount(touchedkey);
            /* Push the element onto the destination list. */
            rpoplpushHandlePush(c,c->argv[2],dobj,value);
    
            /* listTypePop returns an object with its refcount incremented */
            decrRefCount(value);
    
            /* Delete the source list when it is empty */
            notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
            if (listTypeLength(sobj) == 0) {
                dbDelete(c->db,touchedkey);
                notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                                    touchedkey,c->db->id);
            }
            signalModifiedKey(c->db,touchedkey);
            decrRefCount(touchedkey);
            server.dirty++;
            /* When reached through BRPOPLPUSH, rewrite the command vector to
             * the non-blocking RPOPLPUSH form (presumably so it is propagated
             * as RPOPLPUSH; confirm against rewriteClientCommandVector). */
            if (c->cmd->proc == brpoplpushCommand) {
                rewriteClientCommandVector(c,3,shared.rpoplpush,c->argv[1],c->argv[2]);
            }
        }
    }
    
    /* Blocking RPOP/LPOP */
    /* Shared implementation of BLPOP and BRPOP.
     *
     * c     - client issuing the command; c->argv[1] .. c->argv[argc-2] are
     *         the keys and c->argv[argc-1] is the timeout.
     * where - LIST_HEAD (BLPOP) or LIST_TAIL (BRPOP).
     *
     * Keys are checked in argument order. The first non-empty list is popped
     * and the client receives a two-element reply (key, value). If no key
     * holds a non-empty list the client is blocked via blockForKeys, except
     * inside MULTI/EXEC where shared.nullmultibulk is sent instead. */
    void blockingPopGenericCommand(client *c, int where) {
        robj *o;
        mstime_t timeout;
        int j;
    
        /* The last argument is the timeout, parsed with UNIT_SECONDS. */
        if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
            != C_OK) return;
    
        /* Every argument between the command name and the timeout is a key. */
        for (j = 1; j < c->argc-1; j++) {
            o = lookupKeyWrite(c->db,c->argv[j]);
            if (o != NULL) {
                if (o->type != OBJ_LIST) {
                    addReply(c,shared.wrongtypeerr);
                    return;
                } else {
                    if (listTypeLength(o) != 0) {
                        /* Non empty list, this is like a non normal [LR]POP. */
                        char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
                        robj *value = listTypePop(o,where);
                        serverAssert(value != NULL);
    
                        /* Reply with the key name followed by the value. */
                        addReplyMultiBulkLen(c,2);
                        addReplyBulk(c,c->argv[j]);
                        addReplyBulk(c,value);
                        decrRefCount(value);
                        notifyKeyspaceEvent(NOTIFY_LIST,event,
                                            c->argv[j],c->db->id);
                        /* Delete the key once its list is empty. */
                        if (listTypeLength(o) == 0) {
                            dbDelete(c->db,c->argv[j]);
                            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                                                c->argv[j],c->db->id);
                        }
                        signalModifiedKey(c->db,c->argv[j]);
                        server.dirty++;
    
                        /* Replicate it as an [LR]POP instead of B[LR]POP. */
                        rewriteClientCommandVector(c,2,
                            (where == LIST_HEAD) ? shared.lpop : shared.rpop,
                            c->argv[j]);
                        return;
                    }
                }
            }
        }
    
        /* If we are inside a MULTI/EXEC and the list is empty the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c->flags & CLIENT_MULTI) {
            addReply(c,shared.nullmultibulk);
            return;
        }
    
        /* NOTE(review): the original note says keys being blocked on are kept
         * in a dict *blocking_keys; that is not visible here, so confirm in
         * blockForKeys. */
        /* If the list is empty or the key does not exists we must block */
        blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
    }
    
    /* BLPOP command entry point: delegates to blockingPopGenericCommand with
     * LIST_HEAD. */
    void blpopCommand(client *c) {
        const int where = LIST_HEAD;
        blockingPopGenericCommand(c, where);
    }
    
    /* BRPOP command entry point: delegates to blockingPopGenericCommand with
     * LIST_TAIL. */
    void brpopCommand(client *c) {
        const int where = LIST_TAIL;
        blockingPopGenericCommand(c, where);
    }
    
    /* BRPOPLPUSH command: blocking variant of RPOPLPUSH.
     *
     * c->argv[1] is the source key, c->argv[2] the destination key and
     * c->argv[3] the timeout (parsed with UNIT_SECONDS).
     *
     * When the source key exists and is a list, the regular rpoplpushCommand
     * runs. When the source key does not exist, the client blocks on it,
     * except inside MULTI/EXEC where shared.nullbulk is sent immediately. */
    void brpoplpushCommand(client *c) {
        mstime_t timeout;

        if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
            != C_OK) return;

        robj *key = lookupKeyWrite(c->db, c->argv[1]);

        if (key != NULL) {
            if (key->type == OBJ_LIST) {
                /* The list exists and has elements, so
                 * the regular rpoplpushCommand is executed. */
                serverAssertWithInfo(c,key,listTypeLength(key) > 0);
                rpoplpushCommand(c);
            } else {
                addReply(c, shared.wrongtypeerr);
            }
            return;
        }

        /* From here on the source key does not exist. */
        if (c->flags & CLIENT_MULTI) {
            /* Blocking against an empty list in a multi state
             * returns immediately. */
            addReply(c, shared.nullbulk);
        } else {
            /* The list is empty and the client blocks; the destination key is
             * passed along as the push target. */
            blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL);
        }
    }
    

    相关文章

      网友评论

          本文标题:t_list.c

          本文链接:https://www.haomeiwen.com/subject/sucgwqtx.html