Redis深度历险-跳表
跳表是一个比较经典的数据结构,非常有用但又不像红黑树那么复杂,非常值得学习;在
Ridis
中除了提供set
这种无序集合,还提供了zset
这种有序集合,有序集合就是使用跳表实现的
跳表的意义
跳表是一种结合了二分法和链表特性的数据结构,使用链表的方式提供快速的插入和删除,使用二分法的思想快速定位数据。
链表
list.jpg链表是最基础的数据结构,其特性有
- 在任意位置快速的插入和删除
- 不允许随机访问数据
跳表
skiplist.png对于一个有序链表来说,在进行插入数据时必须从头到尾进行遍历以查找插入的位置,这个时间复杂度时
O(n)
级别的,对于Redis
是不可接受的,而跳表将查找这个时间复杂度降低为O(logn)
跳表是基于链表实现的,链表无法实现二分法的原因就是无法快速定位中间节点;跳表的实现原理就是:
- 维护一个多层次的链表
- 底层包含所有数据,并保持有序
- 上一层链表是下一层的索引
- 在查找时从最上层开始逐层往下查找,理想情况每次都能排除底层一半数据
跳表的问题
假设某一层有[a......b......c]
很多个数据,在上一层为这一层建立索引[a,b,c]
,最理想的情况是b
刚好位于a
和c
的中间,那么在[a,b,c]
中进行比较的时候能够一次性排出下一层一半的数据。
这样建立起来的就类似于一颗二叉树了,但是实际实现非常困难;Redis采用的是随机的方式。
Redis中的跳表
数据结构
typedef struct zskiplistNode {
sds ele; //节点数据
double score; //节点权值
struct zskiplistNode *backward; //前一个节点
struct zskiplistLevel {
struct zskiplistNode *forward; //下一个节点
unsigned long span; //到下一个节点的距离
} level[];
} zskiplistNode;
- 一个节点存在于多行中,并保存了自身在每一行中的信息
- 在
level
字段中的forward
存储来了该节点在每一层的下一个节点 - 在
level
字段中的span
存储来该节点到每一层下一个节点的距离
typedef struct zskiplist {
struct zskiplistNode *header, *tail; //链表头尾节点
unsigned long length; //链表总长度
int level; //此链表在第几层
} zskiplist;
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
zset
中还使用了一个字典,用来根据字符串查找权值,保证复杂度为o(1)
级别
创建跳表
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
//创建一个虚拟头节点,此节点存储在所有层中
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
在创建跳表的时候会创建一个虚拟头节点,同时存在于32层中。
插入
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
//按跳表的规则从上往下进行查找,优先比较score其次比较字符串
for (i = zsl->level-1; i >= 0; i--) {
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
//update存储的是新节点插入位置,插入到每一层的update[i]后
//rank则表示的是update节点的跨度
//如果新节点的高度大于现有跳表的最大高度,那么高出来的那一截新节点的前驱节点就是虚拟节点header
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
//将节点插入到update[i]后
for (i = 0; i < level; i++) {
//插入元素
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
//更新跨度
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
//当新节点的高度小于现有跳表的高度时,需要更新一下高出来的那一截前驱节点的跨度
//上一段循环代码只处理到level了
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
//更新一下相关链表的指向
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
这里的跨度问题会导致整个流程变得复杂,不过了解span
是什么意思就可以了,不必过分深究实现细节;
重要的是这里使用的zslCreateNode(level,score,ele)
随机算法
层数随机算法
每隔节点层数的选择是非常关键的,如果选的不好可能导致跳表的退化
int zslRandomLevel(void) {
int level = 1;
//ZSKIPLIST_P等于0.25
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
这里使用了很简单的算法(random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)
的概率是1/4
,即意味着从下一层到上一层的概率约为1/4
,类似于一颗四叉树
范围删除元素
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long traversed = 0, removed = 0;
int i;
//根据跨度快速定位到start所在的节点
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) < start) {
traversed += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
//traversed存储的是header到节点x的距离
//update存储的是每一层中最大一个小于start的节点,这些节点在删除后需要更新跨度
//在底层从一个一个删除x后的元素
traversed++;
x = x->level[0].forward;
while (x && traversed <= end) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
dictDelete(dict,x->ele);
zslFreeNode(x);
removed++;
traversed++;
x = next;
}
return removed;
}
此函数删除的是整个集合第start
到第end
个元素,这个时候span
字段就显示出作用来了;它其实是用来计算节点在集合中的排位的,因为当从上往下寻找元素的时候实际是不知道跨越了多少元素的
网友评论