美文网首页
无锁实现之Hazard Pointer (to be conti

无锁实现之Hazard Pointer (to be conti

作者: GOGOYAO | 来源:发表于2019-10-15 17:18 被阅读0次

    [TOC]

    参考

    1. 实现无锁的栈与队列(5):Hazard Pointer
    2. Lock-free 编程:A Case Study(下)
    3. Lock Free中的Hazard Pointer(中)
    4. Lock Free中的Hazard Pointer(上)
    5. Lock Free中的Hazard Pointer(下)
    6. 并行编程中的内存回收Hazard Pointer
    7. [LockFree之美] 使用Hazard Version实现的无锁Stack与Queue
    8. Lock-free 编程:A Case Study(上)
    9. 共享变量的并发读写
    10. http://blog.kongfy.com/2017/02/hazard-pointer/
    11. http://blog.kongfy.com/2017/02/%e6%97%a0%e9%94%81%e9%98%9f%e5%88%97%e7%9a%84%e4%b8%80%e7%a7%8d%e5%ae%9e%e7%8e%b0/#more-1627
    12. https://blog.csdn.net/zhang_shuai_2011/article/details/39636889
    13. https://www.cnblogs.com/cobbliu/articles/8370746.html

    [2-4]比较全面的解释了hp,很值得一读。

    1. COW + CAS + 引用计数 实现一个简单的无锁map

    参考gcc atiomic 接口,gcc提供了CAS接口如下

    // for gcc
    bool __sync_bool_compare_and_swap(type *ptr, type oldval, type newval, ...)
    type __sync_val_compare_and_swap(type *ptr, type oldval, type newval, ...)
    

    但是我们需要128位的cas操作,引入cas2,参考Lock-free 编程:A Case Study(上),直接上源码

    // define uint128_t
    #ifndef uint128_t
    #define uint128_t __uint128_t
    #endif
    
    // cas2 for 128 bit atomic
    #define CAS2(val, oldval, newval) \ 
        __sync_bool_compare_and_swap((uint128_t*)(val), \
        (*(uint128_t*)(&oldval)), (*(uint128_t*)(&newval)))
        
    
    // Align memory to 128-bit
    #define MEM_ALIGNED __attribute__(( __aligned__(16) ))
    
    template <typename K, typename V>
    class LockFreeMap2 {
    private:
        struct MapPointer {
            std::map<K, V> *p_;
            long count_;
    
            MapPointer() {
                p_ = new std::map<K, V>();
                count_ = 0;
            };
        } MEM_ALIGNED;  // align to 128 bit
    
        typedef struct MapPointer MapPointer;
        MapPointer pMap_;
    
    public:
      V LookUp(K &key) {
        MapPointer pOld, pNew;
        do {
            pOld = pMap_;
            pNew = pOld;
            pNew.count_++;
        } while (!CAS2(&pMap_, pOld, pNew));
        V temp = (*pNew.p_)[key];
        do {
            pOld = pMap_;
            pNew = pOld;
            pNew.count_--;
        } while (!CAS2(&pMap_, pOld, pNew));
        return temp;
      }
      void Update(K &key, V& val) {
        MapPointer pOld, pNew;
        do {
            pOld.p_ = pMap_.p_;
            if(pNew.p_ != nullptr) {
                delete pNew.p_;
            }
            pNew.p_ = new std::map<K, V>(*pOld.p_);
            (*pNew.p_)[key] = val;
        } while(!CAS2(&pMap_, pOld, pNew));
        delete pOld.p_;
      }
    };
    

    以上代码中,首先是定义了MapPointer(包含map的指针和map引用的count),占用16个字节(128位),配合CAS2,则可以实现MapPointer(内部map的指针和map引用的count)的原子操作。

    在每次读取之前,我们需要先通过 CAS2 来增加 pMap_ 对应的 count_ 值,在获得了所需值之后,我们再将其对应的 count_ 值减一。因此 count_ 值为 0 时表示当前可以安全地释放 pMap_ 指向的内存。

    Update 操作的时候,pNew和pOld的count都初始化为0,pOld.p_ = pMap_.p_; 每次均先将当前的 pMap_ 记录下来,然后将记录下来的数据复制到 pNew 中,并且更新 pNew。如果 CAS2 执行成功,表明当前 pMap_ 的 count_ 值一定为 0,因此将被替换成为 pNew;替换完成之后,可以安全的删除 pOld 中的垃圾内存。

    2. CAS实现无锁stack

    参考[LockFree之美] 使用Hazard Version实现的无锁Stack与Queue,可以有这样的实现。

    #define CAS(val, oldval, newval) \
        __sync_bool_compare_and_swap((val), (oldval), (newval))
    
    void push(Node *node) {
        Node *curr = top;
        Node *old = curr;
        node→next = curr;
        while (old != (curr = CAS(&top, old, node))) {
            old = curr;
            node→next = curr;       
        }
    }
    Node *pop() {
        Node *curr = top;
        Node *old = curr;
        while (NULL != curr && old != (curr = CAS(&top, old, curr→next))) {
            old = curr;
        }
        return curr
    }
    

    push的逻辑没问题,问题在于pop,它在还未“锁定”top的情况下,就引用了top的next字段,而此时的top指向的内存空间可能已经被释放甚至重用,一方面可能直接引起非法地址访问使得程序直接崩溃(这就是和上文一样的内存释放问题);更严重的可能造成隐蔽的“ABA问题”,考虑如下时序:

    • Stack状态为 A→B →C,top指向A
    • 线程I将top(即节点A的地址)赋值给curr,并取得curr→next指针(为B)放入寄存器
    • 线程II执行完整pop流程,Stack状态变为B→C,top指向B,节点A被弹出后释放
    • 线程II执行完整pop流程,Stack状态变为C,top指向C,节点B被弹出后释放
    • 线程II执行完整push流程,新申请节点为被复用的A,Stack状态变为A→C,top指向A
    • 线程I继续执行,CAS判断top值仍为A,则原子替换top为B,链表被改坏

    3. Hazard Pointer

    上文的两个例子,简单说明了下无锁编程常见的两个问题:

    • 内存的释放
    • ABA问题

    Hazard Pointer(Lock-Free Data Structures with Hazard Pointers)的提出,可以解决这两个问题。本文将基于HP重新实现map[2]。

    每一个读线程都拥有一个『单写多读』共享的 Hazard Pointer. 当读线程将想要访问的内存指针 P 赋值给他的 Hazard Pointer 时,它实际上是在向其他的写线程宣布: 我想要从指针 p 指向的内存读取数据了,你可以将指针 p 替换指向新的数据,但你不能修改其中的内容,更不能直接将其删除。

    这段描述非常简洁的说明了 Hazard Pointer 的作用:通过遍历检查所有的 Hazard Pointer,我们可以判断 p 指向的内容是否正在被某些读线程所访问,从而可以确定当前能否安全地删除 p 指向的内存。这样说可能不是很明白,下面我们还是从实现 Lock-free Map 的过程来具体说明 Hazard Pointer 的威力。

    相关文章

      网友评论

          本文标题:无锁实现之Hazard Pointer (to be conti

          本文链接:https://www.haomeiwen.com/subject/qigsmctx.html