ConcurrentLinkedQueue, LinkedBlockingQueue源码阅读
concurrentLinkedQueue 和 LinkedBlockQueue不同,前者是lock-free算法的可被并发读写的队列,后者是阻塞的并发读写的队列,性能上来说,必然是前者优于后者。但是他们又有区别,后者可以当作缓冲的消息代理使用,通知插入或者读取的线程可以进行操作,前者不行。
ConcurrentLinkedQueue <= Queue
LinkedBlockingQueue <= BlockingQueue <= Queue
先来看LinkedBlockingQueue
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list. * Invariant: head.item == null */transient Node<E> head;
/**
* Tail of linked list. * Invariant: last.next == null */private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
相比较于ConcurrentLinkedQueue,它比较简单,用了两个锁,以及关联在锁上的条件队列, 还有一个原子变量 AtomicInteger。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1; // set count = -1,是为了防御,当修改list操作发生错误的时候,count=-1可以指示错误
Node<E> node = new Node<E>(e); // 1. 新建node, 为什么node 以及node的域不是 volatile呢?以及insert的修改怎么能保证立刻可见呢?
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await(); // 2. 如果队列达到指定容量了,这个线程操作在notFull条件队列上的等待着
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity) // 3. 还有空的位置
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0) // 4. 为空,是怎么走到这一步的?
signalNotEmpty();
}
- node以及node域之所以可以不是volatile,是因为
ReentrantLock
和AtomicInteger
,首先lock的实现依赖于AQS
,AQS
的实现依赖于一个volatile
的state
域,加锁和释放锁,会对这个域进行读写会禁止某些重排序操作,并且将新的值写回主存,之后获取这个put锁的生产者线程将受到值变更的消息,重新从主存中读取;而消费者线程也会读取原子变量,而感知到自己需要的值的变化。 - 因为put是互斥的操作,只会有一个put线程能走到这里,然后如果容量满了,就等在条件上,直到被消费者线程唤醒,或者被其他生产者唤醒? c 是put之前的容量值, c+1是现在的,如果现在的容量之小于,说明还有空的,并且signal一下,防止有阻塞的生产者线程。
- 如2中陈述,如果put以后还有空位,继续唤醒其他的。()
- 在put完之后,检查c的值,如果是0, 即 count.getAndIncrement的之前的值是0,说明很可能有消费者已经阻塞,在
notEmpty
条件队列上等待着消费呢。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) { // 1. 为空,等
notEmpty.await();
}
x = dequeue(); // 4. 具体做了些什么
c = count.getAndDecrement();
if (c > 1) // 2. 之前不止有一个,有多个
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) // 3. 怎么会走到这里
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null; Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
- 没啥好说的,如果没有元素,只能等待
- 当下还有元素,如果有正在阻塞的消费者,就让它继续执行
- c==capacity,是在消费之前,元素个数满了,很可能有正在阻塞的生产者。
- 将head引用向后移动,并且减少一些引用赋值,加快与gc的相遇
在put与take中,为什么有3和2的情况?即为什么会有其他同样类型的线程需要同样的线程来唤醒,而不是由对立线程唤醒?
答:
场景举例:
a: capacity=10, 现在有5个生产者阻塞,在等待notFull信号,n个消费者过来消费了当前的n个元素,然后只有一个会调用signalNotFull, 会挑选最先阻塞的那个put线程,让他离开条件等待队列,进入同步队列,这个put线程会立刻获取到lock(如果没有新的来竞争), 但还是n-1个空缺,所以需要让生产者自己来唤醒自己的同类。反过来也是同理。
这样做的几个特点和好处:
- 区别于信号量控制的生产者和消费者模式,固定工作区大小的生产者和消费者,由对方来唤起,且每增加或者消费,就发信号,这里之所以不这么做是因为,signal是没有计数状态的,而信号量是可以有计数状态的。
- 不signalAll, 还可以保证同步队列中的顺序和wait条件队列中的顺序一致(如果在第一个唤醒的生产者lock期间,没有其他新的生产者过来lock,因为如果有的话,他会先一步wait条件队列中的其他线程排在了同步队列里,不过这并不是问题),避免了争夺锁,失败重新进入队列
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
再回到可以指定等待时间的其他方法
nanos = notEmpty.awaitNanos(nanos); // 不过是awaiNanos,超时后,返回0或者负数,如果没超时,返回剩余的时间
删除操作会锁柱Put和Take,然后遍历,找到node,并且将其从链表中去掉,再检查是否有Put被阻塞,有则将其唤醒。
另外不能插入null, 因为poll方法在列表为空的时候返回就是null, 如果能插入null, 就无法判断null是插入的, 还是代表列表为空的情况
再看ConcurrentLinkedQueue
它的实现中没有用到同步器,主要依靠CAS实现,是一种无锁算法(Lock-Free),不会阻塞线程,但其实现也是最复杂的。
首先它的实现是参考的:
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.
实际上LinkedBlockingQueue
也是参考的这篇论文。
另外在这个知乎回答里:求推荐Lock-Free 算法相关论文? - 知乎 (zhihu.com)
有关于lock-free算法论文的推荐。

每个元素都会被包装成Node
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) { UNSAFE.putObject( this, itemOffset, item); }
boolean castItem(E cmp, E val) { return UNSAFE.compareAndSwapObjectu( this, itemOffset, cmp, val); }
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
boolean casetNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
关键的属性:
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
加入方法
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { // 1. 循环,直至成功
Node<E> q = p.next;
if (q == null) { // 2. 说明q是尾后节点, p是尾节点
// p is last node
if (p.casNext(null, newNode)) { // 3. cas 设置
if (p != t) // hop two nodes at a time // 4. 一次跳两个来设置尾节点
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
} else if (p == q) // 5. 这个情况是怎么来的?
p = (t != (t = tail)) ? t : head;
else // 6. 这个情况又是怎么来的?
p = (p != t && t != (t = tail)) ? t : q;
}
}
- 循环,直到cas 插入成功才退出,循环每次都会因为 5, 6而更新 p
- 假设现在tail就是尾节点, 插入一个值。q == null , 说明p是尾节点,如果在这个时候,没有竞争,则cas操作插入一定没问题。如果有竞争,则cas操作会失败,此后p不再是最后的节点,是倒数第n个节点,则q 也不会再是 null。 随后可能是5, 也可能是6。 假设tail不是尾节点
- cas操作, 原子更新
- 根据代码注释: 属于一种优化,减少cas的次数
- 走到5是因为, 对单个元素的list进行出队操作,而造成的一种特殊情况,先看出队操作。
- p = (p != t && t != (t = tail)) ? t : q => p首先沿着链往下,在并发插入的过程中,寻找这最后的尾节点
a.(p != t && t != (t = tail)) == true
p 不是t ,且tail在并发插入的过程中,变化了,t是执行这条指令时最新的tail,则设置p为t ,为tail
b.(p != t && t != (t = tail)) == false
b1:p ==t
可以理解为a之后的cas插入又失败了, 因为还有其他线程在并发插入,则把p 设置为自己的后继,往下走 b2: b1之后, p != t, 但发生了 t == (t = tail), 即理解为并发插入停下了, p != t, 其实隐含着,p在t的后面的位置,所以p变成自己的后继,往下走
提取方法
public E poll() {
restartFromHead: // 1. label
for (;;) { // 死循环
for (Node<E> h = head, p = h, q;;) {
E item = p.item; // 2. 2->3可能发生变化
if (item != null && p.casItem(item, null)) { // 3.
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time // 4
updateHead(h, ((q = p.next) != null) ? q : p); // 5
return item;
} else if ((q = p.next) == null) { //6.
updateHead(h, p);
return null;
} else if (p == q) // 7.
continue restartFromHead;
else p = q; // 8.
}
}
}
- 首当其冲的是:
restartFromHead
标签 - 因为提取操作只会从头节点的下个元素开始,如果没有别提取:
a.item == null
, 走到8,p是第一个元素, 如果3成功,则提取完成, 走到4,修改head.
如果被提取,
a. 或者:当前的p其实可能指向旧的head(也可能是新的,如果还没updateHead),head的item == null
c. 或者 :当前的pp.casItem(item, null) 失败
还是会走到8, 直到3处成功 - cas
4.5 更新head节点,如果被弹出的节点,有后继节点,取后继节点, 没有后继节点,取被弹出去的node(反正item已经为null了), 作为 head。并且让原来的head的next指向自己。这个就造成了单个节点的自环,且由于从一开始head和tail是同一个元素,且tail是慢更新的,很可能tail指向这个旧的单个节点的自环,所以在上面的插入操作中的5,p == q
所以,要从head从头开始。 - 这个情况是 有一个线程要获取元素的时候,刚进入循环,读取了head, 其他一个线程迅速完成了其他元素的读取,导致这个敢读的head是个自环,所以需要从新开始。
网友评论