单生产单消费
ring buffer
- 两个原子变量 head和tail
- 通过改变这两个原子变量,来实现入队和出队
多生产多消费
链表+CAS
生产消息
void Enqueue(x):
auto q = new record();
q->value = x;
q->next = null;
auto p = tail;
auto null_ptr = null;
do{
while (p->next != null)
p = next;
null_ptr = null;
} while(!p->next.compare_exchange_weak(null_ptr, q));
auto cur = p;
do{
p = cur;
} while(!tail.compare_exchange_weak(p, q);
- 通过cas修改tail指针
消费消息
record Dequeue(){
auto p = head;
do{
p = head;
if(p == null) {
return -1;
}
}while(!head.compare_exchange_weak(p, p.next);
return p->value;
}
- 使用cas修改head指针
使用链表,会使得访问消息队列cache不亲和
disruptor(ring buffer+CAS)
生产消息
lock-free-queue-disruptor-writelong tryReserveWrite(int n)
{
if (n < 1)
{
return -1;
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
if (!hasAvailableCapacity(gatingSequences, n, current))
{
return -1;
}
}
while (!cursor.compare_exchange_weak(current, next));
return next;
}
- 每个生产端,通过cas抢到需要写的空间,然后写入ring buffer
消费消息
lock-free-queue-disruptor-readlong tryReserveRead(int n)
{
if (n < 1)
{
return -1;
}
long current;
long next;
do
{
current = reader_cursor.get();
next = current + n;
if (!checkReadAvailable(gatingSequences, n, current))
{
return -1;
}
}
while (!reader_cursor.compare_exchange_weak(current, next));
return next;
}
- 多消费者,通过CAS reader的游标,来获取需要消费的消息
- 通过判断available buffer来判断元素是否已经写入完成
使用数组,会让消息队列cache更加亲和
为了避免伪共享,需要把每个原子变量都align cache line的大小
网友评论