美文网首页
ConcurrentLinkedQueue

ConcurrentLinkedQueue

作者: 3ngu | 来源:发表于2020-04-30 20:17 被阅读0次

概述

实现一个线程安全的队列有两种方式:

(1)使用阻塞算法:用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
(2)非阻塞算法:循环使用CAS。 ConcurrentLinkedQueue采用的是"wait-free"算法(即CAS算法)来实现。

ConcurrentLinkedQueue采用的是"wait-free"算法(即CAS算法)来实现。

ConcurrentLinkedQueue的相关类

2.png

默认情况下head节点存储元素为空,tail节点等于head节点。

出队列

从队列里返回一个节点,并清除该节点对元素的引用。
下图是出队列的示意图,并不是没每次出队都更新head节点。当head节点有元素时,直接弹出head节点里的元素,此时不会更新head节点。只要当head节点没有元素时,出队操作才会更新节点。


1.png

出队方法poll源码

   public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
            /*
                初始时的head是一个空节点,因此head不一定是第一个元素。
            */
                E item = p.item;
                if (item != null && p.casItem(item, null)) {
            /*
                当head是一个空节点,到了第二次循环就会满足此p != h,
                此时要弹出的节点为q,因此判断p是否有下一个节点q:
                    (1)如有则将q作为head,下一次执行poll方法时head指向的就是第一个元素,
                因此下一次只是将第一个元素的值设置为null,而不会更新head。
                    (2)若没有则将p设置为head,下一次执行poll方法时head.next是第一个元素,
                因此下一次的poll会更新head。
            */
                    if (p != h) 
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                //哨兵节点
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

//将节点 p设置为新的节点(这是原子操作),
  //之后将原节点的next指向自己, 直接变成一个哨兵节点(为queue节点删除及garbage做准备)
  final void updateHead(Node<E> h, Node<E> p) { 
       if (h != p && casHead(h, p)) 
           h.lazySetNext(h); 
    }

入队列

入队过程分为两步:

(1)将入队节点设置为当前队列尾节点的下一节点;

(2)更新tail节点,若tail节点的next节点不为空,则将入队节点设置tail节点,否则将入队节点设置为tail的next节点。

入队示意图

3.png

定位尾节点
tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点:通过判断tail节点是否有next节点,有则表示next节点可能是尾节点。

在获取tail节点的next节点时需要注意 p节点可能等于p.next的情况,这种情况发生在队列刚初始化,p节点和p.next节点此时都为空。

final Node<E> succ(Node<E> p) {
    Node<E> next = p.getNext();
    return (p == next) head : next;
}

下面是入队的offer方法源码。

    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
        //死循环,入队不成功反复入队.
        //p,t节点指向队列的尾节点,其中t表示循环前的tail节点。
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;    //将q设为p的下一节点
        //若尾节点p的next节点q为空,表明p是尾节点,
            if (q == null) {
        //通过casNext方法将入队节点newNode设置为p的next节点。
                if (p.casNext(null, newNode)) {
        //如果第一次循环进行到此步,p是等于t的,而当进行到第二次循环,则p != t,因此我们将
        //t设置为最新节点。此处允许设置失败。每两次更新1次
                    if (p != t) 
                        casTail(t, newNode);  
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
        /*
            在多线程环境下,进行入队的同时也可能在进行出队操作,当入队的时候节点p被出队了
            那么poll方法的updateHead方法会将head指向当前的q,p.next指向自己,
            即p == q(q.next)。
            这样就无法通过next取得后续节点,因此只能通过将p指向head,来从链表头部重新遍历。
            如果在执行这条语句的过程中,tail节点被其他线程修改,则进行一次打赌:使用新的tail
            作为链表尾部,这样避免重新查找tail的开销
            (t != (t = tail)) 语句表示先取出节点t,然后执行t = tail,然后判断t != tail
        */
            else if (p == q)
                p = (t != (t = tail)) ? t : head;
        //否则tail不是尾节点,那么将p指向最后一个节点。
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

tail的更新是有条件的:
(1)假设tail指向当前最后一个节点,添加节点后,tail不会更新,此时指向倒数第二个节点。等再次添加节点时,就会进入p!=t的条件,于是开始更新tail的位置;
(2)tail被其他线程更新了,此时的节点p指向的位置与tail指向的位置不同。

tail的极限位置
tail可能指向最后一个节点,倒数第二个节点或是倒数第三个节点(即在新加入入队节点时,还没更新tail,此时tail指向倒数第三个位置),在并发度大的情况下,还会出现tail指向倒数第4个节点的极限情况:

(1)tail指向倒数第二个节点,然后两个线程并发添加。假设当前队列为a->b,tail指向a
(2)一个线程x使用casNext方法成功往队列里添加它的入队节点c,此时队列为a-b->c,tail指向a。另一个线程y虽然没有成功添加,但是y又经过了一次循环后,此时y的循环中节点p指向的是队列里的c,成功判断完q == null后,y成功添加了它的入队节点d,此时x循环中的casTail方法才开始执行,这个瞬间tail指向的a是倒数第四个节点。

为什么会出现p == q?
假设在多线程环境下,当前的队列情况为:

1.png

线程A执行offer方法,在第一次循环里:

1.此时tail = node0,p = t = node0;
2.q = p.next,不满足 q == nul的条件;
3.到了else if(p == q)这一步时,线程A暂停;

另一个线程B执行poll操作,在第一次循环里:

1.head = node0,p = h = node0;
2.执行E item = p.item = null;
3.后续的三个判断都不满足,执行p = q = node1;

然后线程B进入第二次循环:

1.此时head = node0,h = node0,p = node1;
2.执行 E item = p.item;,item = node1;
3.满足iitem != null ,假设casItem操作成功;
4.p != h成立,执行 updateHead(h, ((q = p.next) != null) ? q : p),之后,q = node2
5.在updateHead方法里:满足h != p,如果casHead(h, p)操作成功(将head设置为node2)。
6.执行h.lazySetNext(h),此时h = node0、执行完方法后node0.next = node0
7.返回item

此时队列的情况如下 2.png

线程A的结果为:

head = node0
tail = node0
t = node0
p = node0
q = node0.next = node0

因此出现了p = q = node0,主要原因是updateHead方法的 h.lazySetNext(h)操作中,把旧的head.next设置为了自己。

参考资料

https://www.cnblogs.com/sunshine-2015/p/6067709.html
https://my.oschina.net/xpbob/blog/895679

相关文章

网友评论

      本文标题:ConcurrentLinkedQueue

      本文链接:https://www.haomeiwen.com/subject/yygbwhtx.html