美文网首页
看!源码Redis之skiplist(跳跃表)

看!源码Redis之skiplist(跳跃表)

作者: starskye | 来源:发表于2020-03-26 17:37 被阅读0次

skiplist是跳跃表效率堪比红黑树zset便使用的此结构

//最大支持层数为64
#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
//随机增加level的概率计算一般是1/2或者1/4此处使用的是1/4
#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
//存储字符串数据使用
typedef char* sds;
//跳表的Node节点
typedef struct zskiplistNode {
    //跳表的数据存储
    sds ele;
    //因为跳表是在有序链表上进行操作的所以需要score进行排序
    double score;
    //后退指针用于链表从后向前遍历使用
    struct zskiplistNode *backward;
    //当前node的层级
    struct zskiplistLevel {
        //当前层级节点跳转一个相同层级的指针,进步指针
        //1     >       5   3层
        //1  >  3   >   5   2层
        //1  2  3   4   5   1层
        //按照上方的案例 如果当前node是3层级是2 那么forward指向的则是值为5的节点
        struct zskiplistNode *forward;
        //当前node到下一个node的跨越数 还是上方的案例 node3 到node5 span为 2
        unsigned long span;
    } level[];
} zskiplistNode;
//跳跃表的结构
typedef struct zskiplist {
    //其中包含指向header的头结点 还有指向尾部的tail尾节点指针
    struct zskiplistNode *header, *tail;
    //当前链表的个数 不算层级因为 跳跃表总归需要一个完整的链表作支撑 此处length则代表完整链表的长度
    unsigned long length;
    //已使用的最大层级 跳跃表之所以效率高就取决于层级越高跨域可能越大(因为跨域层级是需要随机的也有可能相邻但是概率小)
    //所以从顶部开始遍历时最好的选择能够最快的将区间范围缩小
    int level;
} zskiplist;

int srandInt = 0;

//随机产生十分需要提升等级
int zslRandomLevel(void) {

    //默认一级最底层的链表则为一级
    int level = 1;
    //random采用的是glibc的random实现 支持重入 说白了就是在srand的调用使用了互斥锁
    //同时在每次计算完随机数后在设置一个新的种子再次随机计算
    //随机的结果与65535的二进制做与计算 结果如果小于 16383.75 则level+1 直到随机结果大于16383.75为止
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    //如果当前随机的层数大于了最大限制则返回最大限制64
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
//创建一个node节点 传入需要创建的层级数因为需要创建level柔性数组
//score存储当前数据的评分 ele 具体数据值
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    //分配当前的node空间 sizeof(*zn)代表着node结构体的大小 level*sizeof(struct zskiplistLevel)是柔性数组的大小
    //相加则为总大小,zmalloc内部实现是根据在分配的内存的基础上增加了 size_t的大小 在分配的头部用于存储当前空间的大小
    //有点绕 比如 sizeof(size_t) = 4 zmalloc(8) 最终需要分配12个byte 而 zmalloc返回的结果是从 分配首地址+4的偏移地址
    //而分配的首地址则存储着数字8用于存储使用的内存空间
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    //赋值操作
    zn->score = score;
    zn->ele = ele;
    return zn;
}
//创建跳跃表
zskiplist *zslCreate(void) {
    //变量使用
    int j;
    //跳跃表的指针
    zskiplist *zsl;
    //首先分配跳跃表的内存空间
    zsl = zmalloc(sizeof(*zsl));
    //设置当前最高层级为1层 因为是刚创建默认的一层
    zsl->level = 1;
    //链表的个数为0
    zsl->length = 0;
    //初始化头部节点 注意头部节点的创建为maxlevel = 64 说明跳跃表的头节点是不参与存储的 初始化表的时候就需要初始化所有层级
    //score为0 值为null
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    //既然header不参与存储那么他的层级数据也需要重置为初始值
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        //设置当前层级的next节点为null 跨域数为0
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    //设置头结点的回退指针为null
    zsl->header->backward = NULL;
    //因为没有数据所以尾节点为null
    zsl->tail = NULL;
    return zsl;
}
//跳表的数据插入 zsl需要插入数据的目标表指针
//score 插入node的分值 ele插入node的数据
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    //创建一个update数组这里长度为64 因为最坏的打算是将64层数据都遍历完才找到位置那么就需要更新之前所有层级的连接数据
    //forward与span需要更新 x 遍历level的forward使用
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    //计算span使用如果当前插入会影响到之前的level数据则需要重新计算span
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    //遍历使用i 随机的当前node的层级数据
    int i, level;
    //判断当前score是否是一个合格的数字如果是无效数字则exit停止当前程序
//    serverAssert(!isnan(score));
    //获取当前头用户遍历
    x = zsl->header;
    //遍历链表的层级由高到低在计算时以0层算在存储level时以1开始计算 所以此处-1
    for (i = zsl->level-1; i >= 0; i--) {
        //判断当前i是否等于当前层级 如果等于则返回0 因为并没有进行span跨越
        //否则返回上一层的span值 因为层级遍历时高到低所以i+1
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        //进行level的横向查找 首先判断当前level的next是否有值如果没有则不循环
        //否则查看当前level是否小于score则进入while循环forward
        //不小于则判断是否相等如果相等则判断sdscmp判断两个sds是否相等如果相等则不在进行level遍历
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            //记录当前level所在的span总数
            rank[i] += x->level[i].span;
            //设置当前层的forward给x
            x = x->level[i].forward;
        }
        //遍历结束后记录下当前层的最后一个值 这样到时候会更新
        update[i] = x;
    }
    //随机获取一个等级
    level = zslRandomLevel();
    //如果当前随机的等级大于了当前最大的等级
    if (level > zsl->level) {
        //则遍历当前最高等级到刚随机的等级数
        for (i = zsl->level; i < level; i++) {
            //设置当前为0因为下方遍历会有使用到因为是初始化所以不应该参与计算
            rank[i] = 0;
            //将当前的头设置为需要更新的node
            update[i] = zsl->header;
            //设置当前header的span是直接指到尾部进行初始化下方在继续计算
            update[i]->level[i].span = zsl->length;
        }
        //设置当前表的最高高度为当前随机的level
        zsl->level = level;
    }
    //创建一个节点并且创建level柔性数组
    x = zslCreateNode(level,score,ele);
    //遍历初始化柔性数组
    for (i = 0; i < level; i++) {
        //首先获取第一层的前一个找到的目标节点将他的forward赋值给当前x
        x->level[i].forward = update[i]->level[i].forward;
        //然后再将x设置为前一个节点的forward 这样形成了入队操作 需要注意的是此处遍历时从0开始
        //所以先入队的一定是完整数据的队列
        update[i]->level[i].forward = x;

        //更新当前层级的下的所有有关的span值
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    //上方更新了所有关联的span值但是其他未关联的也需要增加一个span 所以此处将高于level的update进行了更新
    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    //设置回退指针 如果update[0]是header那么直接指向null否则指向前一个节点做为 backward
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    //如果当前forward有值那么说明不是最后一个节点则设置他的下一个节点的回退节点backward为当前节点
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        //否则设置链表的尾节点为当前x
        zsl->tail = x;
    //节点数++
    zsl->length++;
    return x;
}

虽然上方每行都有注释但是有些地方还是很绕下面会有一段文字是假设在当前skiplist中插入数据的执行流程

//1     >       5   3层
//1  >  3   >   5   2层  
//1  2  3   4   5   1层  
x = zsl->header

i = 2 rank[i] = 0 i == level-1 所以为0
update[2] = x

i = 1 rank[1] = 0 i != level-1 但是rank[i+1] = rank[2] = 0
因为都是使用x->level->forwrad所以当前while判断使用的是2:3
2:3 (满足while条件 所以 当前rank[1] = 2 x = 2:3) 2:5不符合所以退出循环
x = 2:3 rank[1] = 2 update[1] = 2:3

i = 0 rank[0] = 2 i != level-1 但是rank[1+1] = rank[1] = 2
当前x=2:3所以forward为 4 满足条件 所以
rank[0] = 2 x = 1:3 update[0] = 1:3

随机当前等级为4
则遍历3 - 4级需要更新的数据
i = 3
rank[3] = 0
update[3] = header
update[3]->level[3].span = 5
zsl->level = 5

x = newNode
遍历当前节点所有层级
i -> 4
i = 0
update[0]->level[0].forward 是上方计算的1:3
将他插入到x的forward中 即 x->level[0].forward = update[0]->level[0].forward
x->level[0].forward = 1:3 = update[0]->level[0].forward
update[0]->level[0].forward = x = 1:3.5
这样就形成了链表的入列 
结果
1   ->  2   ->  3   ->  3.5 ->  4   ->  5
计算当前的span
x->level[0].span = update[0]->level[0].span - (rank[0] - rank[i])  i = 0
x->level[0].span = 1 - (2 - 2) span = 1
update[0]->level[0].span = (rank[0] - rank[i]) +1 = 1

i = 1
update[1]->level[1].forward 是上方计算的2:3
将他插入到x的forward中 即 x->level[1].forward = update[1]->level[1].forward
x->level[1].forward = 2:5 = update[1]->level[1].forward
update[1]->level[1].forward = x = 2:3.5
结果为
1   ->  3 -> 3.5 -> 5
计算当前span
x->level[1].span = update[1]->level[1].span - (rank[0] - rank[i])  i = 1
x->level[1].span = 2 - (2 - 2) = 2
update[1]->level[1].span = (rank[0] - rank[i]) +1 = 1

i = 2
update[2]->level[2].forward 是上方计算的x header
将他插入到x的forward中 即 x->level[2].forward = update[2]->level[2].forward
x->level[2].forward = header->level[2].forward = 3:5
header->level[2].forward = x = 3:3.5
结果为
1   ->  3.5 ->  5
计算当前span
x->level[2].span = update[2]->level[2].span - (rank[0] - rank[i])  i = 2
x->level[2].span = 4 - (2 - 0) = 2
update[2]->level[2].span = (rank[0] - rank[i]) +1 = 3

i = 3
也是需要给header新增层级之前初始化过update[3] = zsl->header;
将他插入到x的forward中 即 x->level[3].forward = update[3]->level[3].forward
x->level[3].forward = header->level[3].forward = NULL因为header当前是最新层没有指向
header->level[2].forward = x = 4:3.5
结果为
header -> 3.5 此处没有1
计算当前span
x->level[3].span = update[3]->level[3].span - (rank[0] - rank[i])  i = 3
x->level[3].span = 4 - (2 - 0) = 2
update[3]->level[3].span = (rank[0] - rank[i]) +1 = 3

//此处为手写链表的大概流程 如果对上方描述有理解的读者不防自己写写加深印象
1、创建链表 管理元素数层级数和链表头尾
2、创建node节点 回退指针指向前一个元素用于后向前遍历 score 比较分数 ele数据 
    level[]数组存储当前node的层级元素 其中存有 span下一个节点的跨度 forward下一个节点的指针
3、插入数据
    3.1、查找最后一个满足查询条件的node
    3.2、随机生成等级 如果等级超过当前的level那么需要初始化新增高度的level 方便进行后续更新
    3.3、创建node节点
    3.4、处理0-随机level之间所有节点的入队和span计算
    3.5、如果随机等级小于当前的level那么需要处理随机level-当前level的等级
    3.6、回退指针和levelforward的回退指针设置

讲解到此结束 如果参与自己手写的读者可以看看下方源码是笔者根据上方流程写的。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>
#include <string.h>
#define LIST_LEVEL_MAX 64
typedef struct _Node{
    struct _Node * backward;
    int score;
    char * ele;
    struct _NodeList{
        struct _Node * forward;
        int span;
    }level[];
}Node;

typedef struct{
    struct _Node *header,*tail;
    int level;
    int length;
}SkipList;

Node* createNode(int level,int score,char *ele){
    Node* node = malloc(sizeof(Node)+level* sizeof(struct _NodeList));
    node->score = score;
    node->ele = ele;
    return node;
}

SkipList * createList(){
    SkipList *list = malloc(sizeof(SkipList));
    list->header = createNode(LIST_LEVEL_MAX,0,NULL);
    list->level = 1;
    list->length = 0;
    for (int i = 0; i < LIST_LEVEL_MAX; ++i) {
        list->header->level[i].forward = NULL;
        list->header->level[i].span = 0;
    }
    return list;
}

int randomLevel(){
    _sleep(1000);
    int level = 1,temp = time(NULL);
    srand(temp);
    while ((temp = rand())%4 == 0){
        srand(temp);
        level++;
    }
    return level > LIST_LEVEL_MAX ? LIST_LEVEL_MAX:level ;
}

void listQueue(SkipList *list,int score,char* ele){
    Node* update[LIST_LEVEL_MAX],*x;
    int rank[LIST_LEVEL_MAX];

    x = list->header;
    int maxLevelIndex = (list->level-1);
    for (int i = maxLevelIndex; i >= 0; i--) {
        rank[i] = i == maxLevelIndex ? 0 : rank[i+1];
        while(x->level[i].forward&&
                (x->level[i].forward->score < score ||
                x->level[i].forward->score == score && strcmp(x->ele,ele) != 0)){
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    int level = randomLevel();

   if (level > list->level){
       for (int l = list->level; l <  level; ++l) {
           rank[l] = 0;
           update[l] = list->header;
           update[l]->level[l].span = 0;
       }
       list->level = level;
   }

    x = createNode(level,score,ele);
    for (int j = 0; j < level; ++j) {
        x->level[j].forward = update[j]->level[j].forward;
        update[j]->level[j].forward = x;

        x->level[j].span = update[j]->level[j].span - (rank[0] - rank[j]);
        update[j]->level[j].span = (rank[0]-rank[j]) + 1;
    }

    for (int k = level; k < list->level ; ++k) {
        update[k]->level[k].span++;
    }

    x->backward = (update[0] == list->header)? NULL:update[0];

    if (x->level[0].forward){
        x->level[0].forward->backward = x;
    } else {
        list->tail = x;
    }
    list->length++;
}

int main(){
    SkipList* list = createList();
    listQueue(list,1,"111");
    listQueue(list,2,"222");
    listQueue(list,3,"333");
    listQueue(list,4,"444");
    listQueue(list,5,"555");
    listQueue(list,6,"666");
    listQueue(list,7,"777");
    listQueue(list,8,"888");
    for (int i = (list->level-1); i >=0 ;i--) {
        Node* node = list->header->level[i].forward;
        printf("header\t->");
        for (int k = 1; k < list->header->level[i].span; ++k) {
            printf("   \t->\t");
        }
        while (node!=NULL){
            printf("%s\t->\t",node->ele);
            for (int j = 1; j < node->level[i].span; ++j) {
                printf("   \t->\t");
            }
            node = node->level[i].forward;
        }
        printf("\n");
    }
    return 1;
};

值得一看的便是最后的打印, 这里随机函数横简单而且不合理如果有算法大佬可以在下方留言

相关文章

  • Redis 源码研究之skiplist

    本文主要记录Redis源码中skiplist数据结构的一些函数实现。 建议阅读: 1、Redis中跳跃表的理论说...

  • [redis 源码走读] 跳跃表(skiplist)

    跳跃表 张铁蕾的博客将 skiplist 原理和算法复杂度描述得很清楚,具体可以参考。我分享一下自己对部分源码的阅...

  • Redis 跳跃表(skiplist)

    Redis基础类型中的有序集合、集群节点的内部数据结构用到了跳跃表(skiplist)。 5.1 跳跃表的实现 图...

  • redis数据结构

    redis数据结构列表:简单动态字符串(sds)链表(list)字典(dict)跳跃表(skiplist)整数集合...

  • 跳跃表(skiplist)

    鸣谢: https://blog.csdn.net/xiaolei1982/article/details/507...

  • SkipList(跳跃表)

    简介   跳跃表是一种单链表形式的链式结构,不同于一般的链式结构其为多层链式结构。正因为这种多层结构从而相比于单式...

  • 「Redis设计与实现」跳跃表篇

    跳跃表简介 我们先抛开redis,单独了解下跳越表 skiplist本质上也是一种查找结构,用于解决算法中的查找问...

  • skiplist 跳跃表分析

    跳表(skip List)是一种随机化的数据结构,基于并联的链表,实现简单,插入、删除、查找的复杂度均为O(log...

  • 科普跳跃表-SkipList

    什么是跳表 跳表是一种有序的数据结构,它通过在每个节点中维持多个指向其它节点的指针,从而达到快速访问的目的。 核心...

  • 跳跃表skiplist原理

    github地址:https://github.com/arkulo56/thought/blob/master/...

网友评论

      本文标题:看!源码Redis之skiplist(跳跃表)

      本文链接:https://www.haomeiwen.com/subject/erkruhtx.html