近期学习了一些底层的知识,但是代码方面触碰的很少。为了能落实到实处,我把之前一直拖着的LevelDB源码搬了出来,详细地分析了一下其跳表部分(SkipList)。
https://github.com/google/leveldb/blob/master/doc/index.md
上面的链接是LevelDB的GitHub源码位置,大家可以下载后比对进行学习。
为了能够加速学习的效率,起初我准备参考已经完成的blog来进行分析。后来发现,他们的解析虽然很多,但是都不太细致,也不系统。于是我在自己的理解基础上准备对这个跳表进行一下详细的分析。因为我在学习的过程中遇到了很多疑惑,通过查阅以及揣摩得到了答案,所以我更清楚代码中的核心问题。在此我将逐代码进行分析,并在后续慢慢补充代码中存在的知识点。
希望本文能为读者带来一些收获!
前言
本文同样更新在私人公众号上,在此推广一下欢迎大家关注:
公众号会定期更新一些计算机系统的底层知识,争取以最细节、最简洁的方式帮助读者理解系统的一些知识。
SkipList背景知识
skiplist 由William Pugh 在论文Skip Lists: A Probabilistic Alternative to Balanced Trees 中提出的一种数据结构,skiplist 是一种随机化存储的多层线性链表结构,插入,查找,删除的都是对数级别的时间复杂度。skiplist 和平衡树有相同的时间复杂度,但相比平衡树,skip实现起来更简单。
一个高度为4的skiplist如下图所示:
image从垂直角度看,skiplist 的第0层(最下面一层)以单链表的形式按照从小到大的顺序存储全部数据,越高层的链表的节点数越少,高层的节点用于索引的作用,通过最高层不断索引到第0层从而获得节点。通过在高层较少的节点中查找就可以确定需要定位的位置处于哪个区间,从高层到低层不断缩小查找区间。
LevelDB代码
为了简化我们的分析,我们分模块对代码进行讲解:
Skiplist-节点结构体
首先我们先讲述一下代码中Skiplist的结构体与Class类:
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) {}
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
// 加载Next节点
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].store(x, std::memory_order_release);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}
private:
// Array of length equal to the node height. next_[0] is lowest level link.
std::atomic<Node*> next_[1];
};
上述代码为LevelDB中的Node结构,而Node代表跳表中的结点(key:value对)。
其中该结构体中包括两个私有变量:当前节点的Key值以及当前Node指向的下一个结构体next_[1](其中存储的是某一层中大于等于该节点的下一个节点Node,所以只有1即可)。
结构体包含了四个方法Next、SetNext、NoBarrier_Next、NoBarrier_SetNext。前两个方法对应加载与读取(内存使用barrier按需进行),后两个方案对应非屏障方案(可以乱序执行)加载与读取。这四种方案均对next_[1]进行操作。Node结构图如下所示。
后续我们专门对内存屏障做一些分享。
imageLevelDB中跳表的工具类
class Iterator {
public:
// Initialize an iterator over the specified list.
// The returned iterator is not valid.
explicit Iterator(const SkipList* list);
// Returns true iff the iterator is positioned at a valid node.
bool Valid() const;
// Returns the key at the current position.
// REQUIRES: Valid()
const Key& key() const;
// Advances to the next position.
// REQUIRES: Valid()
void Next();
// Advances to the previous position.
// REQUIRES: Valid()
void Prev();
// Advance to the first entry with a key >= target
void Seek(const Key& target);
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst();
// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToLast();
private:
const SkipList* list_;
Node* node_;
// Intentionally copyable
};
其中包括了7public个函数以及2个私有成员变量。
而工具类隶属于SkipList类中,并且在SkipList中同样有Node成员、kMaxHeight(level最高高度)、以及插入函数Insert()、包含函数Contains()、随机高度函数RandomHeight()、比较函数KeyIsAfterNode()以及三个搜索函数。
源码为:
class SkipList {
下面我们针对跳表的关键函数进行分析。
我们知道,对数据的操作包括最重要的四个步骤:增、删、改、查。而LevelDB中的SkipList并没有删除、修改操作,所以我们这里重点Follow插入与查询的过程。
我们首先在理论上分析一下跳表是如何插入数据的:
插入操作
我们来看一个插入数据的例子。
image我们将key-value
存放在在内存中采用的结构是 skiplist
。上图分为4层结构,由底部0到最上层3。
在上图中插入 30
:
-
先得到一个随机值( 1 到 kMaxHeight 之间)来设置 30 这个节点的层数,假设得到的是 5,定义一个 prev[5],用来存放这五层中30 的前一个节点。这里这个随机数 5 意味着从第0层到第4层均会存储30这个值;
-
上图中也就 4 层,咋整?(skiplist 有两个关于层数的变量,第一个是 kMaxHeight,表示允许的最大层;第二个是 max_height_,表示 skiplist 当前最大层);
-
没事,将大于 4 的层以上的节点的 prev 全部指向 head_,也就是将 prev[4] = head_(数组第五个位置赋值为head_), 并将当前最大层数 max_height_ 设置为 5;
-
还是按照上面查询的步骤,只不过把每次降一层时的节点放在 prev 中,那么;
prev[4] = head_(第5层);
说白了,假设我们插入的值是x,如果我们随机到的值是n,那么该数字在0~n-1层都会出现;****其中prev[]数组中存储的值是每一层x所在位置的前一个数(<=所插入值的值)。
PS:(head ---> 0 ---> 1 ---> 2 ---> 4.......,如果x为3,那么这里prev存储的就是2这个点)
得到 prev 后,我们要根据prev中的值来更新x中的内容。
在x结点内部存在私有变量next_[n],这个私有变量存储x在每一层的后续结点,例如下面图中的7这个结点:
7这个节点中的next_[0]代表第0层的10;****next_[1]代表第一层的15;****next_[2]代表第二层的31;****next_[3]代表第三层的31;
image了解了这些基本概念后,我们看代码就会非常轻松:
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
// 在prev数组中找到不小于key的数 根据函数,这里传入指针的指针,即直接将prev[kMaxHeight]的值修改掉
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
// 随机化一个高度出来
int height = RandomHeight();
// 如果随机出的高度 > 已有的高度
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
// 如果高度 > 当前的高度,那么会将当前高度之上的所有节点之上均初始化head节点
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
//可以进行并发操作,这个高度不冲突
max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
// 在i层中的当前x节点所指向的下一个节点为(prev[i]的下一个节点)
// —— 即将x插入到prev[i] 与 next_prev[i]之间
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
该流程分如下步骤:
1 首先初始化Node数组:
Node* prev[kMaxHeight];
2 调用函数FindGreaterOrEqal()与断言机制,有两个作用:首先是将prev[]数组赋初值(传入的是引用,可以直接把prev改掉),之后获取到x(>或者=key的值),并对x进行判断,如果相等则退出(只能找到>的值,而在list中不允许有相等的情况出现)
Node* x = FindGreaterOrEqual(key, prev);
3 随机一个高度出来
int height = RandomHeight();
这里详细扩展一下这个随机函数:
int height = RandomHeight();
这里详细扩展一下这个随机函数:
// 得到一个随机的高度
template<typename Key, class Comparator>
int SkipList<Key,Comparator>::RandomHeight() {
static const unsigned int kBranching = 4;
// 初始为 1
int height = 1;
// 得到一个随机值,如果随机值是 4 的倍数就返回 height,否则 keight 就加 1,为什么要这么做?
// 如果我们得到一个随机值,直接对 kMaxHeight 取模加 1,然后赋值给 height,那么 height 在 [1~12] 之前出现的概率一样的
// 如果节点个数为 n,那么有 12 层的节点有 n/12 个,11 层的有 n/12+n/12(需要把12层的也加上),
// 节点太多,最上层平均前进一次才右移 12 个节点,下面层就更不用说了,效率低;
// 作者的方法是每一层会按照4的倍数减少,出现4层的概率只有出现3层概率的1/4,这样查询起来效率是不是大大提升了呢
while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) {
height++;
}
assert(height > 0);
assert(height <= kMaxHeight);
return height;
}
- // 得到一个随机值,如果随机值是 4 的倍数就返回 height,否则 keight 就加 1,为什么要这么做?**
-
// 如果我们得到一个随机值,直接对 kMaxHeight 取模加 1,然后赋值给 height,那么 height 在 [1~12] 之前出现的概率一样的**
-
// 如果节点个数为 n,那么有 12 层的节点有 n/12 个,11 层的有 n/12+n/12(需要把12层的也加上),**
-
// 节点太多,最上层平均前进一次才右移 12 个节点,下面层就更不用说了,效率低;**
-
// 作者的方法是每一层会按照4的倍数减少,出现4层的概率只有出现3层概率的1/4,这样查询起来效率是不是大大提升了呢**
说白了就是让上层的数字不要太多,下层的可以多一些。呈现倒漏斗形状。
4 之后如果当前随机到的高度比list中最大值要大(比如例子中本来只有4层,而我们插入30时随机到了5),那么将多出来的层中的prev[]指向头结点(因为当前层是新建的所以没有其他结点emmmm)。
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
// 如果高度 > 当前的高度,那么会将当前高度之上的所有节点之上均初始化head节点
prev[i] = head_;
}
5 当我们得到了prev[]数组,以及所需插入key的后面的结点x后(x>key),我们很顺利进行更新操作了!!!
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
// 在i层中的当前x节点所指向的下一个节点为(prev[i]的下一个节点)
// —— 即将x插入到prev[i] 与 next_prev[i]之间
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
加入原始第0层结点为 head ——> 5 ——> 10 ——> 15 ——>20。此时我们需要插入一个结点18,那么prev[0]指向15,x为20。所以第5步得到更新:
head ——> 5 ——> 10 ——> 15 ——> 18 ——> 20
查询操作
image我们来看上述的结构,从0到4共有五层,此时我们计划查询29。
- 首先 skiplist 有一个变量 head_ 始终指向第一个节点的最上层,也就是图中最上层的 head_
- 29 和第 4 层 head_ 的 next 节点 7 比较,大于 7,此时头节点指向第 4 层的 7
- 29 和第 4 层 7 的 next 节点 31 比较,小于 31,降一层
- 29 和第 3 层 7 的 next 节点 31 比较,小于 31,降一层
- 29 和第 2 层 7 的 next 节点 15 比较,大于 15,此时头节点指向第 2 层的 15
- 29 和第 2 层 15 的 next 节点 31 比较,小于 31,降一层
- 29 和第 1 层 15 的 next 节点 24 比较,大于 24,此时头节点指向第 1 层的 24
- 29 和第 1 层 24 的 next 节点 29 比较,等于 29,且是第一层,没错就是你了
代码中的查询函数:
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}
而Seek指向了FindGreaterOrEqual()。
于是我们查看该函数:
// FindGreaterOrEqual 找到>或者=Key的节点,prev[]中存储的内容是每一层中比key小的数是哪个
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>:
:Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
// next指向的是x同层的下一个节点,可能为空
Node* next = x->Next(level);
// 如果key在next的后面,x指针指向next
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
// 如果next为空或key在next的前面,
// 说明要插入的key是要比当前层的next节点小的即(head——>next,而key<next,所以放head与next中间)
// 此时要把当前prev[level]存上head,即找到当前层中比key小的点
// 即prev[level]中存的是各个层中比key小的点
if (prev != nullptr) prev[level] = x;
if (level == 0) {
// 层数为0的话,说明两个节点之间没有挑过其他节点,
// 而此时next是大于或等于key的,key肯定是大于next的前一个节点的(不大于的话走不到next节点)
// 返回的next节点只能大于或等于key节点
return next;
} else {
// Switch to next list
level--;
}
}
}
}
该函数需要我们传入需要查询的key值以及用于存储key在每一层的前一个数的prev[]数组。
1 首先该函数会初始化Node x,指向头结点head
Node* x = head_;
2 之后进入循环,该循环首先会从最顶层的level开始向下延伸
Node* next = x->Next(level);
将next指向head后面的结点,并且一层一层向后指
3 寻找key>next的情况,不断寻找指导找到紧邻key前面的那个值
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
}
4 如果结点next比key大(注意,这里的next是x的下一个节点,所以x还是要小于key的),那么就向下层走。
else {
// 如果next为空或key在next的前面,
// 说明要插入的key是要比当前层的next节点小的即(head——>next,而key<next,所以放head与next中间)
// 此时要把当前prev[level]存上head,即找到当前层中比key小的点
// 即prev[level]中存的是各个层中比key小的点
if (prev != nullptr) prev[level] = x;
if (level == 0) {
// 而此时next是大于或等于key的,key肯定是大于next的前一个节点的(不大于的话走不到next节点)
// 返回的next节点只能大于或等于key节点
return next;
} else {
// Switch to next list
level--;
}
}
当搜索到最后一层时(第0层是数据最全的一层),此时如果遇到next>key的情况,那么就找到了正确的数据,我们就返回next即可。
其余工具函数解析
新建Node并分配空间的函数:
// 创建一个新的节点,而分配内存为:节点结构体大小+高度(每一层高度中均可以存一个Node)
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (node_memory) Node(key);
}
该函数中传入node的key值与树的高度height,并且分配空间(sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1)))
即:Node数据结构的大小+(height-1)个所需存储的指向下一个Node的空间(每一层都需要指向下一个结点)
Valid()函数:
inline bool SkipList<Key, Comparator>::Iterator::Valid() const {
return node_ != nullptr;
}
检测node是不是空。
Next函数与Prev函数:
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Next() {
assert(Valid());
// Node结构体的私有变量,用于在level 0层中查找比他大的key
node_ = node_->Next(0);
}
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
node_ = list_->FindLessThan(node_->key);
if (node_ == list_->head_) {
node_ = nullptr;
}
}
使用node结构体中存储的第0层的私有变量导出大于他的数(Next(0))
而找小于key的数需要调用下面的函数:
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
// 条件返回错误,则终止程序执行。即必须满足当前x节点是头节点 或者 x的值比传入的key要小
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
// 如果发现x的next的值比key要大或者与key相等,则将x返回
if (next == nullptr || compare_(next->key, key) >= 0) {
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
x = next;
}
}
}
LevelDB中的跳表最核心的函数基本上分析完成了,其实最难理解的还是那个跳表Node的结构体,只要理解了他里面存储是是key以及多个level中的next结点。其中他在结构体代码中写到:
// Array of length equal to the node height. next_[0] is lowest level link.
std::atomic<Node*> next_[1];
总结
这个next_[1]应该是个变长的数组,里面存储的结点长度是level的高度。所以这个1在这里代表的不是长度为1的意思。需要我们注意一下。
希望本文能为大家带来一些启发,后期会针对levelDB中的并发访问以及内存屏障等内容进行深入探讨,欢迎大家继续阅读。
本文记录非常详细,就是为了弥补很多博客不注重细节的问题,所以读起来有可能会相对慢一些,但大家顺序读下来肯定会有很多收获的,感谢。
也感谢https://blog.csdn.net/xuxuan_csd/article/details/72965674博客,为了更清晰的解释我也使用了其中部分解析例子。
转载请标注来源哦!可以搜索微信公众号 :“计算机系统知识分享站” 来加关注。
网友评论