美文网首页Java 杂谈Spring-Boot马士兵
【并发编程系列10】阻塞队列之SynchronousQueue,

【并发编程系列10】阻塞队列之SynchronousQueue,

作者: 刀哥说Java | 来源:发表于2020-10-14 12:22 被阅读0次

    前言

    前面我们介绍了ArrayBlockingQueue,LinkedBlockingQueue,LinkedBlockingDeque和 PriorityBlockingQueue,DelayQueue五种阻塞队列,这一次就继续介绍Java中提供的7种阻塞队列中的最后两种:SynchronousQueue和LinkedTransferQueue。

    双队列

    双队列是一个节点可以表示数据或者请求的队列。即一个存在的节点可能表示put一个元素进去也可能是take()一个元素出来。

    本文中叙述的两种队列SynchronousQueue和LinkedTransferQueue均是利用了双队列的特性:

    • put(E)操作时,队列中的节点代表一个元素,也就说表示的是数据
    • take()操作时,如果队列中无元素,会放一个null的项到队列中占位,这时候表示的是一个请求,而不是一个数据。

    SynchronousQueue

    对于一个SynchronousQueue来说,每个插入操作都必须对应等待另一个线程的删除操作,反之亦然。其没有任何内部容量,甚至连1都没有。所以不能执行peek()之类的方法获取一个元素,因为一个元素只有在尝试移除的时候才会出现,也不能使用任何方法去插入一个元素,除非另一个线程正好在尝试移除它,也不能去迭代,因为没有任何东西可以迭代。


    【并发编程系列10】阻塞队列之SynchronousQueue,LinkedTransferQueue原理分析 在这里插入图片描述

    SynchronousQueue的head是第一个排队插入线程试图添加到队列的元素,如果没有这样的排队线程那么就没有元素可以被移除,所以执行poll()的时候就会返回null。

    SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。

    SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue,因为其内部是通过CAS和自旋来实现并发,没有通过锁来控制,减少了锁的开销。

    先来看看类图:


    在这里插入图片描述

    SynchronousQueue提供了两个构造器,默认构造器是非公平策略,可以通过第二个构造器传入参数true来构造一个公平策略:


    在这里插入图片描述

    可以看到,公平策略构造的是一个TransferQueue,而非公平构造的是一个TransferStack。

    公平策略(TransferQueue)

    TransferQueue是SynchronousQueue的一个内部类,构造器如下:


    在这里插入图片描述

    TransferQueue内部是通过QNode节点来维持一个队列,QNode是TransferQueue的一个内部类:


    在这里插入图片描述

    put(E)方法就是往队列里面添加一个元素,并需要等待另一个线程来take(),如果没有线程来取走,则put(E)线程会阻塞,反之也一样。put(E)和take()方法必须要要成对出现,否则就会一直阻塞。


    在这里插入图片描述 在这里插入图片描述

    从上面代码可以看到,put(E)和take都是调用的同一个方法transfer,通过不同的参数来区分。

    E transfer(E e, boolean timed, long nanos) {
                QNode s = null; // constructed/reused as needed
                boolean isData = (e != null);//e==null表示当前是消费者(take操作),e!=null表示当前是生产者(put操作)
    
                for (;;) {
                    QNode t = tail;
                    QNode h = head;
                    //表示还没有初始化(初始化之后不可能为空),继续自旋
                    if (t == null || h == null)         // saw uninitialized value
                        continue;                       // spin
    
                    //head=tail或者tail节点的模式和当前操作模式相同
                    if (h == t || t.isData == isData) { // empty or same-mode
                        QNode tn = t.next;//获得当前tail节点的next节点
                        //t和tail不一致,说明有其他线程操作过,继续自旋
                        if (t != tail)    // inconsistent read
                            continue;
                        //tail.next正常是null,不为null说明其他线程新增了元素到tail.next
                        if (tn != null) {         // lagging tail
                            advanceTail(t, tn);//尝试帮助其他线程将tail.next设置为tail,然后继续自旋
                            continue;
                        }
                        //如果当前调用的是超时方法,且到时间了,直接返回null
                        if (timed && nanos <= 0)        // can't wait
                            return null;
                        if (s == null)//第一次进来肯定是null
                            //如果是put进来,e不为null,如果是take进来,e==null,也就是初始化了一个空节点
                            s = new QNode(e, isData);//初始化当前元素为QNode
                        //原tail节点为next
                        if (!t.casNext(null, s))        // failed to link in
                            continue;//替换tail节点的next节点为当前节点,失败则继续
    
                        //走到这里说明上面的CAS成功,需要将s设置为新的tail节点
                        //这里是一定会成功的,因为上面的cas完成之后,其他线程只能一直自旋等待
                        advanceTail(t, s);              // swing tail and wait
                        //节点添加进去之后,阻塞等待,直接消费者线程来消费
                        Object x = awaitFulfill(s, e, timed, nanos);
                        //x==s表示已经被取消
                        if (x == s) { // wait was cancelled
                            clean(t, s);//清除
                            return null;//直接返回null
                        }
    
                        if (!s.isOffList()) {           // not already unlinked
                            advanceHead(t, s);          // unlink if head
                            if (x != null)              // and forget fields
                                s.item = s;
                            s.waiter = null;
                        }
                        return (x != null) ? (E)x : e;
                    } else {//互补模式          // complementary-mode
                        QNode m = h.next;//拿到tail.next       // node to fulfill
                        //出现读不一致的情况,继续自旋
                        if (t != tail || m == null || h != head)
                            continue;    // inconsistent read
    
                        Object x = m.item;//拿到节点中的item
                        //走到这里时的isData一定等于false,如果false=(x!=null)就说明x==null,元素已经被其他元素拿走了,继续自旋
                        if (isData == (x != null) ||    // m already fulfilled
                            x == m ||//说明已经被取消了   // m cancelled
                            !m.casItem(x, e)) {   // lost CAS
                            //上面任意一个条件失败都说明当前已被其他线程取走了元素,所以帮助那个线程cas替换一下头节点
                            advanceHead(h, m);          // dequeue and retry
                            continue;
                        }
    
                        //走到这说明元素是被当前线程取走了,cas替换一下头节点
                        advanceHead(h, m);              // successfully fulfilled
                        LockSupport.unpark(m.waiter);//唤醒阻塞线程继续传递元素
                        //x!=null说明是被当前线程获得了元素,那么返回x,否则就是被其他线程拿走了,返回e
                        return (x != null) ? (E)x : e;
                    }
                }
            }
    
    

    这个方法的看起来很长,其实是因为SynchronousQueue内部不通过锁来控制并发,而是通过CAS和自旋来控制并发,所以会有很多的if判断。
    根据上面的方法,主要可以分为两种场景:一种是先put(E)再take(),另一种是先take()再put(E)。

    初始化

    初始化的时候调用上面的构造器TransferQueue(),默认得到一个哨兵节点,里面的元素是空的,这个isData就是说这个节点是不是一个有效数据,只有item!=null才表示一个有效数据:


    在这里插入图片描述

    先put(E)再take()

    假如先put(E),因为没有对用的take()操作,线程会被阻塞直到有take()出现。

    线程t1过来put(1)

    这时候h==t走的是第一个if分支,至少在第2次自旋的时候将元素1包装成节点QNode之后假如队列,并会进入方法awaitFulfill阻塞等待传递元素:

     Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
                /* Same idea as TransferStack.awaitFulfill */
                final long deadline = timed ? System.nanoTime() + nanos : 0L;//获得超时时间
                Thread w = Thread.currentThread();//当前执行的线程
                int spins = ((head.next == s) ?
                             (timed ? maxTimedSpins : maxUntimedSpins) : 0);//获得自旋次数
                for (;;) {
                    if (w.isInterrupted())//如果线程被中断了
                        s.tryCancel(e);//尝试取消,注意这里取消后,会将s中的item和s替换。即s.item变成了s.QNode
                    Object x = s.item;//拿到当前节点的item
                    if (x != e)//不相等说明被取消或者值已经被取走或者已经有值放进来
                        return x;//直接返回自己
                    if (timed) {//如果当前方法是计时方法
                        nanos = deadline - System.nanoTime();//获取剩余时间
                        if (nanos <= 0L) {//如果已经到时间了
                            s.tryCancel(e);//尝试取消
                            continue;
                        }
                    }
                    if (spins > 0)
                        --spins;//自旋次数>0,则减1后继续自旋
                    else if (s.waiter == null)
                        s.waiter = w;
                    else if (!timed)//非计时方法
                        LockSupport.park(this);//如果当前方法不是带超时时间的,则直接挂起直到唤醒
                    else if (nanos > spinForTimeoutThreshold)//如果到了自旋次数,且还没到指定的超时时间,就挂起指定的剩余时间
                        LockSupport.parkNanos(this, nanos);
                }
            }
    
    

    因为调用的put(1)没有带超时时间,所以会被LockSupport.park(this)阻塞,这时候得到了如下队列:


    在这里插入图片描述

    主要经过如下5个步骤:

    • 1、将元素初始化成为一个QNode。
    • 2、将tail.next指向当前新构建的QNode(CAS操作)。
    • 3、将新构建的QNode设置为tail节点(CAS操作)。
    • 4、将当前QNode节点中的waiter属性设置为当前线程(awaitFulfill方法)。
    • 5、挂起前线程(awaitFulfill方法)。

    线程t2过来put(2)

    这时候h和t不相等了,但是isData都为true,所以t2过来的流程一样,还是会走if分支,然后会继续将元素2添加到队尾,得到如下队列:


    在这里插入图片描述

    注意,QNode还有一个属性waiter,是用来记录当前节点是哪个线程放进来的,因为后面当节点被take()走了之后,需要知道当前节点是由哪个线程放进来的,然后去唤醒对应线程。

    我们可以看到,SynchronousQueue号称是不存储元素的,但是不存储元素并不代表它内部没有队列,内部还是会有一个队列的,只不过每个线程过来put(E)的时候,如果没有对应的take()来匹配,那么线程就一直卡住了,也就是元素不会一直停留在队列,而是会等待被转移(transfer)。

    线程t3过来take()

    这时候来了一个线程t3过来take(),这时候因为h!=t,且take()的时候isData=false,和tail节点中的isData不一致了,会走else分支。

    因为head节点是一个哨兵节点(空元素),而这又是公平模式,也就是必须满足FIFO,所以会从head.next开始转移元素。
    最终得到如下最新的队列:


    在这里插入图片描述

    主要经过如下步骤:

    • 1、将head.next中的item设置为null(CAS操作)。
    • 2、将head.next设置为新的head节点(advanceHead方法)。
    • 3、将原head节点的next指向自己(advanceHead方法)。
    • 4、通过原节点的waiter属性,将原先线程唤醒。
    • 5、返回获取到的元素。

    注意,这里将元素1取走之后,原先的线程t1被唤醒,唤醒之后会在方法awaitFulfill继续自旋,这时候执行到if (x != e)条件的时候就会成立了,所以会返回x。然后回到transfer方法,将元素返回,t1线程结束。

    线程t4过来take()

    这时候的步骤和上面也是一样,最终得到如下队列:


    在这里插入图片描述

    回到了原始的初始化状态,只保留了一个哨兵节点。

    先take()再put(E)

    假如先take()进来,步骤和上面put(E)基本一致,唯一的区别就是take()会先抢占一个队列的位置,将一个item==null的节点加入队列。

    线程t1过来take()

    线程t1过来take()因为一开始h==t,还是会走的if逻辑,最终会得到如下队列:


    在这里插入图片描述

    主要经过如下步骤:

    • 1、将一个null元素初始化成为一个QNode。
    • 2、将tail.next指向当前新构建的QNode(CAS操作)。
    • 3、将新构建的QNode设置为tail节点(CAS操作)。
    • 4、将当前QNode节点中的waiter属性设置为当前线程(awaitFulfill方法)。
    • 5、挂起前线程(awaitFulfill方法)。

    除了第1个步骤,其他都和首先进来put(E)的步骤一样

    线程t2过来put(1)

    这时候因为if条件不满足,会走else分支,先将元素1赋值到之前被线程t1占的位置,最终得到如下队列:


    在这里插入图片描述

    主要经过如下步骤:

    • 1、将head.next中的item设置为1(CAS操作)。
    • 2、将head.next设置为新的head节点(advanceHead方法)。
    • 3、将原head节点的next指向自己(advanceHead方法)。
    • 4、通过原节点的waiter属性,将原先线程唤醒。
    • 5、返回成功put进去到的元素。

    接下来将原先的t1线程唤醒,t1线程唤醒之后会继续将节点中的item设置为自己,然后返回拿到的元素:


    在这里插入图片描述

    非公平策略(TransferStack)

    非公平策略是通过其内部类TransferStack来实现的,思想基本和TransferQueue一致,唯一的区别就是TransferStack是非公平的,也就是LIFO模式,在这里就不详细介绍了。

    LinkedTransferQueue

    LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。LinkedTransferQueue和SynchronousQueue中的公平策略使用的算法是一样的,唯一的区别是SynchronousQueue内部不会存储哪怕1个元素,而LinkedTransferQueue内部会存储元素。

    松弛度

    正常队列中,当移除一个元素的时候,就会同步移动head和tail节点的指针,为了最大程序的保证性能LinkedTransferQueue不会实时去更新head和tail的指针,而是引入了一个松弛度的概念。

    松弛度指的是head值与第一个不匹配节点之间的目标最大距离,反之对tail也是如此。这个值这一般为1-3(根据经验得出),在LinkedTransferQueue中松弛度定义为2。因为如果太大了会增加缓存丢失的成本或者长遍历链的风险,而较小的话就会增加CAS的开销。

    LinkedTransferQueue原理分析

    LinkedTransferQueue内部也是通过CAS和自旋来实现并发控制,所以也是一种效率比较高的队列。

    下面还是先来看看LinkedTransferQueue类图:


    在这里插入图片描述

    相比较于其他阻塞队列,多了一个TransferQueue接口,我们先来看看TransferQueue接口中核心的几个方法:

    方法 功能
    tryTransfer(e) 传递一个元素给正在等待的消费者,如果没有正在等待的消费者,则返回false
    transfer(e) 传递一个元素给正在等待的消费者,如果没有正在等待的消费者,则阻塞等待
    tryTransfer(e,time,uint) 传递一个元素给正在等待的消费者,如果没有正在等待的消费者,则阻塞等待指定时间,过了超时时间之后,仍没有消费者,则直接返回false
    hasWaitingConsumer() 至少有一个消费者正在等待接收元素则返回true
    getWaitingConsumerCount() 返回正在等待的消费者数量,返回的值是一个近似值,因为消费者可能很快就完成消费或者放弃等待

    初始化

    初始化的时候什么也不做,并不会在内部构造一个初始节点,addAll()实际上也是循环调用了add(E)方法:


    在这里插入图片描述 在这里插入图片描述

    然后我们再看看其他方法,add,put,take,offer等,都是调用了一个共同的方法xfer,只不过通过不同参数来控制。


    在这里插入图片描述

    xfer方法

    private E xfer(E e, boolean haveData, int how, long nanos) {
            if (haveData && (e == null))//如果当前是put操作,且e==null,则抛出异常
                throw new NullPointerException();
            Node s = null;                        // the node to append, if needed
    
            retry:
            for (;;) {                            // restart on append race
                //从head开始循环匹配
                for (Node h = head, p = h; p != null;) { // find & match first node
                    boolean isData = p.isData;
                    Object item = p.item;
                    //如果元素还没被匹配过,也就是还在队列里
                    if (item != p && (item != null) == isData) { // unmatched
                        //如果相等,就说明是两个相同操作,直接不用执行后面了,互补操作才能往后走
                        if (isData == haveData)   // can't match
                            break;
                        //将p中的item替换成e
                        if (p.casItem(item, e)) { // match
                            //假如有一个队列是有元素的,第一次被take()的时候,q==h是进不了for循环的,所以会直接返回
                            //第2次进来会先匹配一次head节点,匹配不上,在匹配第2个节点,这就相当于松弛度=2了,所以
                            //这时候是满足条件的,可以进入for循环,
                            for (Node q = p; q != h;) {
                                Node n = q.next;  // update by 2 unless singleton
                                //移动head指针
                                if (head == h && casHead(h, n == null ? q : n)) {
                                    h.forgetNext();
                                    break;
                                }                 // advance and retry
                                if ((h = head)   == null ||
                                    (q = h.next) == null || !q.isMatched())
                                    break;        // unless slack < 2
                            }
                            LockSupport.unpark(p.waiter);
                            return LinkedTransferQueue.<E>cast(item);
                        }
                    }
                    Node n = p.next;
                    p = (p != n) ? n : (h = head); // Use head if p offlist
                }
    
                if (how != NOW) {                 // No matches available
                    if (s == null)
                        s = new Node(e, haveData);//初始化节点
                    Node pred = tryAppend(s, haveData);
                    if (pred == null)
                        continue retry;           // lost race vs opposite mode
                    if (how != ASYNC)//take()或者带超时时间的方法会走这里
                        return awaitMatch(s, pred, e, (how == TIMED), nanos);
                }
                return e; // not waiting
            }
        }
    
    

    这个方法也要分为两种方式,先put(E)再take()和先take()再put(E)。

    先put(E)再take()

    put(E)操作不会进行阻塞,成功之后直接返回。

    线程t1过来put(1)

    因为head和tail都是null(一开始不会初始化队列),所以上面的第2个for循环是进不去的,会走到后面这里初始化node,并加入到队列中,最终得到如下队列:


    在这里插入图片描述

    可以看到,这时候tail节点并没有被初始化,这是因为利用了松弛度,松弛度要等于2才会移动tail指针(这是一种性能的优化),我们看看tryAppend方法(主要是看红框部分):


    在这里插入图片描述

    主要分为以下步骤:

    • 1、初始化Node节点
    • 2、将Node节点设为head节点

    线程t2过来put(2)

    线程t2再进来put的时候,因为满足松弛度=2了,这时候就会移动tail指针,所以会得到如下队列:


    在这里插入图片描述

    主要分为以下步骤:

    • 1、初始化Node节点
    • 2、将Node节点设为head.next节点
    • 3、达到松弛度,将新Node设置为tail节点
      后面如果再有元素过来添加,到第3个元素的时候,tail也是不会移动的,要第3个元素才会移动tail,这里就不再继续举例了。

    线程t3过来take()

    这时候来take会将head节点的元素设为null,然后直接返回,得到如下队列:


    在这里插入图片描述

    这时候因为松弛度还没达到2,不会移动head指针。
    主要经过如下步骤:

    • 1、将head节点中的item设置为null。
    • 2、返回获取到的item,。

    线程t4过来take()

    这时候首先会循环head节点,发现不匹配,然后循环到head.next,得到如下队列:


    在这里插入图片描述

    这里因为松弛度达到2,所以会移动head指针。
    主要经过如下步骤:

    • 1、循环head节点,发现不匹配。
    • 2、循环head.next,匹配上,将head.next中item设置为null。
    • 3、移动head指针,指向head.next。

    先take()再put(E)

    先take()的时候,因为队列中还没有元素,所以会先自旋,自旋一定次数之后就阻塞,直到有元素put(E)进来然后唤醒线程。

    线程t1过来take()

    和上面第一次put(E)一样,上面的第2个for循环进不去,会走到后面这里初始化node,并加入到队列中,最终得到如下队列:


    在这里插入图片描述

    主要经过如下步骤:

    • 1、初始化一个item=null的Node节点。
    • 2、将Node节点设置为head节点。
    • 3、自旋一定次数(awaitMatch方法)。
    • 4、达到自旋次数后还没有线程过来take(),执行park挂起线程(awaitMatch方法)。

    注意,上面的Node中也有一个waier属性用来存储线程信息,后面唤醒需要获取waiter中的线程

    线程t2过来take()

    这时候因为松弛度达到2了,会移动tail指针,最终得到如下队列:


    在这里插入图片描述

    线程t3过来put(1)

    这时候因为前面take()的时候,队列中已经有了item=null的元素(node!=null),所以会直接进入第2个for循环,然后将值替换进head节点中的item,最后唤醒t1线程,t1线程被唤醒之后,会继续自旋,然后返回拿到t3线程put进来的元素:


    在这里插入图片描述

    最终得到如下队列:


    在这里插入图片描述

    主要经过如下步骤:

    • 1、将head节点中的item替换成当前元素1。
    • 2、唤醒t1线程。
    • 3、t1线程将head节点中的item指向当前自己的Node,并将waiter设置为null。
    • 4、t1拿到put进来的元素1返回。

    线程t4过来put(2)

    这时候主要流程和上面一样,但是因为松弛度达到了2,所以会移动head节点指针,最终得到如下队列:


    在这里插入图片描述

    主要步骤为:

    • 1、循环head节点,发现head已经被匹配过了(item=p)。
    • 2、继续循环head.next,这时候发现可以匹配上,将head.next中的item设置为2。
    • 3、这时候因为松弛度达到2,会将head节点后移。
    • 4、将旧head节点的next指向当前自己的节点(forgetNext方法)。
    • 5、唤醒线程t2。
    • 6、t2线程将新head节点中的item指向当前自己的Node,并将waiter设置为null。
    • 7、线程t2拿到元素2返回

    总结

    本文主要讲述了SynchronousQueue,LinkedTransferQueue两种不加锁的传递型队列,因为不加锁,所以期性能会高于其他五种加锁的队列

    下一篇,将为大家介绍一下Java中提供的12个原子类操作。

    相关文章

      网友评论

        本文标题:【并发编程系列10】阻塞队列之SynchronousQueue,

        本文链接:https://www.haomeiwen.com/subject/jkaipktx.html