跳跃表在平常的数据结构和算法中比较少接触,它是一种有序的数据结构,通过建立类似索引的来实现快速的查找,其支持O(LogN)的平均时间复杂度,最坏O(N)的时间复杂度;在大部分情况下,其可以与平衡树相媲美,而且由于实现较平衡树简单,因此常常作为平衡树的替代方案。
设计的主要代码:
redis.h
t_zset.h
跳跃表的定义
我们知道,即使对于一个有序链表,其查询一个元素的时间复杂度为O(N),即使是排序的,也不能通过二分查找来缩短查找的时间。如下图,查找节点15需要比较8个节点。
链表
那么有没有其他优化的算法呢?我们可以利用索引的原理,在链表上提取部分节点建立索引,如下图,只需要比较6个节点。
一级索引
根据这个思路,可以在一级索引之上,再提取部分节点作为二级索引,如下图,只需要比较4个节点
二级索引
在此基础上,再往上建立一层,如下图,只需要比较3个节点。
三级索引以上就是跳跃表的思想,通过增加多层索引,以空间换时间,加快节点的查询。图中元素不多,查找速度的加快没有体现出,如果节点一多,那跳跃表的优势就体现出来了。
跳跃表具有如下性质:
- 以空间换时间,由多层组成;
- 每一层都是有序链表;
- 最后一层包含所有元素;
- 如果节点node出现在layer n出现,那么该节点在layer n之下的链表层一定会出现;
- 每一个节点包含两个指针,一个指向同一层链表的下一个节点,另一个指向下一层的节点。
跳跃表的实现
//跳跃表节点定义
typedef struct zskiplistNode {
robj *obj; //成员对象
double score; //分值 主要按分值来排序,其次才按对象
struct zskiplistNode *backward; //回退指针
struct zskiplistLevel { //层级,按1~32层随机指定
struct zskiplistNode *forward; //层级的前向指着
unsigned int span; //跨度
} level[];
} zskiplistNode;
//管理跳跃表
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level; //最大节点层数
} zskiplist;
Redis的跳跃表由zskiplistNode
和zskiplist
组成,其图像结构如下:
其中zskiplist
用来管理跳跃表,其包含了跳跃表的相关信息:
-
header
:指向跳跃表的表头; -
tail
:指向跳跃表的表尾; -
level
:记录该跳跃表目前的最大层级(表头节点不计算在内); -
length
:记录了该跳跃表的长度。
zskiplistNode
是实际跳跃表的节点,其记录了节点信息:
-
obj
:节点保存的对象; -
score
:节点排序的分值,节点的排序按照该值从小到大排序,如果该值相同,则按照obj的大小来排序; -
backward
:最底层是一个双向链表,用来从表尾向表头遍历节点用,因为每个节点只有一个后退指针,因此每次只能后退一个节点; -
level.forward
:前向指针,用来保存同层下一个节点的地址; -
level.span
记录了同层节点之间的距离span,即在最底层两节点相距多少个节点,用来快速计算节点的rank(即节点所在的index)时使用。指向null的span为0。
相关实现细节
创建和销毁
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
//跳跃表节点创建函数
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); //申请内存
zn->score = score;
zn->obj = obj;
return zn;
}
//创建跳跃表
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); //创建第一列节点,固定32层级,保存用来执行每层的头结点
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { //初始化
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
//释放节点
void zslFreeNode(zskiplistNode *node) {
decrRefCount(node->obj); //减少对象被引用引用数
zfree(node);
}
//释放跳跃表
void zslFree(zskiplist *zsl) {
zskiplistNode *node = zsl->header->level[0].forward, *next;
zfree(zsl->header); //释放第一列32层级的表头结点
while(node) {
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zsl);
}
Redis在创建跳跃表的时候,会创建一个32层的表头节点,该表头节点用来指向各层的开始节点,初始化时,该节点的每一层的forward
都指向NULL
,span
为0。
插入节点
//生成1~32的随机数
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
//插入节点,从高往低计算,从低层往高层插入,从span可以计算出每个节点的rank
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) { //从最高层往下一层一层的插入
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span; //记录在要插入的前节点的排名
x = x->level[i].forward;
}
update[i] = x; //记录每一层插入的前一节点
}
level = zslRandomLevel(); //设置该节点level
if (level > zsl->level) { //level比当前的level还高的
for (i = zsl->level; i < level; i++) { //设置新添层的rank和update
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj); //创建节点
for (i = 0; i < level; i++) { //从低往高处理
x->level[i].forward = update[i]->level[i].forward; //设置每层该节点的forward指针
update[i]->level[i].forward = x; //改变前一节点forward
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); //设置该层本节点span,因为有要插入的位置之前的节点rank信息
update[i]->level[i].span = (rank[0] - rank[i]) + 1; //设置前一节点的span
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
Redis在添加节点的时候,该节点所拥有的层数通过zslRandomLevel
计算得到的(作者注释该随机数满足powerlaw-alike
分布,还没研究,有空研究研究)。
每次插入节点,要更改每一层同层前置节点的forward和span。
删除节点
//删除节点
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
//从底层往高层删
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1; //本节点被删除,所以要减一
update[i]->level[i].forward = x->level[i].forward; //设置forward
} else { //当前层级不包含x节点,只需减一
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--; //看最高层级是否需要减1
zsl->length--;
}
//指定要删除的节点
/* Delete an element with matching score/object from the skiplist. */
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) { //寻找该节点可以插入的前一节点
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward; //还要再次确认是否存在该节点
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
}
return 0; /* not found */
}
删除节点比较简单,首先要寻找每一层(该节点存在的每一层)该节点的前置节点,然后更改每一个同层节点前置节点的forward
和span
。
获取对应节点的rank以及返回获取rank为指定值的节点
//获取值对应的rank
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
return 0;
}
//获取值对应的rank
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
return 0;
}
利用每一层记录的span,可以快速的计算出节点的rank。
删除指定score范围的节点
static int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min);
}
static int zslValueLteMax(double value, zrangespec *spec) {
return spec->maxex ? (value < spec->max) : (value <= spec->max);
}
//删除指定score范围的节点,返回被删除的节点数量
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (range->minex ?
x->level[i].forward->score <= range->min :
x->level[i].forward->score < range->min))
x = x->level[i].forward;
update[i] = x;
}
/* Current node is the last with score < or <= min. */
//找到第一个大于range->min的值
x = x->level[0].forward;
/* Delete nodes while in range. */
//删除直到score>max
while (x &&
(range->maxex ? x->score < range->max : x->score <= range->max))
{
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
x = next;
}
return removed;
}
首先要找到第一个符合条件的节点。
总结
跳跃表的名字听起来会觉得这个算法很复杂,但实际懂了以后,会发现跳跃表其实很简单。
Redis的跳跃表由以下几个特点:
- 跳跃表节点的层数是1~32的随机数;
- 同一个跳跃表中,节点可以包含相同的值,但每个节点的成员对象必须是唯一的;
- 跳跃表是先按score进行排序,然后在根据成员对象的大小进行排序;
- 跳跃表查找的平均时间复杂度为O(LogN),最坏时间复杂度是O(N)。
跳跃表API
function | description | time complexity |
---|---|---|
zslCreate | 创建一个跳跃表 | O(1) |
zslFree | 释放一个跳跃表 | O(N) |
zslInsert | 插入一个节点 | 平均O(LogN),最坏O(LogN) |
zslDelete | 删除一个节点 | 平均O(LogN),最坏O(LogN) |
zslFirstInRange | 获取满足分值范围的第一个节点 | 平均O(LogN),最坏O(LogN) |
zslLastInRange | 获取满足分值范围的最后一个节点 | 平均O(LogN),最坏O(LogN) |
zzlGetScore | 获取节点score | 平均O(LogN),最坏O(LogN) |
zslGetRank | 获取节点的index | 平均O(LogN),最坏O(LogN) |
zslGetElementByRank | 获取指定index的节点 | 平均O(LogN),最坏O(LogN) |
zslIsInRange | 跳跃表是否在指定score范围之内 | 通过表头表尾,O(1) |
zslDeleteRangeByScore | 删除指定score范围之内的节点 | 平均O(N),N为被删节点数量 |
zslDeleteRangeByRank | 删除指定rank范围之内的节点 | 平均O(N),N为被删节点数量 |
网友评论