t_zset.c

作者: 生命就是个Bug | 来源:发表于2019-04-18 16:50 被阅读0次

    Redis的t_zset.c是对zset数据结构的实现。
    zset是由dict和zskiplist来实现的。

    当元素较少的时候,采用ziplist来实现zset
    当元素较多的时候,采用skiplist来实现zset

    redis.conf中存在如下配置:

    # Similarly to hashes and lists, sorted sets are also specially encoded in
    # order to save a lot of space. This encoding is only used when the length and
    # elements of a sorted set are below the following limits:
    zset-max-ziplist-entries 128
    zset-max-ziplist-value 64
    

    定义了ziplist转为zskiplist的边界。

    zset的定义如下:

    /* Sorted set: a hash table paired with a skiplist. */
    typedef struct zset {
        /* Hash table keyed by member; zsetRank() and zsetDel() read the
         * member's score from the entry value as a double. */
        dict *dict;
        /* Skiplist holding the members (see zskiplist). */
        zskiplist *zsl;
    } zset;
    

    包含了一个dict和一个zsl
    dict的key存储的是member
    dict的value存储的是score

    zskiplist的定义如下:

    /* Skiplist container. */
    typedef struct zskiplist {
        /* Header node (created by zslCreate() with ZSKIPLIST_MAXLEVEL levels and
         * a NULL element) and the last node; tail is NULL for an empty list. */
        struct zskiplistNode *header, *tail;
        /* Number of nodes; incremented by zslInsert(), decremented by
         * zslDeleteNode(). */
        unsigned long length;
        /* Highest level currently in use (starts at 1). */
        int level;
    } zskiplist;
    

    zskiplistNode的定义如下:

    /* ZSETs use a specialized version of Skiplists */
    typedef struct zskiplistNode {
        /* Member string. */
        sds ele;
        /* Score of the member. */
        double score;
        /* Pointer to the previous node (NULL for the first node, see
         * zslInsert()). */
        struct zskiplistNode *backward;
        /* Per-level links; the array length is chosen at allocation time. */
        struct zskiplistLevel {
            /* Next node on the same level, or NULL. */
            struct zskiplistNode *forward;
            /* Span of this link on this level. */
            unsigned long span;
        } level[];
    } zskiplistNode;
    

    zset的结构图如下:


    zset.png

    1. 创建跳跃表

    /* Create a new skiplist. */
    //创建一个跳跃表zskiplist
    zskiplist *zslCreate(void) {
        int j;
        zskiplist *zsl;
        //一个跳跃表包含了:
        // zskiplistNode header
        // zskiplistNode tail
        // int level
        // int length
        //分配内存
        zsl = zmalloc(sizeof(*zsl));
        //初始层为1
        zsl->level = 1;
        //初始长度为0
        zsl->length = 0;
        //zskiplistNode包含了score、ele、backward、zskiplistLevel[]
        //创建一个ZSKIPLIST_MAXLEVEL层,分值为0,属性为null的节点
        zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL, 0, NULL);
        //zskiplistLevel数组中每个level包含了forward和span
        //初始化level数组
        //每层的forward为null,span为0
        for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
            zsl->header->level[j].forward = NULL;
            zsl->header->level[j].span = 0;
        }
        //backward为null
        zsl->header->backward = NULL;
        zsl->tail = NULL;
        return zsl;
    }
    

    zskiplist的组成部分包含:

    • 头尾zskiplistNode指针header和tail
    • 节点总个数length
    • 最大层数level

    2. 创建跳跃表节点

    /* Allocate a skiplist node with room for 'level' levels and store 'score'
     * and 'ele' in it. Only those two fields are set here; 'backward' and the
     * level entries are left for the caller to fill in. */
    zskiplistNode *zslCreateNode(int level, double score, sds ele) {
        zskiplistNode *node;
        size_t bytes = sizeof(*node) + level * sizeof(struct zskiplistLevel);

        node = zmalloc(bytes);
        node->ele = ele;
        node->score = score;
        return node;
    }
    

    zskiplistNode的组成部分包含:

    • 成员member ele
    • 分数score
    • 后退指针backward(指向前一个节点)
    • zskiplistLevel数组

    zskiplistLevel的组成部分包含:

    • 前进指针forward(指向同一层的后续节点)
    • 跨度span

    3. 插入元素

    /* Insert a new node in the skiplist. Assumes the element does not already
     * exist (up to the caller to enforce that). The skiplist takes ownership
     * of the passed SDS string 'ele'.
     *
     * 'score' must not be NaN (asserted). Returns the newly created node. */
    zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
        zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
        /* Ranks are sums of 'span' values, which are unsigned long. Use the same
         * width so the sums are not narrowed where long is wider than int. */
        unsigned long rank[ZSKIPLIST_MAXLEVEL];
        int i, level;
    
        serverAssert(!isnan(score));
        x = zsl->header;
        /* Walk the levels from the top down looking for the insert position;
         * update[i] remembers the node after which the new node goes on level i. */
        for (i = zsl->level - 1; i >= 0; i--) {
            /* The top level starts counting from 0; every lower level continues
             * from the rank reached on the level above. After the loop rank[0]
             * is the rank of update[0], i.e. of the node preceding the new one,
             * and it is used below to compute the spans. */
            rank[i] = i == (zsl->level - 1) ? 0 : rank[i + 1];
            /* Advance while the next node sorts before (score, ele): lower
             * score, or equal score and smaller member. */
            while (x->level[i].forward &&
                   (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele, ele) < 0))) {
                /* Accumulate the span of every link followed on this level. */
                rank[i] += x->level[i].span;
                /* Move to the next node on the same level. */
                x = x->level[i].forward;
            }
            /* Last node visited on this level. */
            update[i] = x;
        }
        /* we assume the element is not already inside, since we allow duplicated
         * scores, reinserting the same element should never happen since the
         * caller of zslInsert() should test in the hash table if the element is
         * already inside or not. */
        /* Pick the number of levels for the new node. */
        level = zslRandomLevel();
        /* The new node is taller than the list: initialize the new levels. */
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                /* Nothing precedes the new node on these levels... */
                rank[i] = 0;
                /* ...so its predecessor there is the header... */
                update[i] = zsl->header;
                /* ...whose link, before the insertion, spans the whole list. */
                update[i]->level[i].span = zsl->length;
            }
            /* Record the new highest level. */
            zsl->level = level;
        }
        /* Create the node and link it on levels 0 .. level-1. */
        x = zslCreateNode(level, score, ele);
        for (i = 0; i < level; i++) {
            /* Splice x between update[i] and its former successor. */
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;
    
            /* header                update[i]     x    update[i]->forward
              |-----------|-----------|-----------|-----------|-----------|-----------|
                                      |<---update[i].span---->|
              |<-------rank[i]------->|
              |<-------------------rank[0]------------------->|
    
             */
            /* rank[0] - rank[i] is the number of nodes between update[i] and
             * update[0]. The old span of update[i] is split in two: the part
             * beyond the insertion point goes to x, and update[i] now reaches x
             * in (rank[0] - rank[i]) + 1 steps. */
            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
    
        /* Levels above the new node are not relinked, but their links now jump
         * over one more node. */
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
    
        /* The header is never exposed as a backward target. */
        x->backward = (update[0] == zsl->header) ? NULL : update[0];
        if (x->level[0].forward)
            x->level[0].forward->backward = x;
        else
            zsl->tail = x;
        zsl->length++;
        return x;
    }
    

    4. 删除元素

    /* Remove the node matching both 'score' and 'ele' from the skiplist.
     * Returns 1 when such a node was found and unlinked, 0 otherwise.
     *
     * When 'node' is NULL the unlinked node is released with zslFreeNode().
     * Otherwise the node is only unlinked and handed back through *node, so
     * the caller may reuse it (including the SDS string at node->ele). */
    int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
        zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
        zskiplistNode *cur = zsl->header;
        int lvl;

        /* Top-down search; update[lvl] is the last node visited on each level. */
        for (lvl = zsl->level - 1; lvl >= 0; lvl--) {
            for (;;) {
                zskiplistNode *next = cur->level[lvl].forward;
                int precedes;

                if (next == NULL) break;
                precedes = next->score < score ||
                           (next->score == score && sdscmp(next->ele, ele) < 0);
                if (!precedes) break;
                cur = next;
            }
            update[lvl] = cur;
        }

        /* Several elements may share the score: the candidate must match both
         * the score and the member. */
        cur = cur->level[0].forward;
        if (cur == NULL) return 0; /* not found */
        if (score != cur->score) return 0; /* not found */
        if (sdscmp(cur->ele, ele) != 0) return 0; /* not found */

        zslDeleteNode(zsl, cur, update);
        if (node != NULL)
            *node = cur;
        else
            zslFreeNode(cur);
        return 1;
    }
    
    /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank.
     *
     * Unlinks 'x' from the list without freeing it. 'update' holds one node per
     * level, as filled in by the caller's search. */
    void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
        int lvl;

        for (lvl = 0; lvl < zsl->level; lvl++) {
            struct zskiplistLevel *prev = &update[lvl]->level[lvl];

            if (prev->forward != x) {
                /* x is not linked here: the link just jumps over one node less. */
                prev->span -= 1;
                continue;
            }
            /* Bypass x and absorb its span. */
            prev->span += x->level[lvl].span - 1;
            prev->forward = x->level[lvl].forward;
        }

        /* Fix the backward chain, or the tail when x was the last node. */
        if (x->level[0].forward != NULL)
            x->level[0].forward->backward = x->backward;
        else
            zsl->tail = x->backward;

        /* Drop top levels that became empty. */
        while (zsl->level > 1 && zsl->header->level[zsl->level - 1].forward == NULL)
            zsl->level--;
        zsl->length--;
    }
    

    5. ZADD、ZINCRBY命令

    /* ZADD key [NX|XX] [CH] [INCR] score member [score member ...]
     * Delegates to zaddGenericCommand() with no preset flags. */
    void zaddCommand(client *c) {
        zaddGenericCommand(c, ZADD_NONE);
    }
    /* ZINCRBY key increment member
     * Delegates to zaddGenericCommand() with ZADD_INCR preset. */
    void zincrbyCommand(client *c) {
        zaddGenericCommand(c, ZADD_INCR);
    }
    
    /* This generic command implements both ZADD and ZINCRBY.
     *
     * 'flags' carries the ZADD_* bits preset by the caller (ZADD_NONE or
     * ZADD_INCR); the NX/XX/CH/INCR options found in the arguments are OR-ed
     * in. All scores are parsed before the sorted set is touched, so a bad
     * score is reported without performing any addition. */
    void zaddGenericCommand(client *c, int flags) {
        static char *nanerr = "resulting score is not a number (NaN)";
        /* The key name is the first argument. */
        robj *key = c->argv[1];
        robj *zobj;
        sds ele;
        double score = 0, *scores = NULL;
        int j, elements;
        int scoreidx = 0;
        /* The following vars are used in order to track what the command actually
         * did during the execution, to reply to the client and to trigger the
         * notification of keyspace change. */
        int added = 0;      /* Number of new elements added. */
        int updated = 0;    /* Number of elements with updated score. */
        int processed = 0;  /* Number of elements processed, may remain zero with
                               options like XX. */
    
        /* Parse options. At the end 'scoreidx' is set to the argument position
         * of the score of the first score-element pair. */
        scoreidx = 2;
        /* Consume leading option words (case-insensitive) until a non-option. */
        while (scoreidx < c->argc) {
            char *opt = c->argv[scoreidx]->ptr;
            if (!strcasecmp(opt, "nx")) flags |= ZADD_NX;
            else if (!strcasecmp(opt, "xx")) flags |= ZADD_XX;
            else if (!strcasecmp(opt, "ch")) flags |= ZADD_CH;
            else if (!strcasecmp(opt, "incr")) flags |= ZADD_INCR;
            else break;
            scoreidx++;
        }
    
        /* Turn options into simple to check vars. */
        int incr = (flags & ZADD_INCR) != 0;
        int nx = (flags & ZADD_NX) != 0;
        int xx = (flags & ZADD_XX) != 0;
        int ch = (flags & ZADD_CH) != 0;
    
        /* After the options, we expect to have an even number of args, since
         * we expect any number of score-element pairs. */
        elements = c->argc - scoreidx;
        if (elements % 2 || !elements) {
            addReply(c, shared.syntaxerr);
            return;
        }
        elements /= 2; /* Now this holds the number of score-element pairs. */
    
        /* Check for incompatible options. */
        if (nx && xx) {
            addReplyError(c,
                          "XX and NX options at the same time are not compatible");
            return;
        }
    
        if (incr && elements > 1) {
            addReplyError(c,
                          "INCR option supports a single increment-element pair");
            return;
        }
    
        /* Start parsing all the scores, we need to emit any syntax error
         * before executing additions to the sorted set, as the command should
         * either execute fully or nothing at all. */
        /* Scores sit at every other argument starting from 'scoreidx'. */
        scores = zmalloc(sizeof(double) * elements);
        for (j = 0; j < elements; j++) {
            if (getDoubleFromObjectOrReply(c, c->argv[scoreidx + j * 2], &scores[j], NULL)
                != C_OK)
                goto cleanup;
        }
    
        /* Step 1: find or create the sorted set object. */
        /* Lookup the key and create the sorted set if does not exist. */
        zobj = lookupKeyWrite(c->db, key);
        if (zobj == NULL) {
            if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
            /* Use createZsetObject() when zset_max_ziplist_entries is 0 or when
             * the first member is longer than zset_max_ziplist_value. Note this
             * tests the member's length, not the number of members. */
            if (server.zset_max_ziplist_entries == 0 ||
                server.zset_max_ziplist_value < sdslen(c->argv[scoreidx + 1]->ptr)) {
                /* Skiplist-backed object. */
                zobj = createZsetObject();
            }
                /* Otherwise start with the ziplist-backed object. */
            else {
                zobj = createZsetZiplistObject();
            }
            dbAdd(c->db, key, zobj);
        } else {
            if (zobj->type != OBJ_ZSET) {
                addReply(c, shared.wrongtypeerr);
                goto cleanup;
            }
        }
    
        /* Step 2: add or update every score-member pair. */
        for (j = 0; j < elements; j++) {
            double newscore;
            /* Score parsed earlier for this pair. */
            score = scores[j];
            int retflags = flags;
    
            /* Member follows its score in the argument vector. */
            ele = c->argv[scoreidx + 1 + j * 2]->ptr;
            int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
            /* A zero return is reported to the client as a NaN result. */
            if (retval == 0) {
                addReplyError(c, nanerr);
                goto cleanup;
            }
            if (retflags & ZADD_ADDED) added++;
            if (retflags & ZADD_UPDATED) updated++;
            if (!(retflags & ZADD_NOP)) processed++;
            /* Remember the resulting score: it is the reply in INCR mode. */
            score = newscore;
        }
        server.dirty += (added + updated);
    
        reply_to_client:
        if (incr) { /* ZINCRBY or INCR option. */
            if (processed)
                addReplyDouble(c, score);
            else
                addReply(c, shared.nullbulk);
        } else { /* ZADD. */
            addReplyLongLong(c, ch ? added + updated : added);
        }
    
        cleanup:
        zfree(scores);
        /* Signal and notify only when something actually changed. */
        if (added || updated) {
            signalModifiedKey(c->db, key);
            notifyKeyspaceEvent(NOTIFY_ZSET,
                                incr ? "zincr" : "zadd", key, c->db->id);
        }
    }
    

    6. zrem命令

    /* ZREM key member [member ...]
     *
     * Removes the given members and replies with how many were actually
     * deleted. When the sorted set becomes empty the key is deleted and the
     * remaining arguments are not processed. */
    void zremCommand(client *c) {
        robj *key = c->argv[1];
        robj *zobj = lookupKeyWriteOrReply(c, key, shared.czero);
        int deleted = 0, keyremoved = 0;
        int idx;

        /* Missing key (already replied) or wrong type: nothing to do. */
        if (zobj == NULL) return;
        if (checkType(c, zobj, OBJ_ZSET)) return;

        for (idx = 2; idx < c->argc; idx++) {
            if (zsetDel(zobj, c->argv[idx]->ptr)) deleted++;
            if (zsetLength(zobj) != 0) continue;

            /* No members left: drop the key itself and stop. */
            dbDelete(c->db, key);
            keyremoved = 1;
            break;
        }

        if (deleted != 0) {
            notifyKeyspaceEvent(NOTIFY_ZSET, "zrem", key, c->db->id);
            if (keyremoved)
                notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
            signalModifiedKey(c->db, key);
            server.dirty += deleted;
        }
        addReplyLongLong(c, deleted);
    }
    
    
    /* Remove member 'ele' from the sorted set object 'zobj'.
     * Returns 1 if the member existed and was removed, 0 if it was not there.
     * An encoding other than ziplist or skiplist triggers a panic. */
    int zsetDel(robj *zobj, sds ele) {
        if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
            /* Locate the member in the ziplist, then delete it in place. */
            unsigned char *eptr = zzlFind(zobj->ptr, ele, NULL);

            if (eptr == NULL) return 0; /* No such element found. */
            zobj->ptr = zzlDelete(zobj->ptr, eptr);
            return 1;
        }

        if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
            zset *zs = zobj->ptr;
            /* Detach the entry from the hash table without freeing it yet. */
            dictEntry *de = dictUnlink(zs->dict, ele);
            double score;
            int retval;

            if (de == NULL) return 0; /* No such element found. */

            /* The score is needed to locate the node in the skiplist. */
            score = *(double *) dictGetVal(de);

            /* Order matters: the hash table entry goes first. Deleting from the
             * skiplist releases the SDS string of the element, which is shared
             * between the skiplist and the hash table, so the skiplist must be
             * the final step. */
            dictFreeUnlinkedEntry(zs->dict, de);

            retval = zslDelete(zs->zsl, score, ele, NULL);
            serverAssert(retval);

            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
            return 1;
        }

        serverPanic("Unknown sorted set encoding");
        return 0;
    }
    

    7. zremrangebyrank、zremrangebyscore、zremrangebylex命令

    /* ZREMRANGEBYRANK key start stop
     * Removes the members whose rank falls in the given range. */
    void zremrangebyrankCommand(client *c) {
        zremrangeGenericCommand(c, ZRANGE_RANK);
    }
    
    /* ZREMRANGEBYSCORE key min max
     * Removes the members whose score falls in the given range. */
    void zremrangebyscoreCommand(client *c) {
        zremrangeGenericCommand(c, ZRANGE_SCORE);
    }
    
    /* ZREMRANGEBYLEX key min max
     * Removes the members that fall in the given lexicographical range. */
    void zremrangebylexCommand(client *c) {
        zremrangeGenericCommand(c, ZRANGE_LEX);
    }
    
    /* Range kinds accepted by zremrangeGenericCommand(). The values are also
     * used there as indexes into its event-name table, so keep them 0, 1, 2. */
    #define ZRANGE_RANK 0
    #define ZRANGE_SCORE 1
    #define ZRANGE_LEX 2
    
    /* Shared implementation of ZREMRANGEBYRANK, ZREMRANGEBYSCORE and
     * ZREMRANGEBYLEX. 'rangetype' is one of ZRANGE_RANK, ZRANGE_SCORE or
     * ZRANGE_LEX and selects how argv[2] and argv[3] are parsed and which
     * deletion routine runs. Replies with the number of removed members and
     * deletes the key when the sorted set ends up empty. */
    void zremrangeGenericCommand(client *c, int rangetype) {
        robj *key = c->argv[1];
        robj *zobj;
        int keyremoved = 0;
        unsigned long deleted = 0;
        zrangespec range;
        zlexrangespec lexrange;
        long start, end, llen;
    
        /* Step 1: Parse the range. */
        /* ZREMRANGEBYRANK key start stop: two integers. */
        if (rangetype == ZRANGE_RANK) {
            if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
                (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK))
                return;
        }
        /* ZREMRANGEBYSCORE key min max: a score range. */
        else if (rangetype == ZRANGE_SCORE) {
            if (zslParseRange(c->argv[2], c->argv[3], &range) != C_OK) {
                addReplyError(c, "min or max is not a float");
                return;
            }
        }
        /* ZREMRANGEBYLEX key min max: a lexicographical range. */
        else if (rangetype == ZRANGE_LEX) {
            if (zslParseLexRange(c->argv[2], c->argv[3], &lexrange) != C_OK) {
                addReplyError(c, "min or max not valid string range item");
                return;
            }
        }
    
        /* Step 2: Lookup & range sanity checks if needed. */
        /* From here on leave through 'cleanup' so a parsed lex range is freed. */
        if ((zobj = lookupKeyWriteOrReply(c, key, shared.czero)) == NULL ||
            checkType(c, zobj, OBJ_ZSET))
            goto cleanup;
    
        /* Rank ranges: resolve negative indexes and clamp to the set length. */
        if (rangetype == ZRANGE_RANK) {
            /* Sanitize indexes. */
            llen = zsetLength(zobj);
            if (start < 0) start = llen + start;
            if (end < 0) end = llen + end;
            if (start < 0) start = 0;
    
            /* Invariant: start >= 0, so this test will be true when end < 0.
             * The range is empty when start > end or start >= length. */
            if (start > end || start >= llen) {
                addReply(c, shared.czero);
                goto cleanup;
            }
            if (end >= llen) end = llen - 1;
        }
    
        /* Step 3: Perform the range deletion operation. */
        /* Ziplist encoding: the zzl* helpers return the (possibly moved) ziplist
         * pointer and store the number of removed members in 'deleted'. The rank
         * variant is called with start + 1 and end + 1. */
        if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
            switch (rangetype) {
                case ZRANGE_RANK:
                    zobj->ptr = zzlDeleteRangeByRank(zobj->ptr, start + 1, end + 1, &deleted);
                    break;
                case ZRANGE_SCORE:
                    zobj->ptr = zzlDeleteRangeByScore(zobj->ptr, &range, &deleted);
                    break;
                case ZRANGE_LEX:
                    zobj->ptr = zzlDeleteRangeByLex(zobj->ptr, &lexrange, &deleted);
                    break;
            }
            if (zzlLength(zobj->ptr) == 0) {
                dbDelete(c->db, key);
                keyremoved = 1;
            }
        }
        /* Skiplist encoding: the zsl* helpers receive the dict as well and return
         * the number of removed members. */
        else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
            zset *zs = zobj->ptr;
            switch (rangetype) {
                case ZRANGE_RANK:
                    deleted = zslDeleteRangeByRank(zs->zsl, start + 1, end + 1, zs->dict);
                    break;
                case ZRANGE_SCORE:
                    deleted = zslDeleteRangeByScore(zs->zsl, &range, zs->dict);
                    break;
                case ZRANGE_LEX:
                    deleted = zslDeleteRangeByLex(zs->zsl, &lexrange, zs->dict);
                    break;
            }
            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
            if (dictSize(zs->dict) == 0) {
                dbDelete(c->db, key);
                keyremoved = 1;
            }
        } else {
            serverPanic("Unknown sorted set encoding");
        }
    
        /* Step 4: Notifications and reply. */
        if (deleted) {
            /* Indexed by 'rangetype' (ZRANGE_RANK, ZRANGE_SCORE, ZRANGE_LEX). */
            char *event[3] = {"zremrangebyrank", "zremrangebyscore", "zremrangebylex"};
            signalModifiedKey(c->db, key);
            notifyKeyspaceEvent(NOTIFY_ZSET, event[rangetype], key, c->db->id);
            if (keyremoved)
                notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
        }
        server.dirty += deleted;
        addReplyLongLong(c, deleted);
    
        cleanup:
        if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
    }
    

    8. zrank、zrevrank命令

    /* ZRANK key member
     * Delegates to zrankGenericCommand() with reverse == 0. */
    void zrankCommand(client *c) {
        zrankGenericCommand(c, 0);
    }
    
    /* ZREVRANK key member
     * Delegates to zrankGenericCommand() with reverse == 1. */
    void zrevrankCommand(client *c) {
        zrankGenericCommand(c, 1);
    }
    
    /* Shared implementation of ZRANK and ZREVRANK.
     *
     * Replies with the rank of member argv[2] inside the sorted set at key
     * argv[1], computed by zsetRank() with the given 'reverse' flag. Replies
     * with a null bulk when the key is missing or the member has no rank. */
    void zrankGenericCommand(client *c, int reverse) {
        robj *key = c->argv[1];
        robj *ele = c->argv[2];
        robj *zobj = lookupKeyReadOrReply(c, key, shared.nullbulk);
        long rank;

        if (zobj == NULL) return;
        if (checkType(c, zobj, OBJ_ZSET)) return;

        serverAssertWithInfo(c, ele, sdsEncodedObject(ele));
        rank = zsetRank(zobj, ele->ptr, reverse);

        /* A negative rank means the member was not found. */
        if (rank < 0) {
            addReply(c, shared.nullbulk);
            return;
        }
        addReplyLongLong(c, rank);
    }
    
    /* Return the 0-based rank of member 'ele' in the sorted set 'zobj', or -1
     * when the member does not exist.
     *
     * The rank is the position of the element in the sorted collection: the
     * first element has rank 0, the second rank 1, and so on up to length-1.
     *
     * With 'reverse' false the element with the lowest score has rank 0; with
     * 'reverse' non-zero the element with the highest score has rank 0. */
    long zsetRank(robj *zobj, sds ele, int reverse) {
        unsigned long total = zsetLength(zobj);
        unsigned long rank;

        if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
            unsigned char *zl = zobj->ptr;
            unsigned char *eptr, *sptr;

            /* Start from the first member entry and the entry that follows it. */
            eptr = ziplistIndex(zl, 0);
            serverAssert(eptr != NULL);
            sptr = ziplistNext(zl, eptr);
            serverAssert(sptr != NULL);

            /* Linear scan, counting 1-based positions until the member matches
             * or the list is exhausted. */
            for (rank = 1; eptr != NULL; rank++) {
                if (ziplistCompare(eptr, (unsigned char *) ele, sdslen(ele)))
                    break;
                zzlNext(zl, &eptr, &sptr);
            }

            if (eptr == NULL) return -1;
            /* Convert the 1-based position, counting from the end if reversed. */
            return reverse ? total - rank : rank - 1;
        }

        if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
            zset *zs = zobj->ptr;
            /* The hash table gives the member's score, if the member exists. */
            dictEntry *de = dictFind(zs->dict, ele);
            double score;

            if (de == NULL) return -1;

            score = *(double *) dictGetVal(de);
            rank = zslGetRank(zs->zsl, score, ele);
            /* Existing elements always have a rank. */
            serverAssert(rank != 0);
            return reverse ? total - rank : rank - 1;
        }

        serverPanic("Unknown sorted set encoding");
    }
    

    9. zpopmin、zpopmax命令

    /* ZPOPMIN key [count]
     * Rejects more than three arguments with a syntax error; otherwise runs
     * genericZpopCommand() on the single key with ZSET_MIN, passing the
     * optional count argument when present. */
    void zpopminCommand(client *c) {
        robj *countarg = NULL;

        if (c->argc > 3) {
            addReply(c, shared.syntaxerr);
            return;
        }
        if (c->argc == 3) countarg = c->argv[2];
        genericZpopCommand(c, &c->argv[1], 1, ZSET_MIN, 0, countarg);
    }
    
    /* ZPOPMAX key [count]
     * Rejects more than three arguments with a syntax error; otherwise runs
     * genericZpopCommand() on the single key with ZSET_MAX, passing the
     * optional count argument when present. */
    void zpopmaxCommand(client *c) {
        robj *countarg = NULL;

        if (c->argc > 3) {
            addReply(c, shared.syntaxerr);
            return;
        }
        if (c->argc == 3) countarg = c->argv[2];
        genericZpopCommand(c, &c->argv[1], 1, ZSET_MAX, 0, countarg);
    }
    
    /* This command implements the generic zpop operation, used by:
     * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
     * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
     *
     * If 'emitkey' is true also the key name is emitted, useful for the blocking
     * behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
     *
     * The synchronous version instead does not need to emit the key, but may
     * use the 'count' argument to return multiple items if available.
     *
     * 'keyv'/'keyc' list the candidate keys: the first one that exists is
     * popped from. 'where' is ZSET_MIN or ZSET_MAX. 'countarg' may be NULL,
     * in which case a single element is popped. */
    void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
        int idx;
        robj *key = NULL;
        robj *zobj = NULL;
        sds ele;
        double score;
        long count = 1;
    
        /* If a count argument as passed, parse it or return an error. */
        if (countarg) {
            if (getLongFromObjectOrReply(c, countarg, &count, NULL) != C_OK)
                return;
            if (count <= 0) {
                addReply(c, shared.emptymultibulk);
                return;
            }
        }
    
        /* Check type and break on the first error, otherwise identify candidate. */
        idx = 0;
        while (idx < keyc) {
            key = keyv[idx++];
            zobj = lookupKeyWrite(c->db, key);
            if (!zobj) continue;
            if (checkType(c, zobj, OBJ_ZSET)) return;
            break;
        }
    
        /* No candidate for zpopping, return empty. */
        if (!zobj) {
            addReply(c, shared.emptymultibulk);
            return;
        }
    
        /* The reply length is only known after the loop, so it is deferred. */
        void *arraylen_ptr = addDeferredMultiBulkLength(c);
        long arraylen = 0;
    
        /* We emit the key only for the blocking variant. */
        if (emitkey) addReplyBulk(c, key);
    
        /* Pop up to 'count' elements, stopping early if the set becomes empty. */
        do {
            /* Ziplist encoding: copy the member out and read its score. */
            if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
                unsigned char *zl = zobj->ptr;
                unsigned char *eptr, *sptr;
                unsigned char *vstr;
                unsigned int vlen;
                long long vlong;
    
                /* Get the first or last element in the sorted set. */
                /* NOTE(review): index -2 presumably addresses the last member
                 * entry, whose score entry follows it - confirm against
                 * ziplistIndex(). */
                eptr = ziplistIndex(zl, where == ZSET_MAX ? -2 : 0);
                serverAssertWithInfo(c, zobj, eptr != NULL);
                serverAssertWithInfo(c, zobj, ziplistGet(eptr, &vstr, &vlen, &vlong));
                /* The entry is either an integer (vstr == NULL) or a string. */
                if (vstr == NULL)
                    ele = sdsfromlonglong(vlong);
                else
                    ele = sdsnewlen(vstr, vlen);
    
                /* Get the score. */
                sptr = ziplistNext(zl, eptr);
                serverAssertWithInfo(c, zobj, sptr != NULL);
                score = zzlGetScore(sptr);
            }
            /* Skiplist encoding: take the tail or the first node. */
            else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
                zset *zs = zobj->ptr;
                zskiplist *zsl = zs->zsl;
                zskiplistNode *zln;
    
                /* ZSET_MAX pops from the tail, otherwise from the node right
                 * after the header. */
                zln = (where == ZSET_MAX ? zsl->tail :
                       zsl->header->level[0].forward);
    
                /* There must be an element in the sorted set. */
                serverAssertWithInfo(c, zobj, zln != NULL);
                /* Duplicate the member: it is still used after zsetDel() below. */
                ele = sdsdup(zln->ele);
                score = zln->score;
            } else {
                serverPanic("Unknown sorted set encoding");
            }
    
            serverAssertWithInfo(c, zobj, zsetDel(zobj, ele));
            server.dirty++;
    
            if (arraylen == 0) { /* Do this only for the first iteration. */
                /* Indexed by 'where' (ZSET_MIN / ZSET_MAX). */
                char *events[2] = {"zpopmin", "zpopmax"};
                notifyKeyspaceEvent(NOTIFY_ZSET, events[where], key, c->db->id);
                signalModifiedKey(c->db, key);
            }
    
            addReplyBulkCBuffer(c, ele, sdslen(ele));
            addReplyDouble(c, score);
            sdsfree(ele);
            arraylen += 2;
    
            /* Remove the key if the sorted set has no members left. */
            if (zsetLength(zobj) == 0) {
                dbDelete(c->db, key);
                notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
                break;
            }
        } while (--count);
    
        /* Two reply items per popped element, plus one when the key was emitted. */
        setDeferredMultiBulkLength(c, arraylen_ptr, arraylen + (emitkey != 0));
    }
    

    10. bzpopmin、bzpopmax命令

    /* BZPOPMIN key [key ...] timeout
     * Delegates to blockingGenericZpopCommand() with ZSET_MIN. */
    void bzpopminCommand(client *c) {
        blockingGenericZpopCommand(c, ZSET_MIN);
    }
    
    /* BZPOPMAX key [key ...] timeout
     * Delegates to blockingGenericZpopCommand() with ZSET_MAX. */
    void bzpopmaxCommand(client *c) {
        blockingGenericZpopCommand(c, ZSET_MAX);
    }
    
    /* BZPOPMIN / BZPOPMAX actual implementation.
     *
     * The last argument is the timeout, everything between the command name
     * and the timeout is a key. The first key holding a non-empty sorted set
     * is popped immediately through genericZpopCommand(); if there is none the
     * client blocks on all the keys (or gets a null reply inside MULTI). */
    void blockingGenericZpopCommand(client *c, int where) {
        mstime_t timeout;
        robj *timeoutarg = c->argv[c->argc - 1];
        int idx;

        if (getTimeoutFromObjectOrReply(c, timeoutarg, &timeout, UNIT_SECONDS) != C_OK)
            return;

        for (idx = 1; idx < c->argc - 1; idx++) {
            robj *o = lookupKeyWrite(c->db, c->argv[idx]);

            if (o == NULL) continue;
            if (o->type != OBJ_ZSET) {
                addReply(c, shared.wrongtypeerr);
                return;
            }
            if (zsetLength(o) == 0) continue;

            /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
            genericZpopCommand(c, &c->argv[idx], 1, where, 1, NULL);
            /* Replicate it as an ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
            rewriteClientCommandVector(c, 2,
                                       where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,
                                       c->argv[idx]);
            return;
        }

        /* If we are inside a MULTI/EXEC and the zset is empty the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c->flags & CLIENT_MULTI) {
            addReply(c, shared.nullmultibulk);
            return;
        }

        /* If the keys do not exist we must block */
        blockForKeys(c, BLOCKED_ZSET, c->argv + 1, c->argc - 2, timeout, NULL, NULL);
    }
    

    相关文章

      网友评论

          本文标题:t_zset.c

          本文链接:https://www.haomeiwen.com/subject/svytgqtx.html