美文网首页
ConcurrentLinkedQueue全解析

ConcurrentLinkedQueue全解析

作者: longhuihu | 来源:发表于2018-09-25 17:12 被阅读71次

    使用java的concurrent包写程序已经快两三年了,至今对其实现原理不怎么了解,突然感到非常惭愧。下定决心从这个包里面找几个核心类,从源码级别做一个全面的分析,搞懂它们的实现原理,以及它们为啥这么快,这一篇挑选ConcurrentLinkedQueue。

    ConcurrentLinkedQueue

    ConcurrentLinkedQueue是一个无锁化、非阻塞、线程安全的单向队列,JDK1.5提供,由大名鼎鼎的Doug Lea编写。不过要是你认为这是大神灵感爆发的杰作,那就错了,仔细读读java源文件的注释就知道,这个类的实现原理来自Michael & Scott设计的算法(请参考论文),而Michael & Scott也是在很多前人研究的基础上加以综合完善得到该算法的,成为很多具体平台并发FIFO队列实现的蓝本。如果不读读这篇论文,直接阅读java源码,光凭那点注释,会有点摸不着头脑。

    我们要研究的ConcurrentLinkedQueue可以说是这个算法的java版本,当然针对java语言做了很多修改。到这里,你要是认为Doug Lea就干了点将原始算法翻译成java的活,那又错了,从一个理论算法到具体某个语言的实现还有相当长的距离;而且java语言本来就比较慢,所以为了挖掘性能,作者做了大量精细而巧妙的设计。

    ConcurrentLinkedQueue实现的依赖cas操作和java的valotile语义,所以不理解这两点是无法看懂代码的。关于cas操作和valotile关键字,不是本文要讲的内容,有很多的资料可以查询,比如:
    JAVA中的CAS
    volatile关键字解析

    说明:

    1. java里面并没有指针的概念,但是算法原理结构理论讲队列,有“头指针”、“尾指针”的说法,为了方便,下面在讲解ConcurrentLinkedQueue内部结构时也会使用这两个术语;
    2. 采用的源码是Java 1.8版本;
    3. 强烈建议先阅读cas和java内存模型相关资料。

    一、队列的内部链表结构

    下面是摘录自源码,经过简化的ConcurrentLinkedQueue定义片段。

    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
            implements Queue<E>, java.io.Serializable {
         
        private transient volatile Node<E> head;
         private transient volatile Node<E> tail;
    
        public ConcurrentLinkedQueue() {
            head = tail = new Node<E>(null);
        }
     }
    

    一个空的ConcurrentLinkedQueue包含一个空节点(数据字段=null),队列的HEAD和TAIL指针都指向这个节点。当经过一些出入队列的操作以后,队列的结构可能类似下面:

    【N1】->【N2】->【N3】->【N4】->【N5】->【N6】
    HEAD                                                                    TAIL

    但是,也有可能类似这样:
    【N1】->【N2】->【N3】->【N4】->【N5】->【N6】
                      HEAD                                  TAIL

    上边这个结构,表示N1节点已经出了队列,此时的HEAD是N2节点,但是N1节点的next指针仍然保持着。而TAIL节点指向的N5却不是真正的队尾,队尾是N6。这是因为在入队列的过程中,由于并发,导致TAIL更新不及时。

    同样还是上面的结构,如果我把里面的数据字段标识出来,有可能是这样的:
    【N1(null)】->【N2(null)】->【N3(data)】->【N4(data)】->【N5(data)】->【N6(data)】
                              HEAD                                                                 TAIL
    HEAD节点的数据字段是NULL,说明这是一个空节点,这个队列语义上第一个节点应该是N3才对。

    再来看一个状态:
    【N1】<-| 【N2】->【N3】->【N4】->【N5】->【N6】
                      HEAD                                  TAIL
    N1节点的next字段指向自身,这表示N1节点彻底断开和队列的关系,为什么不直接将N1的next置为null呢,因为这样就不能直接断定N1到底是不是尾节点了。

    上面这些情形是ConcurrentLinkedQueue在执行一些操作之后可能处于的状态,之所于允许这些看起来不够严谨的状态,是为了在并发过程中提高效率。但是不管如何,在整个生命周期内,算法保持以下属性:

    1. 链表里面至少会有一个节点,数据字段为null的节点是空节点;
    2. 顺着HEAD指针肯定能找到的真正的头节点,并能访问到所有节点;
    3. TAIL指针不一定指向有效节点,更不能保证指向真正的尾节点,但是它大部分情况下指向尾节点或接近尾节点,因此可以提高效率;
    4. 和队列断开的节点,next字段指向自身;
    5. 对于入了队列的节点(哪怕又出了队列),只有尾节点的next字段才等于null。

    在ConcurrentLinkedQueue内部链表上,可能有一个或多个数据字段为null的空节点,空节点虽然没有数据,但是对高效地保持链表的连接状态至关重要。另一方面,节点的数据字段和next字段值的变化有很强的规律性:数据字段在节点入队列是不为null,出队列时变为null;next字段入队列时是null,出队列时先保持不变,再指向自身。这种规律性可以有效规避掉cas操作的ABA问题。

    上面这些也可以认为是ConcurrentLinkedQueue实现算法的不变式,有些在源码注释里面就有说明,有些是我总结的。带着这些原则才能理解每个方法的实现为什么是这样的。

    二、内部节点类

    直接上源码吧,由于Node提供一个比较简单的节点数据结构,逻辑不多,但是我在读的过程中还是有一些疑惑,直接将解释写在下面的源码块里面了。

    private static class Node<E> {
           volatile E item;
           volatile Node<E> next;
    
           /**
            * Constructs a new node.  Uses relaxed write because item can
            * only be seen after publication via casNext.
            */
           Node(E item) {
               UNSAFE.putObject(this, itemOffset, item);
           }
           为什么要使用UNSAFE.putObject而不直接赋值呢?刚看到“relaxed write”这个注释时一脸懵逼,
           后来才搞明白,因为item是volatile修饰的,如果直接赋值,会触发volatile的内存同步语义,
           在初始化阶段不需要如此,所以使用UNSAFE.putObject能提高一些性能。
    
           boolean casItem(E cmp, E val) {
               return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
           }
           cas操作设置数据字段,这个操作在出队列操作时,保证并发安全。
    
           void lazySetNext(Node<E> val) {
               UNSAFE.putOrderedObject(this, nextOffset, val);
           }
           这个操作相比UNSAFE.putObject,会一定程度禁止指令重排,
           相比volatile赋值,不保证全局可见性,性能也稍好一些,用于节点出队列后断开链接,
    
           boolean casNext(Node<E> cmp, Node<E> val) {
               return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
           }
           给节点设置next字段,入队列的关键操作,cas操作保证并发安全
           
           // Unsafe mechanics
           private static final sun.misc.Unsafe UNSAFE;
           private static final long itemOffset;
           private static final long nextOffset;
     } 
      
    

    三、出队列:poll方法

    poll的目标是安全地取出第一个有效节点的数据,如果没有返回null。
    源码如下,为了方便分析,对关键行,我加上了L1~Ln这样的行标记 。

        public E poll() {
            restartFromHead:
            for (;;) {
            L1 for (Node<E> h = head, p = h, q;;) {
                    E item = p.item;
    
            L2      if (item != null && p.casItem(item, null)) {
                        // Successful CAS is the linearization point
                        // for item to be removed from this queue.
            L3          if (p != h) // hop two nodes at a time
            L4              updateHead(h, ((q = p.next) != null) ? q : p);
                        return item;
                    }
            L5      else if ((q = p.next) == null) {
            L6          updateHead(h, p);
                        return null;
                    }
            L7      else if (p == q)
                        continue restartFromHead;
                    else
            L8          p = q;
                }
            }
        }
    
    • L1行:开始一个类似遍历的for循环,初始化h,p为head节点,p是当前检查的节点,q是下一个节点。
    • L2行:如果p节点的数据不为空,那么p肯定就是真正的第一个节点,只要能将节点的数据字段(item)置为null,出队列就算成功(参考第一节总结的节点属性);p.casItem(item, null)操作保证了并发poll的安全,如果有其他线程抢先一步执行成功,那么casItem操作会失败。

    L2行如果执行成功,那么出队列可以算是完成了,此时的head指向一个空节点,这种情况是允许的,但是队列里面也不能总是留很多空节点,所以L3L4行,就是把head指针后移。

    • L3行:p==h,说明刚出队列的p就是遍历开始的第一个节点,有2种情况,1)没有并发poll,head指向了空节点,队列里面只有且仅有一个空节点,此时没有必要做head移动;2)有并发poll,p==h说明本线程在竞争中领先,那么交由那个落后的线程来做head移动。
    • L4行:如果p.next==null,说明p是尾节点,链表空了,head也指向p;否则head指向p.next。两种情况都是正确的。
    • L5行:无论p的item字段为null,还是在并发中竞争失败,我们都要往后遍历;于是q=p.next,如果q==null,说明链表空了(参考第一节总结的节点属性),此时将head置为尾节点;当然,如果h==p是不需要执行的,updateHead里面有这个判断。
    • L7行:p==q实际上就是p==p.next,也即p的next指向自身,这说明p节点已经和链表断开了(参考第一节总结的节点属性),这种情况下,只能重新开始poll。
    • L8行:正常迭代下一个节点。

    上面L2~L4行,每个线程在成功poll一个节点之后,就会尝试把头结点移到p的下一个节点,这样保证了head后面最多只有一个空节点。L3行的那个判断如果没有,也不会影响程序的正确性,但是性能会差一些。

    再介绍一下更新头指针的updateHead方法:

        final void updateHead(Node<E> h, Node<E> p) {
            if (h != p && casHead(h, p))
                h.lazySetNext(h);
        }
    

    将头指针从h转移到p,就是将节点h出队列,一旦出队列成功,就将h的next字段指向自身,防止那些出队列的节点仍然互相链接,妨碍垃圾收集,这个操作也维护了第一节总结的节点属性。值得注意的是h.lazySetNext(h)操作没有volatile的语义,有可能对其他线程暂时不可见。 假设poll方法执行到L5行时,p节点已经被另外一个线程出队列了,但是本线程不知道,那么L7行的判断也可能失败,最终执行到L8行,这种情况下程序仍然是正确的,不过会多遍历一些节点。(只能认为,作者经过测量,认为这种情况造成的性能损失低于volatile造成的性能损失)。

    还有一个有意思的现象是,在poll的过程中,并不会去修正TAIL指针,所以在链表的TAIL指针是有可能落在HEAD之后的,甚至暂时指向一个已经出队列的节点。

    四、入队列:offer方法

    offer的目标是将新节点插入到尾节点的后面,即使在并发情况下也不丢失数据。

    public boolean offer(E e) {
            checkNotNull(e);
            final Node<E> newNode = new Node<E>(e);
    
         L1 for (Node<E> t = tail, p = t;;) {
         L2     Node<E> q = p.next;
         L3     if (q == null) {
                    // p is last node
         L4         if (p.casNext(null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this queue,
                        // and for newNode to become "live".
         L5            if (p != t) // hop two nodes at a time
         L6                 casTail(t, newNode);  // Failure is OK.
                        return true;
                    }
                    // Lost CAS race to another thread; re-read next
                }
         L7     else if (p == q)
                    // We have fallen off list.  If tail is unchanged, it
                    // will also be off-list, in which case we need to
                    // jump to head, from which all live nodes are always
                    // reachable.  Else the new tail is a better bet.
         L8        p = (t != (t = tail)) ? t : head;
                else
                    // Check for tail updates after two hops.
         L9         p = (p != t && t != (t = tail)) ? t : q;
            }
        }
    
    • L1行:开启一个从tail指针开始遍历的循环,p指向当前疑似尾节点;
    • L2行:初始化q=p的下一个节点;
    • L3行:如果q==null,说明此刻p确实是尾节点,新节点应该插到后面;
    • L4行:casNext执行安全的插入操作;
      如果成功插入,那么应该考虑更新tail指针,L5L6行执行这个操作;
    • L5行:和poll方法一样,如果插入的地方不是在tail处,才往后移,这个行为可以保证,tail后面最多还有一个节点;
    • L6行:cas操作安全移动tail指针。
      后面的两个if分支说明p不是尾节点。
    • L7行:p==q,也即p的next字段指向自身,说明p是一个脱离了链表的节点(并发poll操作造成的,参考第一节总结的节点属性)),需要找一个节点重新开始遍历。
    • L8行:从head重新开始肯定正确,但是如果tail指针有更新过,那么从tail开始大概率可能效率更高。
    • L9行:按道理直接p=q跳到下一个节点就Ok了,但是代码里面做了这个判断(p != t && t != (t = tail)),如果p已经正常往后移动了一次,且tail发生了变化,那么从新的tail重新开始。为什么要加个(p!=t)的前置判断呢?我认为是为了提高效率,因为tail是valotile变量,读取有一定代价,当p==t的时候,作者认为再往后跳一下成功的概率挺高。(作者应该经过测量,可见对性能的压榨已经丧心病狂了)。

    移除操作:remove

    再来看移除操作,通过数据字段的比较,来移除数据相等的节点。

    public boolean remove(Object o) {
        if (o != null) {
            Node<E> next, pred = null;
            for (Node<E> p = first(); p != null; pred = p, p = next) {
                boolean removed = false;
                E item = p.item;
            L1   if (item != null) {
            L2      if (!o.equals(item)) {
            L3          next = succ(p);
            L4          continue;
                    }
            L5      removed = p.casItem(item, null);
                }
    
            L6  next = succ(p);
            L7  if (pred != null && next != null) // unlink
                    pred.casNext(p, next);
                if (removed)
                    return true;
            }
        }
        return false;
    }
    
    • L1行:判断当前是不是空节点。
    • L2行:如果节点数据与输入参数相等,说明这是一个需要移除的节点,否则应该跳到下一个节点;
    • L3/4行:节点数据不相等,取出当前节点p的下一个节点,然后重新开始循环;
    • L5行:节点数据相等,尝试清空数据;如果别的线程并发执行remove或poll,这一步操作可能失败;
    • L6行:执行到这一行说明,p是空节点,(本来就是空,或在L5被清空);取出next节点;
    • L7行:由于p是空节点,尝试将p的前继和后继相连;

    上面再取p的下一个节点的时候,用了调用了succ这个方法,这里也看一下:

       final Node<E> succ(Node<E> p) {
            Node<E> next = p.next;
            return (p == next) ? head : next;
        }
    

    很简单,如果p的next指向自己,说明p已经脱离链表,此时返回head指向的节点。

    源码可以看出,如果队列里面有多个相同数据的节点,一次remove调用最多删除一个。

    迭代器

    private class Itr implements Iterator<E> {
         L1 private Node<E> nextNode;
         L2 private E nextItem;
         L3 private Node<E> lastRet;
    
            Itr() {
         L4     advance();
            }
            private E advance() {
         L5     lastRet = nextNode;
         L6     E x = nextItem;
    
                Node<E> pred, p;
         L7       if (nextNode == null) {
         L8           p = first();
                    pred = null;
                } else {
                    pred = nextNode;
         L9         p = succ(nextNode);
                }
    
                for (;;) {
         L10        if (p == null) {
                        nextNode = null;
                        nextItem = null;
                        return x;
                    }
                    E item = p.item;
         L11        if (item != null) {
                        nextNode = p;
                        nextItem = item;
                        return x;
                    } else {
                        // skip over nulls
         L12            Node<E> next = succ(p);
         L13            if (pred != null && next != null)
         L14                pred.casNext(p, next);
         L15            p = next;
                    }
                }
            }
    
            public boolean hasNext() {
         L16    return nextNode != null;
            }
    
            public E next() {
                if (nextNode == null) throw new NoSuchElementException();
         L17    return advance();
            }
    
            public void remove() {
         L18    Node<E> l = lastRet;
                if (l == null) throw new IllegalStateException();
                // rely on a future traversal to relink.
         L19    l.item = null;
         L20    lastRet = null;
            }
        }
    

    源代码稍微长点,我删掉了注释。

    • L1行:成员变量nextNode指向下一个节点,用来支持hasNext,next等操作;
    • L2行:成员变量nextItem是下一个节点的值,即nextNode的item值;为什么单独需要这个变量呢,因为由于并发,nextNode的item字段有可能被置空,而迭代器在hasNext的时候是不允许返回空值的,所以在迭代到nextNode的时候,立即取出item字段保存起来。
    • L3行:成员变量lastRet指向上一个节点,用来支持删除操作;
    • L4行:构造函数仅调用advance方法,从名字上推测,应该是将迭代器往后推一步;
    • L5行:核心方法advance的第一行,先保存lastRet,因为这个方法执行完了以后,nextNode就指向下一个节点了;
    • L6行:临时变量x保存nextItem,因为advance执行完以后,nextItem就变成下一个节点的值了;
    • L7行:如果nextNode==null,说明这是初始化操作;
    • L8行:直接调用p=first(),将p赋值成第一个有效节点或null(队列空的);
    • L9行:不是初始化调用,那么p赋值成nextNode的后继节点;
      至此p保存了nextNode的备选值
    • L10行:如果p==null,说明已经到了链表尾部了,迭代结束;
    • L11行:如果p.item!=null,说明p是有效节点,ok,就是它了;
    • L12~L15行:如果p.item==null,说明p是无效节点,我们应该跳过它,继续寻找;
    • L13~L14行:如果p是空节点,我们尝试将它的前继与后继节点相连;
    • L16行:hasNext()方法的实现,就看nextNode是否空;
    • L17行:next()方法,很简单就是调用advance;
    • L18~L20行:remove()方法,将lastRet指向节点的数据清空;因为当我们调用next方法时,返回的值对应的是lastRet节点。

    从上面的实现我们可以知道,在迭代过程中,如果有数据并发入队列,这些数据是可以被迭代到的;如果有值被并发删除或出队列,那么这些数据有可能也被迭代到。尤其是迭代器的remove方法,删除的有可能是一个不存在的数据。

    队列长度

    ConcurrentLinkedQueue的size方法是很没效率的的,实际是把队列遍历了一遍来计算长度。所以推荐大家使用isEmpty来判断队列是否为空,而不要使用size()==0。

    public boolean isEmpty() {
        return first() == null;
    }
    
     public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }
    

    为什么ConcurrentLinkedQueue内部不维护一个size变量来跟踪队列的长度呢?不是不想,是做不到,由于ConcurrentLinkedQueue使用无锁化设计,通过cas操作来保证并发安全。而cas操作只能保证单个变量的并发安全性,无法在出入队列操作的同时,维护size变量。

    小结

    上面基本讲解了ConcurrentLinkedQueue源代码的大部分核心内容。在开篇的时候也讲过,高效的队列算法不是一朝一夕的出来的,看似短短一两百行代码,是经过前人大量的研究才得到的;因此我们读起来有些吃力是理所当然的,如果不去读一读相关的论文及其他资料,那就更困难了。ConcurrentLinkedQueue的代码在深入挖掘jvm特性的基础上,做了不少性能优化,比如offer方法的L8L9行;当然,上文的解释是我的推测,不一定正确,如果大家发现谬误之处,欢迎指正。

    相关文章

      网友评论

          本文标题:ConcurrentLinkedQueue全解析

          本文链接:https://www.haomeiwen.com/subject/bcmloftx.html