美文网首页Java 并发编程
并发编程之 ConcurrentLinkedQueue 源码剖析

并发编程之 ConcurrentLinkedQueue 源码剖析

作者: 莫那一鲁道 | 来源:发表于2018-01-14 00:26 被阅读22次

    前言

    今天我们继续分析 java 并发包的源码,今天的主角是谁呢?ConcurrentLinkedQueue,上次我们分析了并发下 ArrayList 的替代 CopyOnWriteArrayList,这次分析则是并发下 LinkedArrayList 的替代 ConcurrentLinkedQueue, 也就是并发链表。

    Demo

    Demo

    该类继承结构如下:

    继承图

    该类是 Collection 框架下的实现。也就是Java 类库提供的数据结构。

    add 方法将指定元素插入此队列的尾部。
    poll 方法 获取并移除此队列的头,如果此队列为空,则返回 null。
    peek 方法 获取但不移除此队列的头;如果此队列为空,则返回 null。

    那么我们就看看 doug lea 是如何实现并发安全的吧。在这之前,我们可以试想一下,实现并发安全无非两种方式,一种是锁,就像我们之前分析的容器,比如 concurrentHashMap,CopyOnWriteArrayList , LinkedBolckingQueue,还有一种是 CAS,在这些容器里也用到了。那么,如果是我们来实现这个队列,使用什么方式呢?有趣的问题。

    开始看源码吧。

    add 方法源码剖析

    实际上是调用 offer 方法,add 方法是 Collection 接口规定的容器方法,而 offer 方法是 Queue 接口的方法。

    add方法

    那我们就看看 offer 方法:

        public boolean offer(E e) {
            // 检查是否是null,如果是null ,抛出NullPointerException
            checkNotNull(e);
            // 创建一个node 对象,使用  CAS 创建对象
            final Node<E> newNode = new Node<E>(e);
            // 轮询链表节点,知道找到节点的 next 为null,才会进行赋值
            for (Node<E> t = tail, p = t;;) {
                Node<E> q = p.next;
                if (q == null) {
                    // 找到null值之后将刚刚创建的值通过CAS放入
                    if (p.casNext(null, newNode)) {
                        // 因为 p 遍历在轮询后会变化,因此需要判断,如果不相等,则使用CAS将新节点作为尾部节点。
                        if (p != t)
                            casTail(t, newNode);  // Failure is OK.
                         // 放入成功后返回 ture
                        return true;
                    }
                }
                // 轮询后  p 有可能等于 q,此时,就需要对 p 重新赋值。
                else if (p == q)
                    // 这里需要注意一下:判断t != t,是因为并发下可能 tail 被改了,如果被改了,则使用新的 t,否则从链表头重新轮询。
                    p = (t != (t = tail)) ? t : head;
                else
                    // 同样,当 t 不等于 p 时,说明 p 在上面被重新赋值了,并且 tail 也被别的线程改了,则使用新的 tail,否则循环检查p的下个节点
                    p = (p != t && t != (t = tail)) ? t : q;
            }
        }
    

    代码行数很少,楼主注释也写了,这里可以看到 doug lea 使用了 CAS 的方式防止并发错误,同时,也看得出对 tail 变量被修改的担忧,通过 t != t 的判断,来检查 tail 是否被其他线程修改了,而这个offer 操作,如果不成功,则永远不会返回,这个队列同时也是无界的。这点在使用的时候需要注意一下。

    那么 poll 方法如何实现呢?

    poll 方法源码剖析

        public E poll() {
            // 循环跳出标记,类似goto
            restartFromHead:
            // 死循环
            for (;;) {
                // 死循环,从 head 开始遍历
                for (Node<E> h = head, p = h, q;;) {
                    E item = p.item;
                    // 如果 head 不是null 且 将 head 的 item 属性设置为null成功,则返回并更新头节点
                    if (item != null && p.casItem(item, null)) {
                        // 如果 p != h 说明在 p 轮询时被修改了
                        if (p != h) 
                             // 如果p 的next 属性不是null ,将 p 作为头节点,而 q 将会消失
                            updateHead(h, ((q = p.next) != null) ? q : p);
                        return item;
                    }
                    // 如果 p(head) 的 next 节点 q 也是null,则表示没有数据了,返回null,则将 head 设置为null
                    // 注意:  updateHead 方法最后还会将原有的 head 作为自己 next 节点,方便offer 连接。
                    else if ((q = p.next) == null) {
                        updateHead(h, p);
                        return null;
                    }
                    // 如果 p == q,说明别的线程取出了 head,并将 head 更新了。就需要重新开始
                    else if (p == q)
                        // 从头开始重新循环
                        continue restartFromHead;
                   // 如果都不是,则将 h 的 next 赋给 h,并重新循环。
                    else
                        p = q;
                }
            }
        }
    

    上面楼主已经写了注释,但是有一个非常困扰哦楼主的疑点,就是 else if (p == q) 这行代码,楼主分析的没有问题,但是再楼主的单线程测试这段代码时,出现了诡异的情况,无法解释,因此, 楼主贴出测试用例,大家一起看看:

    测试代码:

    断点代码:

    注意,断点位置一定要和我的一致。会出现一些奇怪的效果。楼主无法解释,因为这个问题,楼主一直都不敢发这篇文章出来,但楼主觉得有必要说出这个问题,抛砖引玉。

    问题在于:单线程怎么会进入这段代码?按道理,但线程是不会出现这个情况的。

    总结

    这次的源码分析让楼主很痛苦,网上很多的文章也无法解释这是为什么,希望有高人能告诉楼主,到底是怎么回事?

    相关文章

      网友评论

        本文标题:并发编程之 ConcurrentLinkedQueue 源码剖析

        本文链接:https://www.haomeiwen.com/subject/zcqanxtx.html