美文网首页码农的世界程序员
Redis源码阅读笔记(6)-跳跃表

Redis源码阅读笔记(6)-跳跃表

作者: 喵帕斯0_0 | 来源:发表于2018-08-12 23:56 被阅读0次

跳跃表在平常的数据结构和算法中比较少接触,它是一种有序的数据结构,通过建立类似索引的来实现快速的查找,其支持O(LogN)的平均时间复杂度,最坏O(N)的时间复杂度;在大部分情况下,其可以与平衡树相媲美,而且由于实现较平衡树简单,因此常常作为平衡树的替代方案。

设计的主要代码:

  1. redis.h
  2. t_zset.h

跳跃表的定义

我们知道,即使对于一个有序链表,其查询一个元素的时间复杂度为O(N),即使是排序的,也不能通过二分查找来缩短查找的时间。如下图,查找节点15需要比较8个节点。


链表

那么有没有其他优化的算法呢?我们可以利用索引的原理,在链表上提取部分节点建立索引,如下图,只需要比较6个节点。


一级索引

根据这个思路,可以在一级索引之上,再提取部分节点作为二级索引,如下图,只需要比较4个节点


二级索引

在此基础上,再往上建立一层,如下图,只需要比较3个节点。

三级索引

以上就是跳跃表的思想,通过增加多层索引,以空间换时间,加快节点的查询。图中元素不多,查找速度的加快没有体现出,如果节点一多,那跳跃表的优势就体现出来了。

跳跃表具有如下性质:

  1. 以空间换时间,由多层组成;
  2. 每一层都是有序链表;
  3. 最后一层包含所有元素;
  4. 如果节点node出现在layer n出现,那么该节点在layer n之下的链表层一定会出现;
  5. 每一个节点包含两个指针,一个指向同一层链表的下一个节点,另一个指向下一层的节点。

跳跃表的实现

//跳跃表节点定义
typedef struct zskiplistNode {
    robj *obj;      //成员对象
    double score;   //分值  主要按分值来排序,其次才按对象
    struct zskiplistNode *backward; //回退指针
    struct zskiplistLevel {             //层级,按1~32层随机指定
        struct zskiplistNode *forward;  //层级的前向指着
        unsigned int span;              //跨度
    } level[];
} zskiplistNode;

//管理跳跃表
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;              //最大节点层数
} zskiplist;

Redis的跳跃表由zskiplistNodezskiplist组成,其图像结构如下:

一个简单跳跃表

其中zskiplist用来管理跳跃表,其包含了跳跃表的相关信息:

  1. header:指向跳跃表的表头;
  2. tail:指向跳跃表的表尾;
  3. level:记录该跳跃表目前的最大层级(表头节点不计算在内);
  4. length:记录了该跳跃表的长度。

zskiplistNode是实际跳跃表的节点,其记录了节点信息:

  1. obj:节点保存的对象;
  2. score:节点排序的分值,节点的排序按照该值从小到大排序,如果该值相同,则按照obj的大小来排序;
  3. backward:最底层是一个双向链表,用来从表尾向表头遍历节点用,因为每个节点只有一个后退指针,因此每次只能后退一个节点;
  4. level.forward:前向指针,用来保存同层下一个节点的地址;
  5. level.span记录了同层节点之间的距离span,即在最底层两节点相距多少个节点,用来快速计算节点的rank(即节点所在的index)时使用。指向null的span为0

相关实现细节

创建和销毁
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
//跳跃表节点创建函数
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));       //申请内存
    zn->score = score;
    zn->obj = obj;
    return zn;
}

//创建跳跃表
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);     //创建第一列节点,固定32层级,保存用来执行每层的头结点
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {  //初始化
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

//释放节点
void zslFreeNode(zskiplistNode *node) {
    decrRefCount(node->obj);        //减少对象被引用引用数
    zfree(node);
}
//释放跳跃表
void zslFree(zskiplist *zsl) {
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);         //释放第一列32层级的表头结点
    while(node) {
        next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}

Redis在创建跳跃表的时候,会创建一个32层的表头节点,该表头节点用来指向各层的开始节点,初始化时,该节点的每一层的forward都指向NULLspan为0。

插入节点
//生成1~32的随机数
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

//插入节点,从高往低计算,从低层往高层插入,从span可以计算出每个节点的rank
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {                               //从最高层往下一层一层的插入
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;            //记录在要插入的前节点的排名
            x = x->level[i].forward;
        }
        update[i] = x;          //记录每一层插入的前一节点
    }
    level = zslRandomLevel();           //设置该节点level
    if (level > zsl->level) {           //level比当前的level还高的
        for (i = zsl->level; i < level; i++) {         //设置新添层的rank和update
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);         //创建节点
    for (i = 0; i < level; i++) {               //从低往高处理
        x->level[i].forward = update[i]->level[i].forward;              //设置每层该节点的forward指针
        update[i]->level[i].forward = x;                                //改变前一节点forward

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);      //设置该层本节点span,因为有要插入的位置之前的节点rank信息
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;                     //设置前一节点的span
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

Redis在添加节点的时候,该节点所拥有的层数通过zslRandomLevel计算得到的(作者注释该随机数满足powerlaw-alike 分布,还没研究,有空研究研究)。
每次插入节点,要更改每一层同层前置节点的forward和span。

删除节点
//删除节点
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    //从底层往高层删
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;           //本节点被删除,所以要减一
            update[i]->level[i].forward = x->level[i].forward;          //设置forward
        } else {                                                        //当前层级不包含x节点,只需减一
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;               //看最高层级是否需要减1
    zsl->length--;
}

//指定要删除的节点
/* Delete an element with matching score/object from the skiplist. */
int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {           //寻找该节点可以插入的前一节点
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;            //还要再次确认是否存在该节点
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
        zslDeleteNode(zsl, x, update);
        zslFreeNode(x);
        return 1;
    }
    return 0; /* not found */
}

删除节点比较简单,首先要寻找每一层(该节点存在的每一层)该节点的前置节点,然后更改每一个同层节点前置节点的forwardspan

获取对应节点的rank以及返回获取rank为指定值的节点
//获取值对应的rank
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->obj && equalStringObjects(x->obj,o)) {
            return rank;
        }
    }
    return 0;
}

//获取值对应的rank
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->obj && equalStringObjects(x->obj,o)) {
            return rank;
        }
    }
    return 0;
}

利用每一层记录的span,可以快速的计算出节点的rank。

删除指定score范围的节点

static int zslValueGteMin(double value, zrangespec *spec) {
    return spec->minex ? (value > spec->min) : (value >= spec->min);
}

static int zslValueLteMax(double value, zrangespec *spec) {
    return spec->maxex ? (value < spec->max) : (value <= spec->max);
}

//删除指定score范围的节点,返回被删除的节点数量
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long removed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (range->minex ?
            x->level[i].forward->score <= range->min :
            x->level[i].forward->score < range->min))
                x = x->level[i].forward;
        update[i] = x;
    }

    /* Current node is the last with score < or <= min. */
    //找到第一个大于range->min的值
    x = x->level[0].forward;

    /* Delete nodes while in range. */
    //删除直到score>max
    while (x &&
           (range->maxex ? x->score < range->max : x->score <= range->max))
    {
        zskiplistNode *next = x->level[0].forward;
        zslDeleteNode(zsl,x,update);
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        removed++;
        x = next;
    }
    return removed;
}

首先要找到第一个符合条件的节点。

总结

跳跃表的名字听起来会觉得这个算法很复杂,但实际懂了以后,会发现跳跃表其实很简单。

Redis的跳跃表由以下几个特点:

  1. 跳跃表节点的层数是1~32的随机数;
  2. 同一个跳跃表中,节点可以包含相同的值,但每个节点的成员对象必须是唯一的;
  3. 跳跃表是先按score进行排序,然后在根据成员对象的大小进行排序;
  4. 跳跃表查找的平均时间复杂度为O(LogN),最坏时间复杂度是O(N)。

跳跃表API

function description time complexity
zslCreate 创建一个跳跃表 O(1)
zslFree 释放一个跳跃表 O(N)
zslInsert 插入一个节点 平均O(LogN),最坏O(LogN)
zslDelete 删除一个节点 平均O(LogN),最坏O(LogN)
zslFirstInRange 获取满足分值范围的第一个节点 平均O(LogN),最坏O(LogN)
zslLastInRange 获取满足分值范围的最后一个节点 平均O(LogN),最坏O(LogN)
zzlGetScore 获取节点score 平均O(LogN),最坏O(LogN)
zslGetRank 获取节点的index 平均O(LogN),最坏O(LogN)
zslGetElementByRank 获取指定index的节点 平均O(LogN),最坏O(LogN)
zslIsInRange 跳跃表是否在指定score范围之内 通过表头表尾,O(1)
zslDeleteRangeByScore 删除指定score范围之内的节点 平均O(N),N为被删节点数量
zslDeleteRangeByRank 删除指定rank范围之内的节点 平均O(N),N为被删节点数量

Reference

  1. 漫画算法:什么是跳跃表?
  2. 跳跃表原理

相关文章

  • Redis源码阅读笔记(6)-跳跃表

    跳跃表在平常的数据结构和算法中比较少接触,它是一种有序的数据结构,通过建立类似索引的来实现快速的查找,其支持O(L...

  • Redis 源码研究之skiplist

    本文主要记录Redis源码中skiplist数据结构的一些函数实现。 建议阅读: 1、Redis中跳跃表的理论说...

  • redis笔记:跳跃表

    本人博客同步发表,排版更佳 通过在每个节点中维持多个只想其他节点的指针,从而达到快速访问节点的目的。 作为redi...

  • [redis 源码走读] 跳跃表(skiplist)

    跳跃表 张铁蕾的博客将 skiplist 原理和算法复杂度描述得很清楚,具体可以参考。我分享一下自己对部分源码的阅...

  • 4.8-Redis6数据结构之SortedSet类型介绍和跳跃表

    Redis6数据结构之SortedSet类型介绍和跳跃表介绍 简介:Redis6数据结构之SortedSet类型介...

  • redis设计与实现读书笔记(二)

    本文内容为《redis设计与实现》一书学习笔记。本文主要概述五到八章内容。 第5章 跳跃表 跳跃表(skiplis...

  • 跳跃表

    redis中有序集合zset是用跳跃表实现的,所以今天学习了一下跳跃表。本文主要讲redis中跳跃表的实现。 一、...

  • Redis跳跃表

    本文摘抄自redis 源码剖析 跳跃表是一种随机化的数据,跳跃表以有序的方式在层次化的链表中保存元素,效率和平衡树...

  • redis跳跃表

    跳跃表(skiplist)是一种有序数据结果,它通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的...

  • 跳跃表 redis

    为什么选择跳跃表目前经常使用的平衡数据结构有:B树,红黑树,AVL树,Splay Tree, Treep等。想象一...

网友评论

    本文标题:Redis源码阅读笔记(6)-跳跃表

    本文链接:https://www.haomeiwen.com/subject/ujjebftx.html