简介
无锁队列是lock-free中最基本的数据结构,一般应用场景是资源分配,比如TimerId的分配,WorkerId的分配,上电内存初始块数的申请等等。
对于多线程用户来说,无锁队列的入队和出队操作是线程安全的,不用再加锁控制。
API
ErrCode initQueue(void** queue, U32 unitSize, U32 maxUnitNum);
ErrCode enQueue(void* queue, void* unit);
ErrCode deQueue(void* queue, void* unit);
U32 getQueueSize(void* queue);
BOOL isQueueEmpty(void* queue);
initQueue
初始化队列:根据unitSize和maxUnitNum申请内存,并对内存进行初始化。
enQueue
入队:从队尾增加元素
dequeue
出队:从队头删除元素
getQueueSize
获取队列大小:返回队列中的元素数
isQueueEmpty
队列是否为空:true表示队列为空,false表示队列非空
API使用示例
我们以定时器Id的管理为例,了解一下无锁队列主要API的使用。
初始化:主线程调用
ErrCode ret = initQueue(&queue, sizeof(U32), MAX_TMCB_NUM);
if (ret != ERR_TIMER_SUCC)
{
ERR_LOG("lock free init_queue error: %u\n", ret);
return;
}
for (U32 timerId = 0; timerId < MAX_TMCB_NUM; timerId++)
{
ret = enQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
ERR_LOG("lock free enqueue error: %u\n", ret);
return;
}
}
timerId分配:多线程调用
U32 timerId;
ErrCode ret = deQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
ERR_LOG("deQueue failed!");
return INVALID_TIMER_ID;
}
timerId回收:多线程调用
ErrCode ret = enQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
ERR_LOG("enQueue failed!");
}
核心实现
显然,队列操作的核心实现为入队和出队操作。
入队
入队的关键点有下面几点:
- 通过写次数确保队列元素数小于最大元素数
- 获取next tail的位置
- 将新元素插入到队尾
- 尾指针偏移
- 读次数加1
最大元素数校验
do
在入队操作开始,就判断写次数是否超过队列元素的最大值,如果已超过,则反馈队列已满的错误码,否则通过CAS操作将写次数加1。如果CAS操作失败,说明有多个线程同时判断了写次数都小于队列最大元素数,那么只有一个线程CAS操作成功,其他线程则需要重新做循环操作。
获取next tail的位置
do
{
do
{
nextTail = queueHead->nextTail;
} while (!__sync_bool_compare_and_swap(&queueHead->nextTail, nextTail, (nextTail + 1) % (queueHead->maxUnitNum + 1)));
unitHead = UNIT_HEAD(queue, nextTail);
} while (unitHead->hasUsed);
当前next tail的位置就是即将入队的元素的目标位置,并通过CAS操作更新队列头中nextTail的值。如果CAS操作失败,则说明其他线程也正在进行入队操作,并比本线程快,则需要进行重新尝试,从而更新next tail的值,确保该入队元素的位置正确。
然而事情并没有这么简单!
由于多线程的抢占,导致队列并不是按下标大小依次链接起来的,所以要判断一下next tail的位置是否正被占用。如果next tail的位置正被占用,则需要重新竞争next tail,直到next tail的位置是空闲的。
将新元素插入到队尾
初始化新元素:
unitHead->next = LIST_END;
memcpy(UNIT_DATA(queue, nextTail), unit, queueHead->unitSize);
插入到队尾:
do
{
listTail = queueHead->listTail;
oldListTail = listTail;
unitHead = UNIT_HEAD(queue, listTail);
if ((++tryTimes) >= 3)
{
while (unitHead->next != LIST_END)
{
listTail = unitHead->next;
unitHead = UNIT_HEAD(queue, listTail);
}
}
} while (!__sync_bool_compare_and_swap(&unitHead->next, LIST_END, nextTail));
通过CAS操作判断当前指针是否到达队尾,如果到达队尾,则将新元素连接到队尾元素之后(next),否则进行追赶。
在这里,我们做了优化,当CAS操作连续失败3次后,那么就直接通过next的递推找到队尾,这样比CAS操作的效率高很多。我们在测试多线程的队列操作时,出现过一个线程插入到tail为400多的时候,已有线程插入到tail为1000多的场景。
尾指针偏移
do
{
__sync_val_compare_and_swap(&queueHead->listTail, oldListTail, nextTail);
oldListTail = queueHead->listTail;
unitHead = UNIT_HEAD(queue, oldListTail);
nextTail = unitHead->next;
} while (nextTail != LIST_END);
在多线程场景下,队尾指针是动态变化的,当前尾可能不是新尾了,所以通过CAS操作更新队尾。当CAS操作失败时,说明队尾已经被其他线程更新,此时不能将nextTail赋值给队尾。
读次数加1
__sync_fetch_and_add(&queueHead->readCount, 1);
写次数加1是为了保证队列元素的数不能超过最大元素数,而读次数加1是为了确保不能从空队列出队。
出队
出队的关键点有下面几点:
- 通过读次数确保不能从空队列出队
- 头指针偏移
- dummy头指针
- 写次数减1
空队列校验
do
{
readCount = queueHead->readCount;
if (readCount == 0) return ERR_QUEUE_HAS_EMPTY;
} while (!__sync_bool_compare_and_swap(&queueHead->readCount, readCount, readCount - 1));
读次数为0,说明队列为空,否则通过CAS操作将读次数减1。如果CAS操作失败,说明其他线程已更新读次数成功,必须重试,直到成功。
头指针偏移
U32 readCount;
do
{
listHead = queueHead->listHead;
unitHead = UNIT_HEAD(queue, listHead);
} while (!__sync_bool_compare_and_swap(&queueHead->listHead, listHead, unitHead->next));
如果CAS操作失败,说明队头指针已经在其他线程的操作下进行了偏移,所以要重试,直到成功。
dummy头指针
memcpy(unit, UNIT_DATA(queue, unitHead->next), queueHead->unitSize);
我们可以看出,头元素为head->next,这就是说队列的第一个元素都是基于head->next而不是head。
这样设计是有原因的。
考虑一个边界条件:在队列只有一个元素条件下,如果head和tail指针指向同一个结点,这样入队操作和出队操作本身就需要互斥了。
通过引入一个dummy头指针来解决这个问题,如下图所示。
写次数减1
__sync_fetch_and_sub(&queueHead->writeCount, 1);
出队操作结束前,要将写次数减1,以便入队操作能成功。
无锁队列的ABA问题分析
我们再看一下头指针偏移的代码:
do
{
listHead = queueHead->listHead;
unitHead = UNIT_HEAD(queue, listHead);
} while (!__sync_bool_compare_and_swap(&queueHead->listHead, listHead, unitHead->next));
假设队列中只有两个元素A和B,那么
- 线程T1 执行出队操作,当执行到while循环时被线程T2 抢占,线程T1 等待
- 线程T2 成功执行了两次出队操作,并free了A和B结点的内存
- 线程T3 进行入队操作,malloc的内存地址和A相同,入队操作成功
- 线程T1 重新获得CPU,执行while中的CAS操作,发现A的地址没有变,其实内容已经今非昔比了,而线程T1 并不知情,继续更新头指针,使得头指针指向B所在结点,但是B所在结点的内存早已释放。
这就是无锁队列的ABA问题。
为解决ABA问题,我们可以采用具有原子性的内存引用计数等办法,而利用循环数组实现无锁队列也可以解决ABA问题,因为循环数组不涉及内存的动态分配和回收,在线程T2 成功执行两次出队操作后,队列的head指针已经变化(指向到了下标head+2),线程T3 进行入队操作不会改变队列的head指针,当线程T1 重新获得CPU进行CAS操作时,会因失败重新do while,这时临时head更新成了队列head,所以规避了ABA问题。
小结
我们在上一篇简要介绍了无锁编程基础,这一篇通过深度剖析无锁队列,对CAS原子操作的使用有了感性的认识,我们了解了无锁队列的API和核心实现,可以看出,要实现一个没有问题且高效的无锁队列是非常困难的,最后对无锁队列的ABA问题进行了实例化,笔者在无锁队列的编程实践中通过循环数组的方法规避了ABA问题。
你是否已感到有些烧脑?
我们将在下一篇文章中深度剖析无锁双向链表,是否会烧的更厉害?让我们拭目以待。
网友评论