美文网首页
无锁队列

无锁队列

作者: _张晓龙_ | 来源:发表于2016-08-11 23:31 被阅读2893次

简介

无锁队列是lock-free中最基本的数据结构,一般应用场景是资源分配,比如TimerId的分配,WorkerId的分配,上电内存初始块数的申请等等。

对于多线程用户来说,无锁队列的入队和出队操作是线程安全的,不用再加锁控制。

API

ErrCode initQueue(void** queue, U32 unitSize, U32 maxUnitNum);
ErrCode enQueue(void* queue, void* unit);
ErrCode deQueue(void* queue, void* unit);
U32 getQueueSize(void* queue);
BOOL isQueueEmpty(void* queue);

initQueue

初始化队列:根据unitSize和maxUnitNum申请内存,并对内存进行初始化。

enQueue

入队:从队尾增加元素

dequeue

出队:从队头删除元素

getQueueSize

获取队列大小:返回队列中的元素数

isQueueEmpty

队列是否为空:true表示队列为空,false表示队列非空

API使用示例

我们以定时器Id的管理为例,了解一下无锁队列主要API的使用。

初始化:主线程调用


ErrCode ret = initQueue(&queue, sizeof(U32), MAX_TMCB_NUM);
if (ret != ERR_TIMER_SUCC)
{
   ERR_LOG("lock free init_queue error: %u\n", ret);
   return;
}

for (U32 timerId = 0; timerId < MAX_TMCB_NUM; timerId++)
{
    ret = enQueue(queue, &timerId);
    if (ret != ERR_TIMER_SUCC)
    {
        ERR_LOG("lock free enqueue error: %u\n", ret);
        return;
    }
}

timerId分配:多线程调用


U32 timerId;
ErrCode ret = deQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
    ERR_LOG("deQueue failed!");
    return INVALID_TIMER_ID;
}

timerId回收:多线程调用

ErrCode ret = enQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
    ERR_LOG("enQueue failed!");
}

核心实现

显然,队列操作的核心实现为入队和出队操作。

入队

入队的关键点有下面几点:

  1. 通过写次数确保队列元素数小于最大元素数
  2. 获取next tail的位置
  3. 将新元素插入到队尾
  4. 尾指针偏移
  5. 读次数加1

最大元素数校验

do
  

在入队操作开始,就判断写次数是否超过队列元素的最大值,如果已超过,则反馈队列已满的错误码,否则通过CAS操作将写次数加1。如果CAS操作失败,说明有多个线程同时判断了写次数都小于队列最大元素数,那么只有一个线程CAS操作成功,其他线程则需要重新做循环操作。

获取next tail的位置

do
{
    do
    {
        nextTail = queueHead->nextTail;
    } while (!__sync_bool_compare_and_swap(&queueHead->nextTail, nextTail, (nextTail + 1) % (queueHead->maxUnitNum + 1)));

    unitHead = UNIT_HEAD(queue, nextTail);
} while (unitHead->hasUsed);

当前next tail的位置就是即将入队的元素的目标位置,并通过CAS操作更新队列头中nextTail的值。如果CAS操作失败,则说明其他线程也正在进行入队操作,并比本线程快,则需要进行重新尝试,从而更新next tail的值,确保该入队元素的位置正确。

然而事情并没有这么简单!
由于多线程的抢占,导致队列并不是按下标大小依次链接起来的,所以要判断一下next tail的位置是否正被占用。如果next tail的位置正被占用,则需要重新竞争next tail,直到next tail的位置是空闲的。

将新元素插入到队尾

初始化新元素:

unitHead->next = LIST_END;
memcpy(UNIT_DATA(queue, nextTail), unit, queueHead->unitSize);

插入到队尾:

do
{
    listTail = queueHead->listTail;
    oldListTail = listTail;
    unitHead = UNIT_HEAD(queue, listTail);

    if ((++tryTimes) >= 3)
    {
        while (unitHead->next != LIST_END)
        {
            listTail = unitHead->next;
            unitHead = UNIT_HEAD(queue, listTail);
        }
    }
} while (!__sync_bool_compare_and_swap(&unitHead->next, LIST_END, nextTail));

通过CAS操作判断当前指针是否到达队尾,如果到达队尾,则将新元素连接到队尾元素之后(next),否则进行追赶。

在这里,我们做了优化,当CAS操作连续失败3次后,那么就直接通过next的递推找到队尾,这样比CAS操作的效率高很多。我们在测试多线程的队列操作时,出现过一个线程插入到tail为400多的时候,已有线程插入到tail为1000多的场景。

尾指针偏移

do
{
    __sync_val_compare_and_swap(&queueHead->listTail, oldListTail, nextTail);
    oldListTail = queueHead->listTail;
    unitHead = UNIT_HEAD(queue, oldListTail);
    nextTail = unitHead->next;
} while (nextTail != LIST_END);

在多线程场景下,队尾指针是动态变化的,当前尾可能不是新尾了,所以通过CAS操作更新队尾。当CAS操作失败时,说明队尾已经被其他线程更新,此时不能将nextTail赋值给队尾。

读次数加1

__sync_fetch_and_add(&queueHead->readCount, 1);

写次数加1是为了保证队列元素的数不能超过最大元素数,而读次数加1是为了确保不能从空队列出队。

出队

出队的关键点有下面几点:

  1. 通过读次数确保不能从空队列出队
  2. 头指针偏移
  3. dummy头指针
  4. 写次数减1

空队列校验

do
{
    readCount = queueHead->readCount;
    if (readCount == 0) return ERR_QUEUE_HAS_EMPTY;
} while (!__sync_bool_compare_and_swap(&queueHead->readCount, readCount, readCount - 1));

读次数为0,说明队列为空,否则通过CAS操作将读次数减1。如果CAS操作失败,说明其他线程已更新读次数成功,必须重试,直到成功。

头指针偏移

U32 readCount;
do
{
    listHead = queueHead->listHead;
    unitHead = UNIT_HEAD(queue, listHead);
} while (!__sync_bool_compare_and_swap(&queueHead->listHead, listHead, unitHead->next));

如果CAS操作失败,说明队头指针已经在其他线程的操作下进行了偏移,所以要重试,直到成功。

dummy头指针

memcpy(unit, UNIT_DATA(queue, unitHead->next), queueHead->unitSize);

我们可以看出,头元素为head->next,这就是说队列的第一个元素都是基于head->next而不是head。

这样设计是有原因的。
考虑一个边界条件:在队列只有一个元素条件下,如果head和tail指针指向同一个结点,这样入队操作和出队操作本身就需要互斥了。
通过引入一个dummy头指针来解决这个问题,如下图所示。

dummy-head.png

写次数减1

 __sync_fetch_and_sub(&queueHead->writeCount, 1);

出队操作结束前,要将写次数减1,以便入队操作能成功。

无锁队列的ABA问题分析

我们再看一下头指针偏移的代码:

do
{
    listHead = queueHead->listHead;
    unitHead = UNIT_HEAD(queue, listHead);
} while (!__sync_bool_compare_and_swap(&queueHead->listHead, listHead, unitHead->next));

假设队列中只有两个元素A和B,那么

  1. 线程T1 执行出队操作,当执行到while循环时被线程T2 抢占,线程T1 等待
  2. 线程T2 成功执行了两次出队操作,并free了A和B结点的内存
  3. 线程T3 进行入队操作,malloc的内存地址和A相同,入队操作成功
  4. 线程T1 重新获得CPU,执行while中的CAS操作,发现A的地址没有变,其实内容已经今非昔比了,而线程T1 并不知情,继续更新头指针,使得头指针指向B所在结点,但是B所在结点的内存早已释放。

这就是无锁队列的ABA问题。

为解决ABA问题,我们可以采用具有原子性的内存引用计数等办法,而利用循环数组实现无锁队列也可以解决ABA问题,因为循环数组不涉及内存的动态分配和回收,在线程T2 成功执行两次出队操作后,队列的head指针已经变化(指向到了下标head+2),线程T3 进行入队操作不会改变队列的head指针,当线程T1 重新获得CPU进行CAS操作时,会因失败重新do while,这时临时head更新成了队列head,所以规避了ABA问题。

小结

我们在上一篇简要介绍了无锁编程基础,这一篇通过深度剖析无锁队列,对CAS原子操作的使用有了感性的认识,我们了解了无锁队列的API和核心实现,可以看出,要实现一个没有问题且高效的无锁队列是非常困难的,最后对无锁队列的ABA问题进行了实例化,笔者在无锁队列的编程实践中通过循环数组的方法规避了ABA问题。

你是否已感到有些烧脑?
我们将在下一篇文章中深度剖析无锁双向链表,是否会烧的更厉害?让我们拭目以待。

相关文章

  • 无锁数组队列

    无锁数组队列

  • 无锁队列

    简介 无锁队列是lock-free中最基本的数据结构,一般应用场景是资源分配,比如TimerId的分配,Worke...

  • 无锁队列

    rte_ring 关键点 无锁: rte_atomic32_cmpset 直到成功(CAS)环: 总长度c...

  • 无锁环形队列

    并发编程中,经常会遇到资源竞争问题,而保持竞争资源的正确使用,可以通过锁的方式,但synchronized blo...

  • JUC并发集合总结

    ConcurrentLinkedQueue 线程安全的支持高并发的队列,使用链表实现。非阻塞,无锁,无界。该队列也...

  • 多线程之非阻塞队列

    ConcurrentLinkedQueue 相对于阻塞队列加锁实现阻塞,非阻塞队列采用无锁CAS的方式来实现。

  • 无锁队列的总结

    首次接触无锁数据结构的设计,请各位大佬多多指教~~~ CAS(Compare && Swap)原子操作 CAS是无...

  • 无锁队列C实现

    入队列 我们可以看到,程序中的那个 do- while 的 Re-Try-Loop。就是说,很有可能我在准备在队列...

  • 内核无锁队列 -- kfifo

    理论证明,在一个生产者和一个消费者的情况下,两者之间的同步无需加锁,即可并发访问。Linux内核无锁队列kfifo...

  • java多线程-7-ReentrantReadWriteLock

    概述 成功获取读锁(含读锁重入),会有自旋锁(CLH,无饥饿)特性,传递唤醒队列线程直到写锁或队尾 释放锁时比较严...

网友评论

      本文标题:无锁队列

      本文链接:https://www.haomeiwen.com/subject/htvssttx.html