美文网首页
ConcurrentLinkedQueue

ConcurrentLinkedQueue

作者: 3ngu | 来源:发表于2020-04-30 20:17 被阅读0次

    概述

    实现一个线程安全的队列有两种方式:

    (1)使用阻塞算法:用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
    (2)非阻塞算法:循环使用CAS。 ConcurrentLinkedQueue采用的是"wait-free"算法(即CAS算法)来实现。

    ConcurrentLinkedQueue采用的是"wait-free"算法(即CAS算法)来实现。

    ConcurrentLinkedQueue的相关类

    2.png

    默认情况下head节点存储元素为空,tail节点等于head节点。

    出队列

    从队列里返回一个节点,并清除该节点对元素的引用。
    下图是出队列的示意图,并不是没每次出队都更新head节点。当head节点有元素时,直接弹出head节点里的元素,此时不会更新head节点。只要当head节点没有元素时,出队操作才会更新节点。


    1.png

    出队方法poll源码

       public E poll() {
            restartFromHead:
            for (;;) {
                for (Node<E> h = head, p = h, q;;) {
                /*
                    初始时的head是一个空节点,因此head不一定是第一个元素。
                */
                    E item = p.item;
                    if (item != null && p.casItem(item, null)) {
                /*
                    当head是一个空节点,到了第二次循环就会满足此p != h,
                    此时要弹出的节点为q,因此判断p是否有下一个节点q:
                        (1)如有则将q作为head,下一次执行poll方法时head指向的就是第一个元素,
                    因此下一次只是将第一个元素的值设置为null,而不会更新head。
                        (2)若没有则将p设置为head,下一次执行poll方法时head.next是第一个元素,
                    因此下一次的poll会更新head。
                */
                        if (p != h) 
                            updateHead(h, ((q = p.next) != null) ? q : p);
                        return item;
                    }
                    else if ((q = p.next) == null) {
                        updateHead(h, p);
                        return null;
                    }
                    //哨兵节点
                    else if (p == q)
                        continue restartFromHead;
                    else
                        p = q;
                }
            }
        }
    
    //将节点 p设置为新的节点(这是原子操作),
      //之后将原节点的next指向自己, 直接变成一个哨兵节点(为queue节点删除及garbage做准备)
      final void updateHead(Node<E> h, Node<E> p) { 
           if (h != p && casHead(h, p)) 
               h.lazySetNext(h); 
        }
    

    入队列

    入队过程分为两步:

    (1)将入队节点设置为当前队列尾节点的下一节点;

    (2)更新tail节点,若tail节点的next节点不为空,则将入队节点设置tail节点,否则将入队节点设置为tail的next节点。

    入队示意图

    3.png

    定位尾节点
    tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点:通过判断tail节点是否有next节点,有则表示next节点可能是尾节点。

    在获取tail节点的next节点时需要注意 p节点可能等于p.next的情况,这种情况发生在队列刚初始化,p节点和p.next节点此时都为空。

    final Node<E> succ(Node<E> p) {
        Node<E> next = p.getNext();
        return (p == next) head : next;
    }
    

    下面是入队的offer方法源码。

        public boolean offer(E e) {
            checkNotNull(e);
            final Node<E> newNode = new Node<E>(e);
            //死循环,入队不成功反复入队.
            //p,t节点指向队列的尾节点,其中t表示循环前的tail节点。
            for (Node<E> t = tail, p = t;;) {
                Node<E> q = p.next;    //将q设为p的下一节点
            //若尾节点p的next节点q为空,表明p是尾节点,
                if (q == null) {
            //通过casNext方法将入队节点newNode设置为p的next节点。
                    if (p.casNext(null, newNode)) {
            //如果第一次循环进行到此步,p是等于t的,而当进行到第二次循环,则p != t,因此我们将
            //t设置为最新节点。此处允许设置失败。每两次更新1次
                        if (p != t) 
                            casTail(t, newNode);  
                        return true;
                    }
                    // Lost CAS race to another thread; re-read next
                }
            /*
                在多线程环境下,进行入队的同时也可能在进行出队操作,当入队的时候节点p被出队了
                那么poll方法的updateHead方法会将head指向当前的q,p.next指向自己,
                即p == q(q.next)。
                这样就无法通过next取得后续节点,因此只能通过将p指向head,来从链表头部重新遍历。
                如果在执行这条语句的过程中,tail节点被其他线程修改,则进行一次打赌:使用新的tail
                作为链表尾部,这样避免重新查找tail的开销
                (t != (t = tail)) 语句表示先取出节点t,然后执行t = tail,然后判断t != tail
            */
                else if (p == q)
                    p = (t != (t = tail)) ? t : head;
            //否则tail不是尾节点,那么将p指向最后一个节点。
                else
                    // Check for tail updates after two hops.
                    p = (p != t && t != (t = tail)) ? t : q;
            }
        }
    

    tail的更新是有条件的:
    (1)假设tail指向当前最后一个节点,添加节点后,tail不会更新,此时指向倒数第二个节点。等再次添加节点时,就会进入p!=t的条件,于是开始更新tail的位置;
    (2)tail被其他线程更新了,此时的节点p指向的位置与tail指向的位置不同。

    tail的极限位置
    tail可能指向最后一个节点,倒数第二个节点或是倒数第三个节点(即在新加入入队节点时,还没更新tail,此时tail指向倒数第三个位置),在并发度大的情况下,还会出现tail指向倒数第4个节点的极限情况:

    (1)tail指向倒数第二个节点,然后两个线程并发添加。假设当前队列为a->b,tail指向a
    (2)一个线程x使用casNext方法成功往队列里添加它的入队节点c,此时队列为a-b->c,tail指向a。另一个线程y虽然没有成功添加,但是y又经过了一次循环后,此时y的循环中节点p指向的是队列里的c,成功判断完q == null后,y成功添加了它的入队节点d,此时x循环中的casTail方法才开始执行,这个瞬间tail指向的a是倒数第四个节点。

    为什么会出现p == q?
    假设在多线程环境下,当前的队列情况为:

    1.png

    线程A执行offer方法,在第一次循环里:

    1.此时tail = node0,p = t = node0;
    2.q = p.next,不满足 q == nul的条件;
    3.到了else if(p == q)这一步时,线程A暂停;

    另一个线程B执行poll操作,在第一次循环里:

    1.head = node0,p = h = node0;
    2.执行E item = p.item = null;
    3.后续的三个判断都不满足,执行p = q = node1;

    然后线程B进入第二次循环:

    1.此时head = node0,h = node0,p = node1;
    2.执行 E item = p.item;,item = node1;
    3.满足iitem != null ,假设casItem操作成功;
    4.p != h成立,执行 updateHead(h, ((q = p.next) != null) ? q : p),之后,q = node2
    5.在updateHead方法里:满足h != p,如果casHead(h, p)操作成功(将head设置为node2)。
    6.执行h.lazySetNext(h),此时h = node0、执行完方法后node0.next = node0
    7.返回item

    此时队列的情况如下 2.png

    线程A的结果为:

    head = node0
    tail = node0
    t = node0
    p = node0
    q = node0.next = node0

    因此出现了p = q = node0,主要原因是updateHead方法的 h.lazySetNext(h)操作中,把旧的head.next设置为了自己。

    参考资料

    https://www.cnblogs.com/sunshine-2015/p/6067709.html
    https://my.oschina.net/xpbob/blog/895679

    相关文章

      网友评论

          本文标题:ConcurrentLinkedQueue

          本文链接:https://www.haomeiwen.com/subject/yygbwhtx.html