概述
实现一个线程安全的队列有两种方式:
(1)使用阻塞算法:用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
(2)非阻塞算法:循环使用CAS。 ConcurrentLinkedQueue采用的是"wait-free"算法(即CAS算法)来实现。
ConcurrentLinkedQueue采用的是"wait-free"算法(即CAS算法)来实现。
ConcurrentLinkedQueue的相关类
默认情况下head节点存储元素为空,tail节点等于head节点。
出队列
从队列里返回一个节点,并清除该节点对元素的引用。
下图是出队列的示意图,并不是没每次出队都更新head节点。当head节点有元素时,直接弹出head节点里的元素,此时不会更新head节点。只要当head节点没有元素时,出队操作才会更新节点。
1.png
出队方法poll源码
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
/*
初始时的head是一个空节点,因此head不一定是第一个元素。
*/
E item = p.item;
if (item != null && p.casItem(item, null)) {
/*
当head是一个空节点,到了第二次循环就会满足此p != h,
此时要弹出的节点为q,因此判断p是否有下一个节点q:
(1)如有则将q作为head,下一次执行poll方法时head指向的就是第一个元素,
因此下一次只是将第一个元素的值设置为null,而不会更新head。
(2)若没有则将p设置为head,下一次执行poll方法时head.next是第一个元素,
因此下一次的poll会更新head。
*/
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//哨兵节点
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
//将节点 p设置为新的节点(这是原子操作),
//之后将原节点的next指向自己, 直接变成一个哨兵节点(为queue节点删除及garbage做准备)
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
入队列
入队过程分为两步:
(1)将入队节点设置为当前队列尾节点的下一节点;
(2)更新tail节点,若tail节点的next节点不为空,则将入队节点设置tail节点,否则将入队节点设置为tail的next节点。
入队示意图
定位尾节点
tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点:通过判断tail节点是否有next节点,有则表示next节点可能是尾节点。
在获取tail节点的next节点时需要注意 p节点可能等于p.next的情况,这种情况发生在队列刚初始化,p节点和p.next节点此时都为空。
final Node<E> succ(Node<E> p) {
Node<E> next = p.getNext();
return (p == next) head : next;
}
下面是入队的offer方法源码。
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
//死循环,入队不成功反复入队.
//p,t节点指向队列的尾节点,其中t表示循环前的tail节点。
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next; //将q设为p的下一节点
//若尾节点p的next节点q为空,表明p是尾节点,
if (q == null) {
//通过casNext方法将入队节点newNode设置为p的next节点。
if (p.casNext(null, newNode)) {
//如果第一次循环进行到此步,p是等于t的,而当进行到第二次循环,则p != t,因此我们将
//t设置为最新节点。此处允许设置失败。每两次更新1次
if (p != t)
casTail(t, newNode);
return true;
}
// Lost CAS race to another thread; re-read next
}
/*
在多线程环境下,进行入队的同时也可能在进行出队操作,当入队的时候节点p被出队了
那么poll方法的updateHead方法会将head指向当前的q,p.next指向自己,
即p == q(q.next)。
这样就无法通过next取得后续节点,因此只能通过将p指向head,来从链表头部重新遍历。
如果在执行这条语句的过程中,tail节点被其他线程修改,则进行一次打赌:使用新的tail
作为链表尾部,这样避免重新查找tail的开销
(t != (t = tail)) 语句表示先取出节点t,然后执行t = tail,然后判断t != tail
*/
else if (p == q)
p = (t != (t = tail)) ? t : head;
//否则tail不是尾节点,那么将p指向最后一个节点。
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
tail的更新是有条件的:
(1)假设tail指向当前最后一个节点,添加节点后,tail不会更新,此时指向倒数第二个节点。等再次添加节点时,就会进入p!=t的条件,于是开始更新tail的位置;
(2)tail被其他线程更新了,此时的节点p指向的位置与tail指向的位置不同。
tail的极限位置
tail可能指向最后一个节点,倒数第二个节点或是倒数第三个节点(即在新加入入队节点时,还没更新tail,此时tail指向倒数第三个位置),在并发度大的情况下,还会出现tail指向倒数第4个节点的极限情况:
(1)tail指向倒数第二个节点,然后两个线程并发添加。假设当前队列为a->b,tail指向a
(2)一个线程x使用casNext方法成功往队列里添加它的入队节点c,此时队列为a-b->c,tail指向a。另一个线程y虽然没有成功添加,但是y又经过了一次循环后,此时y的循环中节点p指向的是队列里的c,成功判断完q == null后,y成功添加了它的入队节点d,此时x循环中的casTail方法才开始执行,这个瞬间tail指向的a是倒数第四个节点。
为什么会出现p == q?
假设在多线程环境下,当前的队列情况为:
线程A执行offer方法,在第一次循环里:
1.此时tail = node0,p = t = node0;
2.q = p.next,不满足 q == nul的条件;
3.到了else if(p == q)这一步时,线程A暂停;
另一个线程B执行poll操作,在第一次循环里:
1.head = node0,p = h = node0;
2.执行E item = p.item = null;
3.后续的三个判断都不满足,执行p = q = node1;
然后线程B进入第二次循环:
此时队列的情况如下 2.png1.此时head = node0,h = node0,p = node1;
2.执行 E item = p.item;,item = node1;
3.满足iitem != null ,假设casItem操作成功;
4.p != h成立,执行 updateHead(h, ((q = p.next) != null) ? q : p),之后,q = node2
5.在updateHead方法里:满足h != p,如果casHead(h, p)操作成功(将head设置为node2)。
6.执行h.lazySetNext(h),此时h = node0、执行完方法后node0.next = node0
7.返回item
线程A的结果为:
head = node0
tail = node0
t = node0
p = node0
q = node0.next = node0
因此出现了p = q = node0,主要原因是updateHead方法的 h.lazySetNext(h)操作中,把旧的head.next设置为了自己。
参考资料
https://www.cnblogs.com/sunshine-2015/p/6067709.html
https://my.oschina.net/xpbob/blog/895679
网友评论