美文网首页
SynchronousQueue源码分析

SynchronousQueue源码分析

作者: 无聊之园 | 来源:发表于2019-01-27 19:09 被阅读0次

一、说明:SynchronousQueue是堵塞队列得一种,其特性是put操作和take操作必须是互相唤醒的,比如:put操作,必须take操作唤醒,take操作必须put操作唤醒,没有take操作,多个put操作都堵塞,等待一个一个take操作来一个一个唤醒,同理,take操作也一样,跟他的名字一样,同步队列,就好像,你是救我的药,我也是救你的药。
误区:很多人把其认为其没有容量,不存储元素,这是错的。
正解: SynchronousQueue的公平队列TransferQueue是一个单项链表,是会存储元素的。

二、源码分析
从构造方法入手。看代码注释。
无参构造方法,默认给得参数是false,也就是非公平队列。
非公平队列使用得是TransferStack, 公平队列使用得是TransferQueue。公平和非公平也就是put和take数据的顺序是否是公平的,也就是先放则先拿,后放则后拿。

 /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     */
// 无参构造方法,默认给得参数是false,也就是非公平队列。
    public SynchronousQueue() {
        this(false);
    }
    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
// 非公平队列使用得是TransferStack, 公平队列使用得是TransferQueue。
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

这里我们研究公平队列,也就是传参true。
TransferQueue类中有一个静态内部类Qnode,TransferQueue中存储得一个元素就是一个Qnode,Qnode有next,说明是一个单项链表

static final class TransferQueue<E> extends Transferer<E> {
        /*
         * This extends Scherer-Scott dual queue algorithm, differing,
         * among other ways, by using modes within nodes rather than
         * marked pointers. The algorithm is a little simpler than
         * that for stacks because fulfillers do not need explicit
         * nodes, and matching is done by CAS'ing QNode.item field
         * from non-null to null (for put) or vice versa (for take).
         */

        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

TransferQueue中还有两个变量。1、head,头节点。2、tail,尾节点。

        /** Head of queue */
        transient volatile QNode head;
        /** Tail of queue */
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it was cancelled.
         */
        transient volatile QNode cleanMe;

还有三个方法。方法作用看代码注释。
advanceHead:cas替换,把TransferQuene的旧头节点替换为新节点,旧头节点得next指向旧头节点自己。
advanceTail:cas替换,把TransferQuene的旧尾节点替换为新节点。

/**
         * Tries to cas nh as new head; if successful, unlink
         * old head's next node to avoid garbage retention.
         */
// cas替换,把TransferQuene的旧头节点替换为新节点,旧头节点得next指向旧头节点自己。
        void advanceHead(QNode h, QNode nh) {
            if (h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                h.next = h; // forget old next
        }

        /**
         * Tries to cas nt as new tail.
         */
// cas替换,把TransferQuene的旧尾节点替换为新节点。
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
        }

        /**
         * Tries to CAS cleanMe slot.
         */
// 同上
        boolean casCleanMe(QNode cmp, QNode val) {
            return cleanMe == cmp &&
                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
        }

接下来跟着SynchronousQueue的put方法走看源码。
调用构造方法的时候,就创建了一个dummy node(假节点),TransferQueue的头和尾指针都指向这个节点。也就是这种结构:head(tail)(dummy)

// 测试方法
    @Test
    public void testPut() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>(true);
        queue.put("hello");
    }
// SynchronousQueue的构造方法,我们传入了true,所以使用的queue是TransferQueue。
 public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
// 调用构造方法的时候,就创建了一个dummy node(假节点),TransferQueue的头和尾指针都指向这个节点。
    TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }
static final class QNode {
        // next 指针   
         volatile QNode next;          // next node in queue
        // 实际保存的元素    
        volatile Object item;         // CAS'ed to or from null
        // 锁和唤醒的线程    
        volatile Thread waiter;       // to control park/unpark
        // 是否是数据(put则为true,take则会false)    
        final boolean isData;
            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

然后看起put方法

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
// e:put的元素。timed:是否具有超时机制。nanos:超时时间。
 E transfer(E e, boolean timed, long nanos) {
            QNode s = null; // constructed/reused as needed
            // put操作这里e不为null,isData为true
            boolean isData = (e != null);
            // 自旋
            for (;;) {
                QNode t = tail;
                QNode h = head;
                // 还没有初始化,则continue重新来过
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin
                // h == t则说明还没有存入元素。这是为true,进入。进入这个if就会堵塞元素,不管take还是put。
                if (h == t || t.isData == isData) { // empty or same-mode
                    // tn为null。(顺便说一句:next变量是volatile修饰,多线程可见,所以下面这个复制操作是线程安全的。)
                    QNode tn = t.next;
                  // 其他线程把尾节点改变了,则再旋一次
                    if (t != tail)                  // inconsistent read
                        continue;
                    // tn为null
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    // 无超时机制
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    // 创建put操作存放数据的新节点,isData为true。
                    // 如果没有元素,take操作进入这里,也创建了一个新node,也就是说take操作堵塞的时候,也会创建节点,
                    if (s == null)
                        s = new QNode(e, isData);
                    // cas替换,从:head(tail)(dummy)到head(tail)(dumy)->S
                    // cas替换,尾节点的next指向新建立的s节点。失败则再旋一次
                    if (!t.casNext(null, s))        // failed to link in
                        continue;
                    /*cas替换,把TransferQuene的旧尾节点t替换为新节点s。
                        至此,变成了head(dumy)=>s(tail)
                    */
                    advanceTail(t, s);              // swing tail and wait
                    // 下面这个方法调用就是具体的堵塞调用,看下一段代码分析
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }
                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }
 /**
         * Spins/blocks until node s is fulfilled.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
// s是新节点,e是put的真正数据,timed是是否超时,nanos是超时时间
 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
          // deadLine为0
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            // 自旋次数,第一次put的时候,链表结构为:head(dumy)=>s(tail),所以
// head.hext == s,timed为不超时false,所以spin=maxUntimedSpins=512,
// 如果第二个put堵塞,则结构为:head(dumy)=>s=>s1(tail),而head.next和s1(新put的元素是s1)不相等,所以,spin=0直接堵塞。
//(为什么会自旋,我估计是为了,高并发下,take操作
// 和put操作很可能再极短时间进行,这样的话,就不需要唤醒线程和堵塞线程)
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                // 当前线程准备中断,则s节点的item指向s节点自己
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
              // 这里为false,只有当其他线程把这个元素替换了,比如put堵塞在这
// 里的时候,take就会把这个元素替换掉,然后put唤醒的时候就能直接return了。
                if (x != e)
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                // 这里spins初始为512,一直递减到0
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    // node节点和waiter和当前线程关联上,为了公平的唤醒。
                    s.waiter = w;
                else if (!timed)
                    // 锁住当前线程。设置thread的block变量为parkBlocker指向的对象transferQueue。
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

再来看take方法

public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
E transfer(E e, boolean timed, long nanos) {
           QNode s = null; // constructed/reused as needed
           // isData为false
           boolean isData = (e != null);
           for (;;) {
               QNode t = tail;
               QNode h = head;
               if (t == null || h == null)         // saw uninitialized value
                   continue;                       // spin
             // 第一put后结构变成了:head(dumy)=>s(tail)   
             /* 所以这里h不等于t,t.isData为true,
              *所以这里不成立,走else块。只要不堵塞,都会走else块。put操作如 
              果不堵塞,也会走else块。
                */
               if (h == t || t.isData == isData) { // empty or same-mode
                   QNode tn = t.next;
                   if (t != tail)                  // inconsistent read
                       continue;
                   if (tn != null) {               // lagging tail
                       advanceTail(t, tn);
                       continue;
                   }
                   if (timed && nanos <= 0)        // can't wait
                       return null;
                   if (s == null)
                       s = new QNode(e, isData);
                   if (!t.casNext(null, s))        // failed to link in
                       continue;

                   advanceTail(t, s);              // swing tail and wait
                   Object x = awaitFulfill(s, e, timed, nanos);
                   if (x == s) {                   // wait was cancelled
                       clean(t, s);
                       return null;
                   }

                   if (!s.isOffList()) {           // not already unlinked
                       advanceHead(t, s);          // unlink if head
                       if (x != null)              // and forget fields
                           s.item = s;
                       s.waiter = null;
                   }
                   return (x != null) ? (E)x : e;

               } else {                            // complementary-mode
                 //head(dumy)=>s(tail)
                  // m就是之前put的哪个节点 
                  QNode m = h.next;               // node to fulfill
                   if (t != tail || m == null || h != head)
                       continue;                   // inconsistent read
                   Object x = m.item;
                    /* 
isData=(e!=null),这是take操作,所以e为null,所以isData为false,x为之前put进去的值,为非null
x == m说明已经取消了,之前put操作的时候,
                   *awaitFulfill方法里,如果当前线程准备中断,
                   *就会调用qnode的tryCancel方法,让qnode的next指向自己,代表 
                    * 这个节点取消了。
                     head(dumy)=>s(tail)
                     *!m.casItem(x, e):直接替换item的值,
                        这样,在take方法堵塞在awaitFulfill方法里的时候,
                     这里直接把之前take方法创建的node的item改掉,
                     然后唤醒take的线程,然后take操作获取到这个新值了和它之前的值不一样,则直接跳出循环,不堵塞。
                      */
                   if (isData == (x != null) ||    // m already fulfilled
                       x == m ||                   // m cancelled
                       !m.casItem(x, e)) {         // lost CAS
                       advanceHead(h, m);          // dequeue and retry
                       continue;
                   }
                 // 变成了head(s)(tail)                    
                   advanceHead(h, m);              // successfully fulfilled
                   LockSupport.unpark(m.waiter);
                   return (x != null) ? (E)x : e;
               }
           }
       }

SynchronousQueue还有offer方法,poll方法。add方法。
offer方法,不会堵塞,可以存进去,则存进去(poll或take操作正在堵塞等着获取元素),否则,直接返回false。
poll方法可以有超时时间,和take差不多,take没有超时时间。
add方法,调用的就是offer方法,不过add方法,添加不进去,则直接报错。

总结
新建TransQueue初始化结构为:head(dumy)(tail)
第一个次put后:head(dumy)=>s(tail)
第二个次put后:head(dumy)=>s=>s1(tail)

第一次put后紧接着take后:head(s)(tail)

链表中没有put过的节点,则自旋512次,有,则直接堵塞

put和take都会存储元素,take存储的元素item为null,再堵塞之后,put操作会cas替换这个元素的item,然后唤醒take操作,获取这个新元素。

相关文章

网友评论

      本文标题:SynchronousQueue源码分析

      本文链接:https://www.haomeiwen.com/subject/wnagjqtx.html