美文网首页JDK源码解析
JDK1.8 SynchronousQueue源码解析

JDK1.8 SynchronousQueue源码解析

作者: i砖工 | 来源:发表于2020-04-27 11:06 被阅读0次

    同步队列:它继承了一般的AbstractQueue和实现了BlockingQueue接口。它与其它的BlockingQueue最大的区别就在它不存储任何数据,它的内部是一个栈(非公平)或者一个队列(公平策略)用来存储访问SynchronousQueue的线程,而访问它的线程有消费者和生产者,对应于方法put和take。当一个生产者或者消费者试图访问SynchronousQueue的时候,如果找不到与之能够配对的消费者或者生产者,则当前线程会阻塞,直到对应的线程将其唤醒,或者等待超时,或者中断。
    以下是它的put和take方法的实现,不管是put还是take,其核心都是调用transferer对象的transfer方法,所以要弄明白SynchronousQueue,就需要弄清楚Transferer。

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
    
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
    

    我们先从整体上来看一下SynchronousQueue的内部结构:


    类结构图

    从类图上可以看出,SynchronousQueue内部引用了一个Transfer,而Transfer的实现有两种,一个是TransferStack,一个是TransferQueue.
    今天我们分析的重点对象是TransferStack.
    1.先来看下TransferStack是如何初始化的:

    //SynchronousQueue的构造方法
    public SynchronousQueue(boolean fair) {
          //如果初始化为非公平策略,则使用TransferStack
          transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
    
    

    2.仔细看一下TransferStack的实现:

    static final class TransferStack<E> extends Transferer<E> {
            //代表本次transfer的操作是获取数据
            static final int REQUEST    = 0;
           //代笔本次transfer的操作是放入数据
            static final int DATA       = 1;
           //代表节点正在配对
            static final int FULFILLING = 2;
           //判断节点目前是否在匹配中,3&2 == 2(匹配中)  
           //2&2 == 2(匹配中)    1&2==0 , 0&2==0 (等待匹配)
            static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
    
            /** 内部类,栈节点*/
            static final class SNode {
                volatile SNode next;        // next node in stack
                volatile SNode match;       // 配对的节点,如果未配对则未空
                volatile Thread waiter;     // to control park/unpark
                Object item;                // data; or null for REQUESTs
                int mode;
    
                SNode(Object item) {
                    this.item = item;
                }
    
                boolean casNext(SNode cmp, SNode val) {
                    return cmp == next &&
                        UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
                }
    
               /**
               尝试配对
               s:被配对的节点
               配对的逻辑为:通过CAS设置节点的match属性,如果能设置成功,则说明配对成功,配对成功后,再通过LockSupport.unpark(w);
    将其对应的等待线程唤醒。
              这个方法比较简答,就不再累述
               **/
                boolean tryMatch(SNode s) {
                    if (match == null &&
                        UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                        Thread w = waiter;
                        if (w != null) {    // waiters need at most one unpark
                            waiter = null;
                            LockSupport.unpark(w);
                        }
                        return true;
                    }
                    return match == s;
                }
    
                /**
                 * Tries to cancel a wait by matching node to itself.
                 如果一个节点等待超时,或者线程被中断,则需要取消节点的等待,而取消等待的标志就是将match指向自己
                 */
                void tryCancel() {
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
                }
                boolean isCancelled() {
                    return match == this;
                }
    
                // Unsafe mechanics
                //Unsafe机制, 不懂的就不要看juc包了,先搞懂Unsafe再看
                private static final sun.misc.Unsafe UNSAFE;
                private static final long matchOffset;
                private static final long nextOffset;
    
                static {
                    try {
                        UNSAFE = sun.misc.Unsafe.getUnsafe();
                        Class<?> k = SNode.class;
                        matchOffset = UNSAFE.objectFieldOffset
                            (k.getDeclaredField("match"));
                        nextOffset = UNSAFE.objectFieldOffset
                            (k.getDeclaredField("next"));
                    } catch (Exception e) {
                        throw new Error(e);
                    }
                }
            }
    
            /** The head (top) of the stack */
            //栈顶节点
            volatile SNode head;
           //重新CAS栈顶节点,一般都用在栈顶元素出栈的时候
            boolean casHead(SNode h, SNode nh) {
                return h == head &&
                    UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
            }
    
            //构造栈节点
            static SNode snode(SNode s, Object e, SNode next, int mode) {
                if (s == null) s = new SNode(e);
                s.mode = mode;
                s.next = next;
                return s;
            }
    
            //关键方法transfer, 通过e是否为空来判断本次操作是获取元素还是放入元素
            E transfer(E e, boolean timed, long nanos) {
                SNode s = null; // constructed/reused as needed
                int mode = (e == null) ? REQUEST : DATA;  //e为空, 表示该次请求为获取元素,e不为空则是生产放入元素
                for (;;) {
                    SNode h = head;
                    //①当头节点为空,则新加入节点直接入栈,然后进入等待,当头节点不为空,并且本次操作的模式和头节点模式一样,则继续进入栈等待,否则去到代码②
                    if (h == null || h.mode == mode) {  // empty or same-mode       
                        if (timed && nanos <= 0) {      // can't wait //不需要等待
                            if (h != null && h.isCancelled()) //头节点已经中断或者超时,重新设置头节点
                                casHead(h, h.next);     // pop cancelled node
                            else
                                return null; //由于不能等待,而头节点与本次入队列的节点操作模式相同,所以直接返回空(如果可以等待的话会进入自旋)
                        } else if (casHead(h, s = snode(s, e, h, mode))) { //新节点入栈
                            SNode m = awaitFulfill(s, timed, nanos); //等待该节点被配对或者超时中断,详情见下面的方法具体解释。
                            if (m == s) {               // wait was cancelled
                                clean(s);  //清理从head节点到s.next节点之间的“取消”节点
                                return null;  //返回null,线程被中断或者节点等待超时,调用者(上层应用)能处理线程中断逻辑。
                            }
                    
                             //s节点从awaitFulfill中出来,说明是配对成功节点将其唤醒,而唤醒的节点一定是栈顶节点(见代码②逻辑,因为每次只允许有一个节点进入配对状态,
                             //进入配对状态后,其它节点无法加入栈,只有等到配对成功,重新casHead后其它节点才能入栈,所以这里直接判断头节点)
                            if ((h = head) != null && h.next == s) //因为匹配线程那边也可能在调用casHead,所以这里h.next == s判断一下head是否已经被重置  
                                casHead(h, s.next);     // help s's fulfiller   
                            
                            return (E) ((mode == REQUEST) ? m.item : s.item); //如果是消费者则返回匹配上的m.item, 如果是生产者,则返回生产的节点的item
                         }
                    }
                    //②     
                    //本次操作模式和头节点模式不相等,先检查头节点模式是否为匹配中。
                    //头节点模式为 3或者2时为匹配中,这时任何新增的节点都无法入栈,因为任何新增操作的模式都不等于3或者2,所以在这次匹配完成之前,head节点是不会变化的。
                    else if (!isFulfilling(h.mode)) { 
                        //头节点模式为:0或者1
                        //head节点已经取消,所以将head指向下一个节点,head出栈
                        if (h.isCancelled())            
                             casHead(h, h.next);         // pop and retry 
                
                        //②-1
                        //如果头节点模式为0或1,则本次操作的节点入栈,入栈的模式为:2或者3, ps(这个时候头节点判断isFullfilling则返回true),
                        //所以一旦有节点在匹配中,则其它新增节点都会直接去到代码③
                        //FULFILLING=2,  2|0 == 2, 2|1==3
                        else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { 
                
                            for (;;) { // loop until matched or waiters disappear
                                SNode m = s.next;       // m is s's match
                                //如果新的头节点后面没有等待节点了(等待节点有可能被中断或者超时出栈了),则清空栈,自旋重新开始
                                if (m == null) {        // all waiters are gone
                                    casHead(s, null);   // pop fulfill node
                                    s = null;           // use new node next time
                                    break;              // restart main loop
                                }
                                //②-2如果下一个节点不为空,则尝试匹配
                                SNode mn = m.next;
                                if (m.tryMatch(s)) { 
                                    casHead(s, mn);     // pop both s and m //匹配成功,则弹出头节点和等待节点
                                    //如果本次操作是获取数据,则返回匹配上的生产数据的item,相反, 如果本次操作是生产数据,则直接返回自己生产的数据。
                                    return (E) ((mode == REQUEST) ? m.item : s.item);
                                } else                  // lost match
                                    //如果匹配失败(理论上一次只有一个节点能够进行匹配,见②-1,所以理论上不应该会匹配失败,但是为什么会失败呢?
                                    //因为匹配的m还有可能超时或者被中断),所以如果匹配失败,弹出m节点。
                                    s.casNext(m, mn);   // help unlink
                            }
                        }
                    } 
                    //③如果头节点不为空,并且本次操作的模式和头节点的模式不相同,并且头节点是正在匹配
                    else {                            // help a fulfiller
                        SNode m = h.next;               // m is h's match
                        //当②中新的匹配节点进入后,有可能等待节点超时或者被中断了,即waiter is gone
                        if (m == null)                  // waiter is gone
                        //删除匹配节点,栈清空重新开始,因为最外层有一个自旋,所以又会重新安置这个节点。
                        casHead(h, null);           // pop fulfilling node
                        else {
                            //这一段代码逻辑与②-2相同
                            SNode mn = m.next;
                            if (m.tryMatch(h))          // help match
                                casHead(h, mn);         // pop both h and m
                            else                        // lost match
                                h.casNext(m, mn);       // help unlink
                        }
                    }
                }
            }
    
            //让节点进入等待(通常发生在节点入栈时发现没有对应的匹配者)
            //根据设置的是否等待超时条件,让当前线程进入等待配对的自旋逻辑中,只有当配对成功,或者线程中断才会退出。
            SNode awaitFulfill(SNode s, boolean timed, long nanos) {
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
                Thread w = Thread.currentThread();
                int spins = (shouldSpin(s) ?
                             (timed ? maxTimedSpins : maxUntimedSpins) : 0);
                for (;;) {
                    if (w.isInterrupted())  //当前线程已经中断,则取消当前节点的配对等待
                        s.tryCancel();
                    SNode m = s.match;
                    if (m != null) //当前节点已经配对成功,或者取消(m==s)
                        return m;
                    if (timed) { //有超时设置
                        nanos = deadline - System.nanoTime();
                        if (nanos <= 0L) { //已经超时
                            s.tryCancel(); //取消s节点的等待
                            continue;
                        }
                    }
                    //不超时(一直等待),或者设置超时时间后还没有超时。
                    if (spins > 0)
                        spins = shouldSpin(s) ? (spins-1) : 0; //减少循环次数
                    else if (s.waiter == null) //不再自旋,设置线程
                        s.waiter = w; // establish waiter so can park next iter
                    else if (!timed) //如果设置的是一直等待,则睡眠当前线程
                        LockSupport.park(this);
                    else if (nanos > spinForTimeoutThreshold) //如果是有时间的等待,但是如果还剩下小于1秒的时间,则不再park,直接继续自旋
                        LockSupport.parkNanos(this, nanos);
                }
            }
    
    
            //判断是否还需要继续自旋还是睡眠
            //这里要注意的是该方法只在awaitFulfill方法被调用,而awaitFulfill只在入栈元素和栈顶元素操作模式一致时(者说明栈顶元素不是匹配中状态,见代码②处的说明)
            boolean shouldSpin(SNode s) {
                SNode h = head;
                //这个逻辑尤其是h==null确实没看明白什么时候会出现h!=s然后h == null,因为s入栈时head会指向s,就算s在自旋过程中有新的head节点加入,那h也不会为null。 先打???
               //但是如果我来写这个方法的话, 我我会写一个shouldNotSpin,如下
                return (h == s || h == null || isFulfilling(h.mode));
            }
           //s不在栈顶,并且栈顶元素不在匹配中,则栈顶以后的元素可以暂时不用自旋,这样逻辑就很容易说得通
           //因为栈顶元素还没有进来匹配者,说明栈顶元素和s一样属于等待者(前者都还没被匹配,s自然不会被匹配,所以先睡一会儿)
            boolean shouldNotSpin(Snode s){
                Snode h = head;
                return (h!=null && h!=s && !isFulfilling(h.mode));
            }
    
            //清理节点出栈
            void clean(SNode s) {
                s.item = null;   // forget item
                s.waiter = null; // forget thread
                //这里为什么要尝试下一个,而不到下下一个????
                SNode past = s.next;
                if (past != null && past.isCancelled())
                    past = past.next;
            
                // Absorb cancelled nodes at head
                //清理头节点到past节点之间,从头节点开始连续的“取消”状态的节点(主要是清理头节点)
                SNode p;
                while ((p = head) != null && p != past && p.isCancelled())
                    casHead(p, p.next);
            
                // Unsplice embedded nodes
                //清理头节点到past节点之间,跳跃的被“取消”的节点(主要是清理头节点与past中间的节点)
                while (p != null && p != past) {
                    SNode n = p.next;
                    if (n != null && n.isCancelled())
                        p.casNext(n, n.next);
                    else
                        p = n;
                }
            }
    
            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE;
            private static final long headOffset;
            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = TransferStack.class;
                    headOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("head"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }
    

    核心逻辑就是以上的代码,只要记住:SynchronousQueue不存储实际的数据,它的栈或者队列中存储的是生产者和消费者节点,最开始进入栈的节点就成为了等待者(这里不管是生产还是消费),而后面进入的节点都需要根据操作的mode来判断是继续入栈等待还是入栈后立即进行匹配。

    相关文章

      网友评论

        本文标题:JDK1.8 SynchronousQueue源码解析

        本文链接:https://www.haomeiwen.com/subject/djvewhtx.html