J.U.C 阻塞队列源码剖析系列(四)之 Synchronous

作者: 爱打乒乓的程序员 | 来源:发表于2020-03-10 18:20 被阅读0次

上一篇文章剖析了 LinkedBlockingQueue 的相关源码,那这篇文章接着看另外一个常见的阻塞队列 —— SynchronousQueue

简介

SynchronousQueue 是一个比较特殊的阻塞队列类,为什么这样说呢?我们不妨从官方的类注释说起...

根据类注释可大概得出以下几点:

  • 每一个插入操作都必须等待另一个线程完成删除操作
  • 队列没有内部容量,所以不能迭代数据
  • 可以选择公平策略。公平策略是使用队列先入先出,非公平策略是使用堆栈先入后出

咦?SynchronousQueue 对象没有容量,那这个阻塞队列的使用场景是什么呢?
其实线程池的其中一种实现——Executors.newCachedThreadPool就使用了SynchronousQueue作为阻塞队列

那先从一个demo开始揭开 SynchronousQueue 的庐山真面目吧!

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(() -> {
            try {
                synchronousQueue.put("Hello World!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程一").start();
        new Thread(() -> {
            try {
                System.out.println(synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程二").start();
    }
}

示例中为什么在main方法里使用两个线程分别执行put和take操作呢?因为在同一线程中,有可能存在先执行take操作,当程序执行take方法的时候发现队列为空就会阻塞当前线程,那么之后的put方法就不会执行,线程将会一直等待。

源码剖析

成员变量

    // SynchronousQueue 定义的抽象类,由 TransferStack 和 TransferQueue 实现
    private transient volatile Transferer<E> transferer;

    // CPU 数量
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    // 自旋次数,如果transfer指定了timeout时间,则使用maxTimeSpins,如果CPU数量小于2则自旋次数为0,否则为32。不会随CPU数量增加而变化
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    // 自旋次数,如果没有指定时间设置,则使用maxUntimedSpins。如果NCPUS数量大于等于2则设定为为32*16,否则为0
    static final int maxUntimedSpins = maxTimedSpins * 16;

    // 为了防止自定义的时间限过长,为了优化而设置,如果自定义时间长于这个值则取默认的 spinForTimeoutThreshold ,单位为纳秒。
    static final long spinForTimeoutThreshold = 1000L;

构造函数

    // 默认使用非公平策略
    public SynchronousQueue() {
        this(false);
    }

    // fair为false是非公平策略,使用的数据结构是栈;fair为true是公平策略,使用的数据结构是队列
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

先列出相关方法的源码,但并没有加上注释,因为核心方法都在 Transferer 对象中声明!(值得注意的是,SynchronousQueue类并没有实现remove、removeAll、peek、clear等方法,都是使用默认值)

添加(add、offer、put)、刪除、查找元素

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
    
    public E poll() {
        return transferer.transfer(null, true, 0);
    }
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

相对于 ArrayBlockingQueue 和 LinkedBlockingQueue,可以发现类似poll、take等相关方法都被抽象成统一方法来进行操作,通过抽象出内部类 Transferer 实现不同的操作。接下来,咱们重点看看公平模式与非公平模式下的源码。

1.SynchronousQueue 的非公平模式(TransferStack)

众所周知,栈是FILO(First in last out)的方式,所以也可以理解为非公平模式为什么使用栈这种数据结构,如果排队的时候第一个进来,最后一个才能走,这很不公平嘛!

在 TransferStack 内部有 REQUEST、DATA、FULFILLING 这三个状态。

REQUEST 表示请求从栈获取数据操作的消费者,如:take 方法;
DATA 表示往栈内部放数据的生产者,如:put 方法;
FULFILLING 表示正在交易的生产者或消费者

REQUEST 和 DATA 这两种状态理解起来还不难,但或许 FULFILLING 还是不太清楚有什么用,先带着疑问往下去看,现在只需要简单的理解为:不同状态 REQUEST 和 DATA 可以相互匹配的,当与栈顶匹配后就会将他们状态转换为 FULFILLING,当匹配成功后就会将栈顶和匹配的元素一同出栈。

成员变量

   //表示一个未填充的消费者
   static final int REQUEST = 0;
   //表示一个未填充的生产者
   static final int DATA = 1;
   // 表示生产者正在给等待资源的消费者补给资源,或生产者在等待消费者消费资源
   static final int FULFILLING = 2;
   //栈的头结点
   volatile SNode head;

栈节点

    // 栈节点
    static final class SNode {
        // 节点的后继
        volatile SNode next;
        // 相匹配的节点
        volatile SNode match;
        // 等待的线程
        volatile Thread waiter;

        // item和mode不需要可见,由于他们总是在其他可见/原子操作写之前,读之后
        Object item;// 数据
        int mode;//节点模式

        SNode(Object item) {
            this.item = item;
        }

        // cas保证线程安全设置节点后继节点
        boolean casNext(SNode cmp, SNode val) {
            return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        //尝试匹配目标节点与本节点,如果匹配,可以唤醒线程。补给者调用tryMatch方法,确定它们的等待线程。等待线程阻塞到它们自己被匹配。如果匹配返回true
        boolean tryMatch(SNode s) {
            // 设置本节点的匹配为s节点
            if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                Thread w = waiter;
                if (w != null) {
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            return match == s;
        }

        // 节点尝试取消等待,match 从原来的 null 变为this
        void tryCancel() {
            UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
        }

        // match 指向自己,则取消等待
        boolean isCancelled() {
            return match == this;
        }
    }

核心方法

  • isFulfilling:判断指定类型是否是互补模式
  • casHead(SNode h, SNode nh):替换当前头结点
  • SNode snode(SNode s, Object e, SNode next, int mode):生成SNode节点对象
  • transfer(E e, boolean timed, long nanos): 主要处理逻辑
  • awaitFulfill(SNode s, boolean timed, long nanos): 等待fulfill操作
  • shouldSpin(SNode s):判断节点s是头结点或是fulfill节点则返回true
  • clean(SNode s):将head节点到S节点之间所有已经取消的节点全部移出
    // 如果m是一个填充为单元,则返回true
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
        
    // 比较head是否为h,并且CAS操作nh为当前head
    boolean casHead(SNode h, SNode nh) {
        return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }

    //创建或重新设置节点的变量。在节点入栈时创建,在当可能需要保证减少intervals(间隔)读和head的CAS操或避免由于竞争CAS操作节点入栈引起的垃圾时,此方法会被transfer调用
    static SNode snode(SNode s, Object e, SNode next, int mode) {
        if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }

    E transfer(E e, boolean timed, long nanos) {
        // 1.如果队列为空或已经包含相同模式的节点,则尝试节点入栈,等待匹配返回,如果取消返回null。
        // 2.如果包含一个互补模式的节点(take(REQUEST)->put(DATA);put(DATA)->take(REQUEST)),则尝试一个FULFILLING节点入栈,同时匹配等待的协同节点,两个节点同时出栈,返回匹配的元素。由于其他线程执行步骤3,实际匹配和解除链接指针动作不会发生。
        // 3.如果栈顶存在另外一个FULFILLING的节点,则匹配节点,并出栈。这段的代码与fulfilling相同,除非没有元素返回
        SNode s = null;
        // 根据元素判断节点模式,元素不为null,则为DATA,否则为REQUEST
        int mode = (e == null) ? REQUEST : DATA;

        for (;;) {
            //刚开始头节点为null,第一个进来的节点就是头节点。
            SNode h = head;
            if (h == null || h.mode == mode) {// 如果是空队列,或栈头节点的模式与要放入的节点模式相同
                if (timed && nanos <= 0) {
                    //如果超时,则取消等待,出栈,设置栈头为其后继
                    if (h != null && h.isCancelled())
                        casHead(h, h.next);
                    else
                        return null;
                } else if (casHead(h, s = snode(s, e, h, mode))) {
                    //如果非超时,则将创建的新节点入栈成功,即放在栈头,自旋等待匹配节点(timed决定是否超时)
                    SNode m = awaitFulfill(s, timed, nanos);
                    // 返回的m == s 表示该节点被取消了或者超时、中断了
                    if (m == s) {
                        // 如果返回的是自己,节点取消等待,从栈中移除,并遍历栈移除取消等待的节点
                        clean(s);
                        return null;
                    }
                    if ((h = head) != null && h.next == s)
                        //s节点匹配成功,则设置栈头为s的后继
                        casHead(h, s.next);
                    // 匹配成功,REQUEST模式返回,匹配到的节点元素(DATA),DATA模式返回当前节点元素
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
            } else if (!isFulfilling(h.mode)) { // 如果栈头节点模式不为Fulfilling,判断是否取消等待,是则出栈
                if (h.isCancelled())            // already cancelled
                    casHead(h, h.next);         // pop and retry
                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //非取消等待,则是节点入栈
                    for (;;) { // 自旋直到节点匹配或者等待节点都没有
                        SNode m = s.next;
                        //后继节点为null,则出栈
                        if (m == null) {        // 堆栈中没有等待节点
                            casHead(s, null);   // 将栈头节点位置设置为null
                            s = null;           // 栈头节点设置为null,便于GC
                            break;              // 跳出当前循环,重新执行主循环
                        }
                        SNode mn = m.next;
                        // 尝试匹配 s 节点
                        if (m.tryMatch(s)) {
                            //匹配成功两个节点则出栈
                            casHead(s, mn);     // pop both s and m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else
                            // 如果没有匹配成功,则说明已经有其它线程与 m 节点匹配了,将 mn 作为 s 的后继节点
                            s.casNext(m, mn);   // help unlink
                    }
                }
            } else {
                //如果栈头节点模式为Fulfilling,则代表栈头节点正在与其它节点匹配,可以理解为协助栈头节点匹配成功
                SNode m = h.next;               // m is h's match
                if (m == null)
                    //如果无后继节点,则栈头出栈
                    casHead(h, null);           // pop fulfilling node
                else {
                    //尝试匹配,如果匹配成功,栈头和匹配节点出栈,否则跳过后继节点
                    SNode mn = m.next;
                    if (m.tryMatch(h))          // help match
                        casHead(h, mn);         // pop both h and m
                    else                        // lost match
                        h.casNext(m, mn);       // help unlink
                }
            }
        }
    }

    // 自旋或阻塞,直到节点被一个fulfill操作匹配
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        //获取自旋的次数
        int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            // 如果线程被中断,则取消等待
            if (w.isInterrupted())
                s.tryCancel();
            SNode m = s.match;
            // 如果节点的匹配节点不为null,则返回匹配节点
            if (m != null)
                return m;
            if (timed) {
                nanos = deadline - System.nanoTime();
                //如果超时,则取消等待
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }
            // 如果自旋次数大于零,且可以自旋,则自旋次数减1
            if (spins > 0)
                spins = shouldSpin(s) ? (spins-1) : 0;
            else if (s.waiter == null)
                //如果节点S的等待线程为空,则设置当前节点为S节点的等待线程,以便可以park后继节点。
                s.waiter = w; // establish waiter so can park next iter
            else if (!timed)
                //非超时等在者,park当前线程
                LockSupport.park(this);
            else if (nanos > spinForTimeoutThreshold)
                //如果超时时间大于,最大自旋阈值,则超时park当前线程
                LockSupport.parkNanos(this, nanos);
        }
    }

    // 如果节点在栈头或栈头为FULFILLING的节点,则返回true
    boolean shouldSpin(SNode s) {
        //因为很可能立刻就会有新的线程到来,那么就会立刻进行交易而不需要进行阻塞,然后被唤醒,这是需要过程的,所以这样的自旋等待是值得的。
        SNode h = head;
        return (h == s || h == null || isFulfilling(h.mode));
    }

    // 将head节点到S节点之间所有已经取消的节点全部移出。
    void clean(SNode s) {
        s.item = null;   // forget item
        s.waiter = null; // forget thread

        SNode past = s.next;
        if (past != null && past.isCancelled())
            past = past.next;

        // 如果取消的是头节点则运行下面的清理操作,操作逻辑很简单就是判断头结点是不是取消节点,如果是则将节点一定到下一个节点
        SNode p;
        while ((p = head) != null && p != past && p.isCancelled())
            //p是从头节点开始第一个不移除的节点
            casHead(p, p.next);

        // 取消不是头结点的嵌套节点
        while (p != null && p != past) {
            SNode n = p.next;
            if (n != null && n.isCancelled())
                //移除节点n
                p.casNext(n, n.next);
            else
                p = n;
        }
    }

因为堆栈的出栈和入栈操作都在 transfer 方法里面,所以不容易理解。建议读者多看几遍,结合下面的图和我个人分析的思路,一步一步的debug,这样理解起来应该就不难啦~

1.根据节点模式判断是入栈(put)还是出栈(take)操作
2.判断栈头是否为空或栈头节点操作是否和本次一样,是的话执行第3步,否则执行第6步
3.判断是否是超时操作,如果是超时操作的话则执行第4步,否则执行第5步
4.判断栈头是否非空并且是否可以取消,是的话将栈头后继节点cas操作成为栈头节点后执行第1步,否则返回null
5.cas操作创建节点并将该节点入栈,自旋等待匹配节点
6.判断栈头节点模式是否为Fulfilling,如果不是的话执行第7步,否则执行第10步
7.判断栈头节点是否需要取消等待,需要取消等待的话将栈头节点的后继节点cas操作成为头节点后重新执行第1步,否则执行第8步
8.栈头节点不需取消等待,将当前(take or put or poll)操作封装为一个节点入栈后自旋堆栈,直到栈头节点与栈中其它节点匹配后两个节点都出栈返回节点信息或者所有的等待节点都没有后跳出子循环重新执行第1步
9.栈头节点模式为Fulfilling,如果栈头节点的后继节点为null,cas设置栈头节点为null并执行第1步,否则继续尝试匹配栈中其它节点

2.SynchronousQueue 的公平模式(TransferQueue)

公平模式下使用的数据结构是队列,其方式是先进先出(FIFO:First In First Out)。就比如说咱们在结账排队的时候,肯定是先排队的人先结账呀,这样才公平!

队列节点

// 队列节点
static final class QNode {
    // 下一个节点
    volatile QNode next;
    // 元素信息
    volatile Object item;
    // 当前等待的线程
    volatile Thread waiter;
    // 是否是数据(put的时候是true,take的时候是false)
    final boolean isData;

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }

    // 替换当前节点的next节点
    boolean casNext(QNode cmp, QNode val) {
        return next == cmp &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // 替换当前节点的item数据
    boolean casItem(Object cmp, Object val) {
        return item == cmp &&
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    // 取消当前操作,将当前item赋值为this(当前QNode节点)
    void tryCancel(Object cmp) {
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }

    // 如果item是this(当前QNode节点)的话就返回true,反之返回false
    boolean isCancelled() {
        return item == this;
    }

    // 如果已知此节点离队列,判断next节点是不是为this,则返回true
    boolean isOffList() {
        return next == this;
    }
}

成员变量

// 队列头节点
transient volatile QNode head;
// 队列尾节点
transient volatile QNode tail;
// 节点被取消但没有从队列中移除
transient volatile QNode cleanMe;

核心方法

  • advanceHead(QNode h, QNode nh):更新头节点
  • advanceTail(QNode t, QNode nt):更新尾节点
  • casCleanMe(QNode cmp, QNode val):更新 cleanMe 节点
  • awaitFulfill(QNode s, E e, boolean timed, long nanos):等待fulfill操作
  • clean(QNode pred, QNode s):清空cleanMe节点
  • transfer(E e, boolean timed, long nanos): 主要处理逻辑
    void advanceHead(QNode h, QNode nh) {
        if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
            h.next = h; // forget old next
    }
    
    void advanceTail(QNode t, QNode nt) {
        if (tail == t)
            UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    }
    
    boolean casCleanMe(QNode cmp, QNode val) {
        return cleanMe == cmp && UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
    }
    
    Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
        // 和 TransferStack.awaitFulfill 方法的逻辑一样,因此就不显示方法的逻辑啦
    }
    
    void clean(QNode pred, QNode s) {
        s.waiter = null; // forget thread
        while (pred.next == s) { // Return early if already unlinked
            QNode h = head;
            QNode hn = h.next;   // Absorb cancelled first node as head
            if (hn != null && hn.isCancelled()) {
                advanceHead(h, hn);
                continue;
            }
            QNode t = tail;      // Ensure consistent read for tail
            if (t == h)
                return;
            QNode tn = t.next;
            // 判断现在的t是不是末尾节点,可能其他线程插入了内容导致不是最后的节点。
            if (t != tail)
                continue;
            // 如果不是最后节点的话将其现在t.next节点作为tail尾节点。
            if (tn != null) {
                advanceTail(t, tn);
                continue;
            }
            // 如果当前节点不是尾节点进入到这里面。
            if (s != t) {        // If not tail, try to unsplice
                // 获取当前节点(被取消的节点)的下一个节点。
                QNode sn = s.next;
                // 修改上一个节点的next(下一个)元素为下下个节点。
                if (sn == s || pred.casNext(s, sn))
                    return;
            }
            QNode dp = cleanMe;
            // 尝试清除上一个标记为清除的节点
            if (dp != null) {    // Try unlinking previous cancelled node
                //1.获取要被清除的节点
                QNode d = dp.next;
                QNode dn;
                if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                                (dn = d.next) != null &&  //   has successor
                                dn != d &&                //   that is on list
                                dp.casNext(d, dn)))       // d unspliced
                    casCleanMe(dp, null);
                if (dp == pred)
                    return;      // s is already saved node
            } else if (casCleanMe(null, pred))
                return;          // Postpone cleaning s
        }
    }   
    
    E transfer(E e, boolean timed, long nanos) {
        QNode s = null; // constructed/reused as needed
        // 标识此次操作是存数据(put)还是取数据(take)
        boolean isData = (e != null);

        // 自旋匹配节点
        for (;;) {
            QNode t = tail;
            QNode h = head;
            // 如果头节点或尾节点为空继续自旋(在TransferQueue初始化的时候已经赋值头尾结点)
            if (t == null || h == null)         // saw uninitialized value
                continue;                       // spin

            // h == t 说明头尾结点相同,是空队列
            // t.isData == isData 说明尾节点与当前操作一样
            if (h == t || t.isData == isData) { // empty or same-mode
                QNode tn = t.next;
                // 如果临时变量 t 不等于尾节点,说明有其它线程改变了尾节点,则重新自旋匹配节点
                if (t != tail)                  // inconsistent read
                    continue;
                // 如果尾节点之后的节点值不为空,说明也是有其它线程改变了尾节点,将tn节点赋值给尾节点
                if (tn != null) {               // lagging tail
                    advanceTail(t, tn);
                    continue;
                }
                //超时直接返回 null
                if (timed && nanos <= 0)
                    return null;
                // 创建node节点
                if (s == null)
                    s = new QNode(e, isData);
                // 将新创建的node节点添加到队列尾部,如果失败则重新自旋
                if (!t.casNext(null, s))
                    continue;
                
                // 更新新创建节点为尾节点
                advanceTail(t, s);            
                // 调用 awaitFulfill 方法自旋匹配等待节点
                Object x = awaitFulfill(s, e, timed, nanos);
                // 如果返回当前节点,则说明节点由于被取消、超时、中断导致匹配失败
                if (x == s) {                   // wait was cancelled
                    // 清除当前等待匹配节点
                    clean(t, s);
                    return null;
                }

                // 判断节点是否已从队列离开
                if (!s.isOffList()) {           // not already unlinked
                    // 尝试将s节点设置为head,移出t
                    advanceHead(t, s);          // unlink if head
                    if (x != null)              // and forget fields
                        s.item = s;
                    // 释放 s 节点当前的等待线程
                    s.waiter = null;
                }
                // 返回节点值(put返回put操作的值,take返回匹配到的节点值)
                return (x != null) ? (E)x : e;
            } else {// 队列不为空,并且当前操作与尾节点的操作不一致。所以当前操作与尾节点的操作是互相匹配的
                QNode m = h.next;               // node to fulfill
                if (t != tail || m == null || h != head)
                    continue;                   // inconsistent read

                Object x = m.item;
                // isData == (x != null):判断isData与x的模式是否相同,相同表示已经完成匹配,继续自旋
                // x == m :m节点被取消了
                // !m.casItem(x, e):如果尝试将数据e设置到m上失败
                if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                    // 将m设置为头结点,h出列,然后重试
                    advanceHead(h, m);          // dequeue and retry
                    continue;
                }

                // 成功匹配了,节点m 设置为头结点,h出列
                advanceHead(h, m);              // successfully fulfilled
                // 唤醒节点 m 的等待线程
                LockSupport.unpark(m.waiter);
                // 返回节点值(put返回put操作的值,take返回匹配到的节点值)
                return (x != null) ? (E)x : e;
            }
        }
    }        

OMG!源码又是那么长。但其实也不是很难,知道一个规律就行:由于是使用队列作为公平策略,所以在put的时候会在队列尾部添加数据,而take的时候会从队列尾部向队列头部方向寻找第一个被阻塞的线程,这样就可以保证公平的、按顺序的释放被阻塞的线程。

先简单的总结一下核心流程(TransferQueue.trasnfer方法):
1.获取当前操作是存数据还是取数据
2.自旋寻找匹配的节点(put操作匹配take操作、take操作匹配put操作)
3.如果头节点或尾节点为空,则继续执行第2步
4.如果是空队列或尾节点和当前操作一样,执行第5步,否则执行第13步
5.如果尾节点被其它线程更改,重新执行第2步,否则执行第6步
6.如果尾节点的后继节点不为空,说明有其它线程更改,则设置后继节点为尾节点,并重新执行第2步;否则执行第7步
7.如果超时直接返回null,否则往下执行第8步
8.为当前操作创建节点并添加到队列尾部,如果添加成功往下执行第9步,否则执行第2步
9.自旋匹配等待节点,当返回节点与当前节点不一样,说明节点匹配成功,执行第10步;否则说明由于被取消、超时、中断导致匹配失败,则清除当前节点并返回null
10.如果节点已从队列离开,执行第11步,否则执行第12步
11.返回节点值(put返回put操作的值,take返回匹配到的节点值)
12.将节点从队列中移除,并重新设置队列头节点后执行第11步
13.执行到这一步,说明当前操作与尾节点的操作是互相匹配;那么如果队列头节点是否匹配完成或队列头节点被取消,又或者cas更新头节点操作失败,则执行第14步,否则执行第15步
14.重新设置头节点并执行第2步
15.执行到这一步代表匹配成功,重新设置队列的头节点,并唤醒头节点的等待线程,最后返回节点值(put返回put操作的值,take返回匹配到的节点值)

还是依照个人习惯,喜欢通过画图分析一下源码流程!


总结:

  • SynchronousQueue 是一个没有队列大小的概念,所有的操作都必须与其匹配的节点共同入队出队(公平模式)或入栈出栈(非公平模式)
  • SynchronousQueue 是轻量级的阻塞队列。因为SynchronousQueue是没有使用到锁,都是通过CAS方法保证线程安全

其实也不难发现,SynchronousQueue 的缺点也是十分明显。如果同一个模式的节点多的话,就会一直阻塞,这是会损耗性能,所以需要根据实际业务场景使用。

最后希望读者们看源码的时候,亲自debug,这样才会加深源码的理解,读任何文章都只是辅助,自己真正理解才是学会东西。

如果觉得源码剖析不错的话,麻烦点个赞哈!对于文章有哪里不清楚或者有误的地方,欢迎在评论区留言~

参考资料:

https://www.cnblogs.com/dwlsxj/p/Thread.html
慕课网:面试官系统精讲Java源码及大厂真题: https://www.imooc.com/read/47/article/862

相关文章

网友评论

    本文标题:J.U.C 阻塞队列源码剖析系列(四)之 Synchronous

    本文链接:https://www.haomeiwen.com/subject/gfkbqhtx.html