美文网首页
跳跃表详解

跳跃表详解

作者: _假行僧_ | 来源:发表于2020-09-28 14:07 被阅读0次

    概述

    跳跃表是一个随机化的数据结构,可以看做是二叉树的一个变种,它在性能上和红黑树,AVL树不想上下。目前在Reids和lucene的倒排索引文件中都有应用(后文会介绍跳跃表在这两种情况下的应用)。

    原理

    跳表的原理非常简单,跳表其实就是一种可以进行二分查找的有序链表。跳表的数据结构模型如图:


    跳跃表.png

    可以看到,跳表在原有的有序链表上面增加了多级索引,通过索引来实现快速查找。首先在最高级索引上查找最后一个小于当前查找元素的位置,然后再跳到次高级索引继续查找,直到跳到最底层为止,这时候以及十分接近要查找的元素的位置了(如果查找元素存在的话)。由于根据索引可以一次跳过多个元素,所以跳查找的查找速度也就变快了

    在有n个元素的普通有序链表里查询一个数据最坏情况下需要比较n次,因此时间复杂度为O(n)。如果在链中部节点加一个指针,则比较次数可以减少到n/2+1。搜索时,首先将欲搜索元素与中间元素进行比较。如果欲搜索的元素较小,则仅需搜索链表的左半部分,否则,只要在链表右半部分进行比较即可。由此可见,跳跃表的查询的时间复杂度为O(logn)。插入,删除也是寻找合适的位置的操作,因此时间复杂度均为O(logn),最坏情况下时间复杂度为O(n)。

    层的分配

    每一次插入一个新结点时,最好的做法就是根据当前表的结构得到一个合适的层数,插入后可以让跳跃表尽量接近理想的结构,但这在实现上会非常困难。Pugh 的论文中提出的方法是根据概率随机为新结点生成一个层数,具体的算法如下:
    a.给定一个概率 p(p 小于 1),产生一个 [0,1) 之间的随机数;
    b.如果这个随机数小于 p,则层数加 1;
    c.重复以上动作,直到随机数大于概率 p(或层数大于程序给定的最大层数限制)。

    完整代码:

    #include <iostream>
    #include <cstdlib>
    #include <cstring>
    #define P 0.25
    #define MAX_LEVEL 32
    using namespace std;
    struct Node
    {
        int key;
        Node ** forward;
        Node(int key = 0, int level = MAX_LEVEL)
        {
            this->key = key;
            forward = new Node*[level];
            memset(forward, 0, level * sizeof(Node*));
        }
    };
    
    class SkipList
    {
    private:
        Node * header;
        int level;
    private:
        int random_level();
    public:
        SkipList();
        ~SkipList();
        bool insert(int key);
        bool find(int key);
        bool erase(int key);
        void print();
    };
    //确定层数
    int SkipList::random_level()
    {
        int level = 1;
        while ((rand() & 0xffff) < (P * 0xffff) && level < MAX_LEVEL)
        {
            level++;
        }
        return level;
    }
    
    SkipList::SkipList()
    {
        header = new Node;
        level = 0;
    }
    
    SkipList::~SkipList()
    {
        Node * cur = header;
        Node * next = nullptr;
        while (cur)
        {
            next = cur->forward[0];
            delete cur;
            cur = next;
        }
        header = nullptr;
    }
    
    bool SkipList::insert(int key)
    {
        Node * node = header;
        Node * update[MAX_LEVEL];
        memset(update, 0, MAX_LEVEL * sizeof(Node*));
        for (int i = level - 1; i >= 0; i--)
        {
            while (node->forward[i] && node->forward[i]->key < key)
            {
                node = node->forward[i];
            }
            update[i] = node;
        }
        node = node->forward[0];
        if (node == nullptr || node->key != key)
        {
            int new_level = random_level();
            if (new_level > level)
            {
                for (int i = level; i < new_level; i++)
                {
                    update[i] = header;
                }
                level = new_level;
            }
            Node * new_node = new Node(key, new_level);
            for (int i = 0; i < new_level; i++)
            {
                new_node->forward[i] = update[i]->forward[i];
                update[i]->forward[i] = new_node;
            }
            return true;
        }
        return false;
    }
    
    bool SkipList::find(int key)
    {
        Node * node = header;
        for (int i = level - 1; i >= 0; i--)
        {
            while (node->forward[i] && node->forward[i]->key <= key)
            {
                node = node->forward[i];
            }
            if (node->key == key)
                return true;
        }
        return false;
    }
    
    bool SkipList::erase(int key)
    {
        Node * node = header;
        Node * update[MAX_LEVEL];
        fill(update, update + MAX_LEVEL, nullptr);
        for (int i = level - 1; i >= 0; i--)
        {
            while (node->forward[i] && node->forward[i]->key < key)
            {
                node = node->forward[i];
            }
            update[i] = node;
        }
        node = node->forward[0];
        if (node && node->key == key)
        {
            for (int i = 0; i < level; i++)
            {
                if (update[i]->forward[i] == node)
                {
                    update[i]->forward[i] = node->forward[i];
                }
            }
    
            delete node;
            for (int i = level - 1; i >= 0; i--)
            {
                if (header->forward[i] == nullptr)
                    level--;
                else
                    break;
            }
        }
        return false;
    }
    
    void SkipList::print()
    {
        Node * node = nullptr;
        for (int i = 0; i < level; i++)
        {
            node = header->forward[i];
            cout << "Level " << i << " : ";
            while (node)
            {
                cout << node->key << " ";
                node = node->forward[i];
            }
            cout << endl;
        }
        cout << endl;
    }
    

    应用实例

    在Lucene倒排列表中的应用

    这里参考的是clucene-core-2.3.3.4的源码
    可在下方链接获取源码:https://github.com/songqiyuan/CLucene
    跳跃表存储以三层为例:

     * Example for skipInterval = 3:
     *                                                     c            (skip level 2)
     *                 c                 c                 c            (skip level 1) 
     *     x     x     x     x     x     x     x     x     x     x      (skip level 0)
     * d d d d d d d d d d d d d d d d d d d d d d d d d d d d d d d d  (posting list)
     *     3     6     9     12    15    18    21    24    27    30     (df)
     * 
    

    1.跳跃表可根据倒排表本身的长度(DocFreq)和跳跃的幅度(SkipInterval)而分不同的层次,层次数为NumSkipLevels = Min(MaxSkipLevels, floor(log(DocFreq/log(SkipInterval)))).
    第Level层的节点数为DocFreq/(SkipInterval^(Level + 1)),level从零计数。

    2.除最低层以外,其他层都有SkipChildLevelPointer来指向下一层相应的节点所在文件位置。
    确定层数和跳跃表的构建代码如下:

    /*SkipListWriter.cpp*/
    void MultiLevelSkipListWriter::bufferSkip(int32_t df){
      int32_t numLevels;
     
      // determine max level
      for (numLevels = 0; (df % skipInterval) == 0 && numLevels < numberOfSkipLevels; df /= skipInterval) {
        numLevels++;
      }
      
      int64_t childPointer = 0;
      
      for (int32_t level = 0; level < numLevels; level++) {
        writeSkipData(level, (*skipBuffer)[level]);
        
        int64_t newChildPointer = (*skipBuffer)[level]->getFilePointer();
        
        if (level != 0) {
          // store child pointers for all levels except the lowest
          (*skipBuffer)[level]->writeVLong(childPointer);
        }
        
        //remember the childPointer for the next level
        childPointer = newChildPointer;
      }
    }
    
    MultiLevelSkipListWriter::MultiLevelSkipListWriter(int32_t skipInterval, int32_t maxSkipLevels, int32_t df) {
      this->skipBuffer = NULL;
      this->skipInterval = skipInterval;
      
      // calculate the maximum number of skip levels for this document frequency
      numberOfSkipLevels = df == 0 ? 0 : (int32_t) floor(log((float_t)df) / log((float_t)skipInterval));
      
      // make sure it does not exceed maxSkipLevels
      if (numberOfSkipLevels > maxSkipLevels) {
        numberOfSkipLevels = maxSkipLevels;
      }
    }
    
    跳跃表存储.png

    关于数据存储,有机会再分析CLucene文件格式时,会详细介绍,现在了解即可,这里主要说,跳跃表在lucene中的应用。
    3.除了最高层之外,其他层都有SkipLevelLength来表示此层的二进制长度(而非节点的个数),方便读取某一层的跳跃表到缓存里面。

    4.低层在前,高层在后,当读完所有的低层后,剩下的就是最后一层,因而最后一层不需要SkipLevelLength。这也是为什么Lucene文档中的格式描述为 NumSkipLevels-1, SkipLevel,也即低NumSKipLevels-1层有SkipLevelLength,最后一层只有SkipLevel,没有SkipLevelLength。

    5.每一个跳跃节点包含以下信息:文档号,payload的长度,文档号对应的倒排表中的节点在frq中的偏移量,文档号对应的倒排表中的节点在prx中的偏移量

    在Redis中的应用

    Redis使用跳跃表作为有序集合键的底层实现之一,如果一个有序集合包含的元素数量比较多,又或者有序集合中元素的成员(member)是比较长的字符串时,Redis就会使用跳跃表来作为有序集合键的底层实现

    当有序集合对象可以同时满足以下两个条件时,对象使用ziplist(压缩链表)编码:
    ❑有序集合保存的元素数量小于128个;
    ❑有序集合保存的所有元素成员的长度都小于64字节;不能满足以上两个条件的有序集合对象将使用skiplist(跳表)编码。
    注意以上两个条件的上限值是可以修改的,具体请看配置文件中关于zset-max-ziplist-entries选项和zset-max-ziplist-value选项的说明。

    源码位置:http://download.redis.io/releases/

    /*跳跃表的结构定义在server.h/zskiplist*/
    /*3.0版本的定义在redis.h文件中*/
    /*节点结构定义*/
    /* ZSETs use a specialized version of Skiplists */
    typedef struct zskiplistNode {
        sds ele;
        double score;
        struct zskiplistNode *backward;
        struct zskiplistLevel {
            struct zskiplistNode *forward;
            unsigned long span;
        } level[];
    } zskiplistNode;
    
    typedef struct zskiplist {
        struct zskiplistNode *header, *tail;
        unsigned long length;
        int level;
    } zskiplist;
    
    redis跳表.jpeg

    zskiplistj结构:
    ❑header:指向跳跃表的表头节点。
    ❑tail:指向跳跃表的表尾节点。
    ❑level:记录目前跳跃表内,层数最大的那个节点的层数(表头节点的层数不计算在内)。
    ❑length:记录跳跃表的长度,也即是,跳跃表目前包含节点的数量(表头节点不计算在内)。

    zskiplistNode结构:
    ❑层(level):节点中用L1、L2、L3等字样标记节点的各个层,L1代表第一层,L2代表第二层,以此类推。每个层都带有两个属性:前进指针和跨度。前进指针用于访问位于表尾方向的其他节点,而跨度则记录了前进指针所指向节点和当前节点的距离。在上面的图片中,连线上带有数字的箭头就代表前进指针,而那个数字就是跨度。当程序从表头向表尾进行遍历时,访问会沿着层的前进指针进行。
    ❑后退(backward)指针:节点中用BW字样标记节点的后退指针,它指向位于当前节点的前一个节点。后退指针在程序从表尾向表头遍历时使用。
    ❑分值(score):各个节点中的1.0、2.0和3.0是节点所保存的分值。在跳跃表中,节点按各自所保存的分值从小到大排列。
    ❑成员对象(ele):各个节点中的o1、o2和o3是节点所保存的成员对象。

    redis跳表创建、插入以及删除源码

    这里先讲一下节点中跨度的含义即zskiplistNode 中变量( unsigned long span):

    层的跨度(level[i].span属性)用于记录两个节点之间的距离:
    ❑两个节点之间的跨度越大,它们相距得就越远。
    ❑指向NULL的所有前进指针的跨度都为0,因为它们没有连向任何节点。
    初看上去,很容易以为跨度和遍历操作有关,但实际上并不是这样,遍历操作只使用前进指针就可以完成了,跨度实际上是用来计算排位(rank)的:在查找某个节点的过程中,将沿途访问过的所有层的跨度累计起来,得到的结果就是目标节点在跳跃表中的排位。

    有了前面的跳表基本知识,代码很容易理解、这里就不在赘述.
    跳表中的排序是以分值的大小来排序的。具体代码

    /*在redis中定义的最大层数和计算当前节点应为几层的概率*/
    #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
    #define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
    
    /*源码位置:t_zset.c*/
    zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
        zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
        unsigned int rank[ZSKIPLIST_MAXLEVEL];
        int i, level;
    
        serverAssert(!isnan(score));
        x = zsl->header;
        for (i = zsl->level-1; i >= 0; i--) {
            /* store rank that is crossed to reach the insert position */
            rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
            while (x->level[i].forward &&
                    (x->level[i].forward->score < score ||
                        (x->level[i].forward->score == score &&
                        sdscmp(x->level[i].forward->ele,ele) < 0)))
            {
                rank[i] += x->level[i].span;
                x = x->level[i].forward;
            }
            update[i] = x;
        }
        /* we assume the element is not already inside, since we allow duplicated
         * scores, reinserting the same element should never happen since the
         * caller of zslInsert() should test in the hash table if the element is
         * already inside or not. */
        level = zslRandomLevel();
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        x = zslCreateNode(level,score,ele);
        for (i = 0; i < level; i++) {
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;
    
            /* update span covered by update[i] as x is inserted here */
            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
    
        /* increment span for untouched levels */
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
    
        x->backward = (update[0] == zsl->header) ? NULL : update[0];
        if (x->level[0].forward)
            x->level[0].forward->backward = x;
        else
            zsl->tail = x;
        zsl->length++;
        return x;
    }
    
    /* Internal function used by zslDelete, zslDeleteRangeByScore and
     * zslDeleteRangeByRank. */
    void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
        int i;
        for (i = 0; i < zsl->level; i++) {
            if (update[i]->level[i].forward == x) {
                update[i]->level[i].span += x->level[i].span - 1;
                update[i]->level[i].forward = x->level[i].forward;
            } else {
                update[i]->level[i].span -= 1;
            }
        }
        if (x->level[0].forward) {
            x->level[0].forward->backward = x->backward;
        } else {
            zsl->tail = x->backward;
        }
        while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
            zsl->level--;
        zsl->length--;
    }
    
    /* Delete an element with matching score/element from the skiplist.
     * The function returns 1 if the node was found and deleted, otherwise
     * 0 is returned.
     *
     * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
     * it is not freed (but just unlinked) and *node is set to the node pointer,
     * so that it is possible for the caller to reuse the node (including the
     * referenced SDS string at node->ele). */
    int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
        zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
        int i;
    
        x = zsl->header;
        for (i = zsl->level-1; i >= 0; i--) {
            while (x->level[i].forward &&
                    (x->level[i].forward->score < score ||
                        (x->level[i].forward->score == score &&
                         sdscmp(x->level[i].forward->ele,ele) < 0)))
            {
                x = x->level[i].forward;
            }
            update[i] = x;
        }
        /* We may have multiple elements with the same score, what we need
         * is to find the element with both the right score and object. */
        x = x->level[0].forward;
        if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
            zslDeleteNode(zsl, x, update);
            if (!node)
                zslFreeNode(x);
            else
                *node = x;
            return 1;
        }
        return 0; /* not found */
    }
    

    相关文章

      网友评论

          本文标题:跳跃表详解

          本文链接:https://www.haomeiwen.com/subject/gwuluktx.html