美文网首页
无锁队列C++实现

无锁队列C++实现

作者: 沐一同学 | 来源:发表于2018-04-02 14:43 被阅读0次

    实现了一个lock free queue,自测通过。有需要的拿去。


    #ifndef __LFQUEUE_H__

    #define __LFQUEUE_H__

    #include <thread>

    #include <iostream>

    #include <atomic>

    #include "dtype.h"

    #include "os/Win.h"

    #include "logger.h"

    #define CAS2(obj, expected, desired) std::atomic::atomic_compare_exchange_weak(obj, expected, desired);

    #ifdef WIN32

    #define CAS(ptr, oldvalue, newvalue) InterlockedCompareExchange(ptr, newvalue, oldvalue)

    #else

    #define CAS(ptr, oldvalue, newvalue) __sync_val_compare_and_swap(ptr , oldvalue , newvalue)

    #endif

    typedef struct queue_elem

    {

    void* data;

    bool origin; // 0: from GRPC, 1: from Self

    queue_elem(void* e, uint32 o) { data = e; origin = o; }

    } QueueElem;

    class LockFreeQueue

    {

    public:

    LockFreeQueue(int32 capicity)

    : capicity_(capicity), header_(0), tailer_(0), guard_(0)

    {

    if (capicity_ < 4) { capicity_ = 4; }

    lf_queue_ = new QueueElem*[capicity_];

    for (int i = 0; i < capicity_; i++)

    lf_queue_[i] = NULL;

    }

    ~LockFreeQueue() { if (lf_queue_) { delete [] lf_queue_; } }

    public:

    enum QStatus {

    Empty,

    Full,

    Normal,

    Unknown

    };

    //bool isempty() { return header_ == tailer_; }

    // guard_ is the maximum dequeue item

    bool isempty() { return header_ == guard_; }

    bool isfull() { return (internal_index(tailer_ + 1)) == header_; }

    int32 internal_index(int32 v) { return (v % capicity_); }

    bool enqueue(QueueElem* item)

    {

    int32 temp,guard;

    assert(item);

    do

    {

    // fetch tailer_ first and then judge isfull, else encounter concurrent problem

    temp = tailer_;

    guard = guard_;

    if (isfull())

    {

    return false;

    }

    // cross operate

    if (CAS(&tailer_, temp, internal_index(temp + 1)) == temp)

    {

    lf_queue_[temp] = item;

    // update the guard_ for the max dequeue item

    CAS(&guard_, guard, internal_index(guard + 1));

    break;

    }

    else

    {

    //std::cout << "enqueue Cas failure one times" << std::endl;

    }

    } while (true);

    return true;

    }

    bool dequeue(QueueElem** item)

    {

    int32 temp;

    do

    {

    // fetch header first and then judge isempty, else encounter concurrent problem

    temp = header_;

    *item = NULL;

    if (isempty())

    {

    return false;

    }

    // cross operate CAS failure

    *item = lf_queue_[temp];

    if (CAS(&header_, temp, internal_index(temp + 1)) == temp)

    {

    // some producer lock one slot, but doesn't push back

    // while (!lf_queue_[temp])

    // {

    // std::this_thread::yield();

    // }

    //*item = lf_queue_[temp];

    lf_queue_[temp] = NULL;

    break;

    }

    else

    {

    //std::cout << "dequeue Cas failure one times" << std::endl;

    }

    } while (true);

    return true;

    }

    private:

    QueueElem** lf_queue_;

    int32 capicity_;

    #ifdef WIN32

    LONG header_;

    LONG tailer_;

    LONG guard_; // header <= guard <= tailer

    #else

    int32 header_;

    int32 tailer_;

    int32 guard_; // header <= guard <= tailer

    #endif

    };

    #endif


    相关文章

      网友评论

          本文标题:无锁队列C++实现

          本文链接:https://www.haomeiwen.com/subject/eafxhftx.html