目录
引子
跳跃列表(Skip List),简称跳表。在前面我写过的一篇讲解HBase MemStore的文章中曾经提到,MemStore(其实也包括BlockCache)的基础数据结构都是JUC包提供的并发跳表ConcurrentSkipListMap。当时我就立flag说跳表是个有意思的数据结构,会写文章专门分析它,那么今天就来兑现承诺吧。
认识跳表
跳表的提出
跳表首先由William Pugh在其1990年的论文《Skip lists: A probabilistic alternative to balanced trees》中提出。由该论文的题目可以知道两点:
- 跳表是概率型数据结构。
- 跳表是用来替代平衡树的数据结构。准确来说,是用来替代自平衡二叉查找树(self-balancing BST)的结构。
看官可能会觉得这两点与之前文章中讲过的布隆过滤器有些相似,但实际上它们的原理和用途还是很不相同的。
由二叉树回归链表
考虑在有序序列中查找某个特定元素的情境:
- 如果该序列用支持随机访问的线性结构(数组)存储,那么我们很容易地用二分查找来做。
- 但是考虑到增删效率和内存扩展性,很多时候要用不支持随机访问的线性结构(链表)存储,就只能从头遍历、逐个比对。
- 作为折衷,如果用二叉树结构(BST)存储,就可以不靠随机访问特性进行二分查找了。
我们知道,普通BST插入元素越有序效率越低,最坏情况会退化回链表。因此很多大佬提出了自平衡BST结构,使其在任何情况下的增删查操作都保持O(logn)的时间复杂度。自平衡BST的代表就是AVL树、Splay树、2-3树及其衍生出的红黑树。如果推广之,不限于二叉树的话,我们耳熟能详的B树和B+树也属此类,常用于文件系统和数据库。
自平衡BST显然很香,但是它仍然有一个不那么香的点:树的自平衡过程比较复杂,实现起来麻烦,在高并发的情况下,加锁也会带来可观的overhead。如AVL树需要LL、LR、RL、RR四种旋转操作保持平衡,红黑树则需要左旋、右旋和节点变色三种操作。下面的动图展示的就是AVL树在插入元素时的平衡过程。
那么,有没有简单点的、与自平衡BST效率相近的实现方法呢?答案就是跳表,并且它简单很多,下面就来看一看。
设计思想与查找流程
跳表就是如同下图一样的许多链表的集合。
跳表具有如下的性质:
- 由多层组成,最底层为第1层,次底层为第2层,以此类推。层数不会超过一个固定的最大值Lmax。
- 每层都是一个有头节点的有序链表,第1层的链表包含跳表中的所有元素。
- 如果某个元素在第k层出现,那么在第1~k-1层也必定都会出现,但会按一定的概率p在第k+1层出现。
很显然这是一种空间换时间的思路,与索引异曲同工。第k层可以视为第k-1级索引,用来加速查找。为了避免占用空间过多,第1层之上都不存储实际数据,只有指针(包含指向同层下一个元素的指针与同一个元素下层的指针)。
当查找元素时,会从最顶层链表的头节点开始遍历。以升序跳表为例,如果当前节点的下一个节点包含的值比目标元素值小,则继续向右查找。如果下一个节点的值比目标值大,就转到当前层的下一层去查找。重复向右和向下的操作,直到找到与目标值相等的元素为止。下图中的蓝色箭头标记出了查找元素21的步骤。
通过图示,我们也可以更加明白“跳表”这个名称的含义,因为查找过程确实是跳跃的,比线性查找省时。若要查找在高层存在的元素(如25),步数就会变得更少。当数据量越来越大时,这种结构的优势就更加明显了。
插入元素的概率性
前文已经说过,跳表第k层的元素会按一定的概率p在第k+1层出现,这种概率性就是在插入过程中实现的。
当按照上述查找流程找到新元素的插入位置之后,首先将其插入第1层。至于是否要插入第2,3,4...层,就需要用随机数等方法来确定。最通用的实现方法描述如下。
int randomizeLevel(double p, int lmax) {
int level = 1;
Random random = new Random();
while (random.nextDouble() < p && level < lmax) {
level++;
}
return level;
}
得到层数k之后,就将新元素插入跳表的第1~k层。由上面的逻辑可知,随着层数的增加,元素被插入高层的概率会指数级下降。
下面的动图示出以p=1/2概率在跳表中插入元素的过程。这种方法也被叫做“抛钢镚儿”(coin flip),直到抛出正面/反面就停止。
相对于插入而言,删除元素没有这么多弯弯绕,基本上就是正常的单链表删除逻辑,因此不再展开。
复杂度分析
只要我们找出平均查找长度,自然就可以知道跳表的平均时间复杂度了。为了便于分析,我们将查找元素的流程反过来,从目的元素向头节点看,也就是变成向上、向左的操作。由于随机层数的计算是相互独立的,因此这样做并无妨。
假设从层数为l的某节点x出发,距离最顶层有k层。如果x有第l+1层的指针就向上走,没有的话就向左走。它们的概率分别为p与1-p。借用William Pugh画好的图如下。
设C[k]为向上走k层要走过的路径长度期望值,就有:C[0] = 0 ; C[k] = (1-p)·(1+C[k]) + p·(1+C[k-1]),最终可得:C[k] = k/p。
接下来设跳表中包含n个元素,那么第1层就有n个元素,第2层平均有np个,第3层平均有np2个……依次类推。可知,跳表层数的期望值为log1/pn。结合上面得出的结论,跳表的平均查找长度就是:[log1/pn - 1] / p。
也就是说,跳表增删查的平均时间复杂度为O(logn),达到了平衡BST的水准。当然它毕竟是不稳定的,如果运气奇差无比,总是无法建立起层级结构的话,最坏时间复杂度仍然会退化到O(n)级别,但这个概率是非常微小的了。
由上面的分析我们还可以知道,跳表中单个节点的层数为1的概率为1-p,层数为2的概率为p(1-p),层数为3的概率为p2(1-p)……依次类推。所以,单个节点层数的期望为:(1-p) · ∑[k=1→+∞](kpk-1) = 1/(1-p)。这也是单个节点中指针数量的平均值,列表如下。
可见,p越小,节点的空间开销也就越小,但(正规化的)查找长度会相应越大。
Redis的跳表实现
跳表在很多框架中都有广泛的应用,除Java并发包及HBase之外,比较著名的是Redis和leveldb。之前一直读Java系的源码,有些腻烦了,并且我最近正好在研究一些Redis调优方面的事情,就干脆拿Redis 4.0.14的源码来讲讲吧。
从zset到zskiplist
跳表在Redis中称为zskiplist,是其提供的有序集合(sorted set/zset)类型底层的数据结构之一。zset的定义如下,位于server.h中。
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
可见,除了zskiplist之外,zset还使用了KV哈希表dict。Redis中有序集合的默认实现其实是更为普遍的ziplist(压缩双链表),但在redis.conf中有两个参数可以控制它转为zset实现。
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
也就是说,当有序集合中的元素数超过zset-max-ziplist-entries
时,或其中任意一个元素的数据长度超过zset-max-ziplist-value
时,就由ziplist自动转化为zset。具体逻辑参见t_zset.c中的zsetConvert()函数,不再赘述。
扯远了,回来看看zskiplist,它的定义就在zset上面。
typedef struct zskiplistNode {
robj *obj;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
zskiplist的节点定义是结构体zskiplistNode,其中有以下字段。
- obj:存放该节点的数据。
- score:数据对应的分数值,zset通过分数对数据升序排列。
- backward:指向链表上一个节点的指针,即后向指针。
- level[]:结构体zskiplistLevel的数组,表示跳表中的一层。每层又存放有两个字段:
- forward是指向链表下一个节点的指针,即前向指针。
- span表示这个前向指针跳跃过了多少个节点(不包括当前节点)。
zskiplist就是跳表本身,其中有以下字段。
- header、tail:头指针和尾指针。
- length:跳表的长度,不包括头指针。
- level:跳表的层数。
下图示出一个length=3,level=5的zskiplist。
可见,zskiplist的第1层是个双向链表,其他层仍然是单向链表,这样做是为了方便可能的逆向获取数据的需求。
另外,节点中还会保存前向指针跳过的节点数span,这是因为zset本身支持基于排名的操作,如zrevrank
指令(由数据查询排名)、zrevrange
指令(由排名范围查询数据)等。如果有span值的话,就可以方便地在查找过程中累积出排名了。
以上是zskiplist相对于前述的传统跳表的两点不同,并且都给我们带来了便利。下面我们来继续读代码,看看它的部分具体操作。
创建zskiplist
zslCreate()函数位于t_zset.c中。
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
并不难懂,唯一需要注意的是常量ZSKIPLIST_MAXLEVEL
,它定义了zskiplist的最大层数,值为32,这也是上节图中的节点最高只到L32的原因。
向zskiplist插入元素
插入元素靠的是zslInsert()函数,有点长。
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
该方法涉及到的细节很多,其大致执行流程如下:
- 按照前面讲过的查找流程,找到合适的插入位置。注意zset允许分数score相同,这时会根据节点数据obj的字典序来排序。
- 调用zslRandomLevel()方法,随机出要插入的节点的层数。
- 调用zslCreateNode()方法,根据层数level、分数score和数据obj创建出新节点。
- 每层遍历,修改新节点以及其前后节点的前向指针forward和跳跃长度span,也要更新最底层的后向指针backward。
其中维护了两个数组update和rank。update数组用来记录每一层的最后一个分数小于待插入score的节点,也就是插入位置。rank数组用来记录上述插入位置的上一个节点的排名,以便于最后更新span值。
随机计算层数的zslRandomLevel()方法如下。
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
注意p值由ZSKIPLIST_P
常量定义,值为0.25,即被插入到高层的概率为1/4。
下图示出插入元素的流程。
查询元素排名/获取排名对应元素
从zslGetRank()和zslGetElementByRank()这两个函数可以更明显地看出通过累积span字段的值获取到排名的操作。代码本身比较容易理解,如下所示。
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
return 0;
}
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}
Redis作者对采用跳表的解释
比起我多费口舌,不如来看看Salvatore Sanfillipo(@antirez)本人说的话。他多年之前在Hacker News的一篇帖子上解释了自己为什么要在Redis中用跳表而不是树,原文如下,浅显易懂,就不翻译了:
- They are not very memory intensive. It's up to you basically. Changing parameters about the probability of a node to have a given number of levels will make then less memory intensive than btrees.
- A sorted set is often target of many ZRANGE or ZREVRANGE operations, that is, traversing the skip list as a linked list. With this operation the cache locality of skip lists is at least as good as with other kind of balanced trees.
- They are simpler to implement, debug, and so forth. For instance thanks to the skip list simplicity I received a patch (already in Redis master) with augmented skip lists implementing ZRANK in O(log(N)). It required little changes to the code.
The End
谢谢食用~
网友评论