[TOC]
参考
- 实现无锁的栈与队列(5):Hazard Pointer
- Lock-free 编程:A Case Study(下)
- Lock Free中的Hazard Pointer(中)
- Lock Free中的Hazard Pointer(上)
- Lock Free中的Hazard Pointer(下)
- 并行编程中的内存回收Hazard Pointer
- [LockFree之美] 使用Hazard Version实现的无锁Stack与Queue
- Lock-free 编程:A Case Study(上)
- 共享变量的并发读写
- http://blog.kongfy.com/2017/02/hazard-pointer/
- http://blog.kongfy.com/2017/02/%e6%97%a0%e9%94%81%e9%98%9f%e5%88%97%e7%9a%84%e4%b8%80%e7%a7%8d%e5%ae%9e%e7%8e%b0/#more-1627
- https://blog.csdn.net/zhang_shuai_2011/article/details/39636889
- https://www.cnblogs.com/cobbliu/articles/8370746.html
[2-4]比较全面的解释了hp,很值得一读。
1. COW + CAS + 引用计数 实现一个简单的无锁map
参考gcc atiomic 接口,gcc提供了CAS接口如下
// for gcc
bool __sync_bool_compare_and_swap(type *ptr, type oldval, type newval, ...)
type __sync_val_compare_and_swap(type *ptr, type oldval, type newval, ...)
但是我们需要128位的cas操作,引入cas2,参考Lock-free 编程:A Case Study(上),直接上源码
// define uint128_t
#ifndef uint128_t
#define uint128_t __uint128_t
#endif
// cas2 for 128 bit atomic
#define CAS2(val, oldval, newval) \
__sync_bool_compare_and_swap((uint128_t*)(val), \
(*(uint128_t*)(&oldval)), (*(uint128_t*)(&newval)))
// Align memory to 128-bit
#define MEM_ALIGNED __attribute__(( __aligned__(16) ))
template <typename K, typename V>
class LockFreeMap2 {
private:
struct MapPointer {
std::map<K, V> *p_;
long count_;
MapPointer() {
p_ = new std::map<K, V>();
count_ = 0;
};
} MEM_ALIGNED; // align to 128 bit
typedef struct MapPointer MapPointer;
MapPointer pMap_;
public:
V LookUp(K &key) {
MapPointer pOld, pNew;
do {
pOld = pMap_;
pNew = pOld;
pNew.count_++;
} while (!CAS2(&pMap_, pOld, pNew));
V temp = (*pNew.p_)[key];
do {
pOld = pMap_;
pNew = pOld;
pNew.count_--;
} while (!CAS2(&pMap_, pOld, pNew));
return temp;
}
void Update(K &key, V& val) {
MapPointer pOld, pNew;
do {
pOld.p_ = pMap_.p_;
if(pNew.p_ != nullptr) {
delete pNew.p_;
}
pNew.p_ = new std::map<K, V>(*pOld.p_);
(*pNew.p_)[key] = val;
} while(!CAS2(&pMap_, pOld, pNew));
delete pOld.p_;
}
};
以上代码中,首先是定义了MapPointer(包含map的指针和map引用的count),占用16个字节(128位),配合CAS2,则可以实现MapPointer(内部map的指针和map引用的count)的原子操作。
在每次读取之前,我们需要先通过 CAS2 来增加 pMap_ 对应的 count_ 值,在获得了所需值之后,我们再将其对应的 count_ 值减一。因此 count_ 值为 0 时表示当前可以安全地释放 pMap_ 指向的内存。
Update 操作的时候,pNew和pOld的count都初始化为0,pOld.p_ = pMap_.p_;
每次均先将当前的 pMap_ 记录下来,然后将记录下来的数据复制到 pNew 中,并且更新 pNew。如果 CAS2
执行成功,表明当前 pMap_ 的 count_ 值一定为 0,因此将被替换成为 pNew;替换完成之后,可以安全的删除 pOld 中的垃圾内存。
2. CAS实现无锁stack
参考[LockFree之美] 使用Hazard Version实现的无锁Stack与Queue,可以有这样的实现。
#define CAS(val, oldval, newval) \
__sync_bool_compare_and_swap((val), (oldval), (newval))
void push(Node *node) {
Node *curr = top;
Node *old = curr;
node→next = curr;
while (old != (curr = CAS(&top, old, node))) {
old = curr;
node→next = curr;
}
}
Node *pop() {
Node *curr = top;
Node *old = curr;
while (NULL != curr && old != (curr = CAS(&top, old, curr→next))) {
old = curr;
}
return curr
}
push的逻辑没问题,问题在于pop,它在还未“锁定”top的情况下,就引用了top的next字段,而此时的top指向的内存空间可能已经被释放甚至重用,一方面可能直接引起非法地址访问使得程序直接崩溃(这就是和上文一样的内存释放问题);更严重的可能造成隐蔽的“ABA问题”,考虑如下时序:
- Stack状态为 A→B →C,top指向A
- 线程I将top(即节点A的地址)赋值给curr,并取得curr→next指针(为B)放入寄存器
- 线程II执行完整pop流程,Stack状态变为B→C,top指向B,节点A被弹出后释放
- 线程II执行完整pop流程,Stack状态变为C,top指向C,节点B被弹出后释放
- 线程II执行完整push流程,新申请节点为被复用的A,Stack状态变为A→C,top指向A
- 线程I继续执行,CAS判断top值仍为A,则原子替换top为B,链表被改坏
3. Hazard Pointer
上文的两个例子,简单说明了下无锁编程常见的两个问题:
- 内存的释放
- ABA问题
Hazard Pointer(Lock-Free Data Structures with Hazard Pointers)的提出,可以解决这两个问题。本文将基于HP重新实现map[2]。
每一个读线程都拥有一个『单写多读』共享的 Hazard Pointer. 当读线程将想要访问的内存指针 P
赋值给他的 Hazard Pointer 时,它实际上是在向其他的写线程宣布: 我想要从指针 p
指向的内存读取数据了,你可以将指针 p
替换指向新的数据,但你不能修改其中的内容,更不能直接将其删除。
这段描述非常简洁的说明了 Hazard Pointer 的作用:通过遍历检查所有的 Hazard Pointer,我们可以判断 p 指向的内容是否正在被某些读线程所访问,从而可以确定当前能否安全地删除 p 指向的内存。这样说可能不是很明白,下面我们还是从实现 Lock-free Map 的过程来具体说明 Hazard Pointer 的威力。
网友评论