美文网首页JUC并发相关
29. 并发终结之ConcurrentLinkedQueue

29. 并发终结之ConcurrentLinkedQueue

作者: 涣涣虚心0215 | 来源:发表于2020-10-02 08:15 被阅读0次

ConcurrentLinkedQueue:非阻塞线程安全队列,适用于高并发场景下的队列,性能高于BlockingQueue,基于链表的无界线程安全队列

内部数据结构

因为内部不用锁,所以都是通过CAS+volatile来保证原子性以及可见性,所以Node的成员变量都是volatile的,而且添加更新都是通过UNSAFE操作。

private static class Node<E> {
    volatile E item;
    volatile ConcurrentLinkedQueue.Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }
    
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(ConcurrentLinkedQueue.Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(ConcurrentLinkedQueue.Node<E> cmp, ConcurrentLinkedQueue.Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
//初始化的时候head=tail=new Node(null)
private transient volatile ConcurrentLinkedQueue.Node<E> head;
private transient volatile ConcurrentLinkedQueue.Node<E> tail;
offer

因为没有锁,所以在高并发情况下,什么情况都会发生;
初始化之后head = tail = new Node(null),所以开始head和tail都指向一个新的node节点,但是tail节点并不是真正的尾节点;因为多线程竞争的时候,一个线程更新当前节点为尾结点,但是另一个线程会更新另一个节点为尾结点,这时前面的线程的尾结点就会暂停,去找到正确的尾节点更新。

public boolean offer(E e) {
    checkNotNull(e);
    final ConcurrentLinkedQueue.Node<E> newNode = new ConcurrentLinkedQueue.Node<E>(e);
    //拿到tail节点,但是这个tail节点不一定是尾节点
    for (ConcurrentLinkedQueue.Node<E> t = tail, p = t;;) {
        ConcurrentLinkedQueue.Node<E> q = p.next;
        //如果q==null表示p是真正的尾节点,那么就将新节点CAS更新为p的next节点;
        //如果casNext成功,则返回true;如果不成功表示他的next节点已经被更新了
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                // 如果p != t,则将入队节点设置成tail节点,
                //更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
        // 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // 寻找尾节点
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

为什么不是每次都更新tail,因为每次都更新,那么高并发情况下,CAS更新tail会非常多,性能CPU都会受到影响;那么现在不及时更新tail,带来的唯一不好就是需要循环去找到真正的tail节点,从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。

poll

正常情况下,出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。
并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。采用这种方式也是为了减少使用CAS更新head节点的消耗,从而提高出队效率

public E poll() {
    restartFromHead:
    for (;;) {
        for (ConcurrentLinkedQueue.Node<E> h = head, p = h, q;;) {
            E item = p.item;
            //拿到head,如果head的item不为null,就直接CAS更新这个item为null,并返回item
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                //这里也不是每次都会更新head
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。
            // 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // p == q,则使用新的head重新开始
            else if (p == q)
                continue restartFromHead;
            // 如果(q = p.next)下一个元素不为空,则将头节点的下一个节点设置成头节点
            else
                p = q;
        }
    }
}

按正常想法,head应该始终为Node(null)的,不应该有值,但是ConcurrentLinkedQueue不是这样设计的,head可能会变成实际节点上。
可以看看peek方法,会移动head到实际节点上

peek
public E peek() {
    restartFromHead:
    for (;;) {
        //循环拿到head
        for (ConcurrentLinkedQueue.Node<E> h = head, p = h, q;;) {
            E item = p.item;
            //如果head的item不为空,则更新head到这个节点,返回item
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            //如果p==q,意味着p.next == p,重新拿head来进行
            else if (p == q)
                continue restartFromHead;
          //将p向后移动一位
            else
                p = q;
        }
    }
}

相关文章

网友评论

    本文标题:29. 并发终结之ConcurrentLinkedQueue

    本文链接:https://www.haomeiwen.com/subject/gmowhktx.html