美文网首页JUC并发相关
29. 并发终结之ConcurrentLinkedQueue

29. 并发终结之ConcurrentLinkedQueue

作者: 涣涣虚心0215 | 来源:发表于2020-10-02 08:15 被阅读0次

    ConcurrentLinkedQueue:非阻塞线程安全队列,适用于高并发场景下的队列,性能高于BlockingQueue,基于链表的无界线程安全队列

    内部数据结构

    因为内部不用锁,所以都是通过CAS+volatile来保证原子性以及可见性,所以Node的成员变量都是volatile的,而且添加更新都是通过UNSAFE操作。

    private static class Node<E> {
        volatile E item;
        volatile ConcurrentLinkedQueue.Node<E> next;
    
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
        
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
    
        void lazySetNext(ConcurrentLinkedQueue.Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
    
        boolean casNext(ConcurrentLinkedQueue.Node<E> cmp, ConcurrentLinkedQueue.Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
    
        // Unsafe mechanics
    
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
    
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = ConcurrentLinkedQueue.Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    //初始化的时候head=tail=new Node(null)
    private transient volatile ConcurrentLinkedQueue.Node<E> head;
    private transient volatile ConcurrentLinkedQueue.Node<E> tail;
    
    offer

    因为没有锁,所以在高并发情况下,什么情况都会发生;
    初始化之后head = tail = new Node(null),所以开始head和tail都指向一个新的node节点,但是tail节点并不是真正的尾节点;因为多线程竞争的时候,一个线程更新当前节点为尾结点,但是另一个线程会更新另一个节点为尾结点,这时前面的线程的尾结点就会暂停,去找到正确的尾节点更新。

    public boolean offer(E e) {
        checkNotNull(e);
        final ConcurrentLinkedQueue.Node<E> newNode = new ConcurrentLinkedQueue.Node<E>(e);
        //拿到tail节点,但是这个tail节点不一定是尾节点
        for (ConcurrentLinkedQueue.Node<E> t = tail, p = t;;) {
            ConcurrentLinkedQueue.Node<E> q = p.next;
            //如果q==null表示p是真正的尾节点,那么就将新节点CAS更新为p的next节点;
            //如果casNext成功,则返回true;如果不成功表示他的next节点已经被更新了
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    // 如果p != t,则将入队节点设置成tail节点,
                    //更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
            // 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // 寻找尾节点
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
    

    为什么不是每次都更新tail,因为每次都更新,那么高并发情况下,CAS更新tail会非常多,性能CPU都会受到影响;那么现在不及时更新tail,带来的唯一不好就是需要循环去找到真正的tail节点,从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。

    poll

    正常情况下,出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。
    并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。采用这种方式也是为了减少使用CAS更新head节点的消耗,从而提高出队效率

    public E poll() {
        restartFromHead:
        for (;;) {
            for (ConcurrentLinkedQueue.Node<E> h = head, p = h, q;;) {
                E item = p.item;
                //拿到head,如果head的item不为null,就直接CAS更新这个item为null,并返回item
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    //这里也不是每次都会更新head
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。
                // 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // p == q,则使用新的head重新开始
                else if (p == q)
                    continue restartFromHead;
                // 如果(q = p.next)下一个元素不为空,则将头节点的下一个节点设置成头节点
                else
                    p = q;
            }
        }
    }
    

    按正常想法,head应该始终为Node(null)的,不应该有值,但是ConcurrentLinkedQueue不是这样设计的,head可能会变成实际节点上。
    可以看看peek方法,会移动head到实际节点上

    peek
    public E peek() {
        restartFromHead:
        for (;;) {
            //循环拿到head
            for (ConcurrentLinkedQueue.Node<E> h = head, p = h, q;;) {
                E item = p.item;
                //如果head的item不为空,则更新head到这个节点,返回item
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                //如果p==q,意味着p.next == p,重新拿head来进行
                else if (p == q)
                    continue restartFromHead;
              //将p向后移动一位
                else
                    p = q;
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:29. 并发终结之ConcurrentLinkedQueue

        本文链接:https://www.haomeiwen.com/subject/gmowhktx.html