经典的生产者-消费者模式,操作流程是这样的:
有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞;
有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞;

SynchronousQueue 也是一个队列来的,但它的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递)
package com.concurrent;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
Thread putThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("put thread start");
try {
queue.put(1);
} catch (InterruptedException e) {
}
System.out.println("put thread end");
}
});
Thread takeThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("take thread start");
try {
System.out.println("take from putThread: " + queue.take());
} catch (InterruptedException e) {
}
System.out.println("take thread end");
}
});
putThread.start();
Thread.sleep(1000);
takeThread.start();
}
}
一种输出结果如下:
put thread start
take thread start
take from putThread: 1
put thread end
take thread end
从结果可以看出,put线程执行queue.put(1) 后就被阻塞了,只有take线程进行了消费,put线程才可以返回。可以认为这是一种线程与线程间一对一传递消息的模型。
作为BlockingQueue中的一员,SynchronousQueue与其他BlockingQueue有着不同特性:
- SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
- 因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
- SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
- 若使用 TransferQueue, 则队列中永远会存在一个 dummy node
SynchronousQueue实现原理
公平模式下的模型:
公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。
初始化时,TransferQueue的状态如下:

接着我们进行一些操作:
1、线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入队列,自旋一小会后睡眠等待,这时队列状态如下:

2、接着,线程put2执行了put(2)操作,跟前面一样,put2线程入队列,自旋一小会后睡眠等待,这时队列状态如下:

- 这时候,来了一个线程take1,执行了 take操作,由于tail指向put2线程,put2线程跟take1线程配对了(一put一take),这时take1线程不需要入队,但是请注意了,这时候,要唤醒的线程并不是put2,而是put1。为何? 大家应该知道我们现在讲的是公平策略,所谓公平就是谁先入队了,谁就优先被唤醒,我们的例子明显是put1应该优先被唤醒。
公平策略总结下来就是:队尾匹配队头出队。
执行后put1线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信,这时候内部状态如下:

-
最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且两个线程匹配上了,线程put2被唤醒,
take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点,如下所示:
image.png
以上便是公平模式下,SynchronousQueue的实现模型。总结下来就是:队尾匹配队头出队,先进先出,体现公平原则。
非公平模式下的模型:
我们还是使用跟公平模式下一样的操作流程,对比两种策略下有何不同。非公平模式底层的实现使用的是TransferStack,
一个栈,实现中用head指针指向栈顶,接着我们看看它的实现模型:
1、线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入栈,自旋一小会后睡眠等待,这时栈状态如下:

2、接着,线程put2再次执行了put(2)操作,跟前面一样,put2线程入栈,自旋一小会后睡眠等待,这时栈状态如下:

3、这时候,来了一个线程take1,执行了take操作,这时候发现栈顶为put2线程,匹配成功,但是实现会先把take1线程入栈,然后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程

4、最后,再来一个线程take2,执行take操作,这跟步骤3的逻辑基本是一致的,take2线程入栈,然后在循环中匹配put1线程,最终全部匹配完毕,栈变为空,恢复初始状态,如下图所示:

可以从上面流程看出,虽然put1线程先入栈了,但是却是后匹配,这就是非公平的由来。
下面我们分析一下cachedThreadPool的使用流程,通过这个过程我们来了解synchronousQueue的使用方式:先看代码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//1
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
由于ThreadPoolExecutor内部实现任务提交的时候调用的是工作队列(BlockingQueue接口的实现类)的非阻塞式入队列方法(offer方法),因此,在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。
SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大.
所以,使用SynchronousQueue作为工作队列,工作队列本身并不限制待执行的任务的数量。但此时需要限定线程池的最大大小为一个合理的有限值,而不是Integer.MAX_VALUE,否则可能导致线程池中的工作者线程的数量一直增加到系统资源所无法承受为止。
使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。
源码解析
与其他BlockingQueue一样,SynchronousQueue同样继承AbstractQueue和实现BlockingQueue接口:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
SynchronousQueue提供了两个构造函数:
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
// 通过 fair 值来决定公平性和非公平性
// 公平性使用TransferQueue,非公平性采用TransferStack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue、TransferStack继承Transferer,Transferer为SynchronousQueue的内部类,它提供了一个方法transfer(),该方法定义了转移数据的规范,如下:
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
transfer()方法主要用来完成转移数据的,如果e != null,相当于将一个数据交给消费者,如果e == null,则相当于从一个生产者接收一个消费者交出的数据。
SynchronousQueue采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,他们两种都是通过链表实现的,其节点分别为QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演着非常重要的作用,SynchronousQueue的put、take操作都是委托这两个类来实现的。
TransferQueue
TransferQueue是实现公平性策略的核心类,其节点为QNode,其定义如下:
/**
* 这是一个非常典型的 queue , 它有如下的特点
* 1. 整个队列有 head, tail 两个节点
* 2. 队列初始化时会有个 dummy 节点
* 3. 这个队列的头节点是个 dummy 节点/ 或 哨兵节点, 所以操作的总是队列中的第二个节点(AQS的设计中也是这也)
*/
/** 头节点 */
transient volatile QNode head;
/** 尾节点 */
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was last inserted node
* when it was cancelled
*/
/**
* 对应 中断或超时的 前继节点,这个节点存在的意义是标记, 它的下个节点要删除
* 何时使用:
* 当你要删除 节点 node, 若节点 node 是队列的末尾, 则开始用这个节点,
* 为什么呢?
* 大家知道 删除一个节点 直接 A.CASNext(B, B.next) 就可以,但是当 节点 B 是整个队列中的末尾元素时,
* 一个线程删除节点B, 一个线程在节点B之后插入节点 这样操作容易致使插入的节点丢失, 这个cleanMe很像
* ConcurrentSkipListMap 中的 删除添加的 marker 节点, 他们都是起着相同的作用
*/
transient volatile QNode cleanMe;
TransferQueue(){
/**
* 构造一个 dummy node, 而整个 queue 中永远会存在这样一个 dummy node
* dummy node 的存在使得 代码中不存在复杂的 if 条件判断
*/
QNode h = new QNode(null, false);
head = h;
tail = h;
}
/**
* 推进 head 节点,将 老节点的 oldNode.next = this, help gc,
* 这种和 ConcurrentLinkedQueue 中一样
*/
void advanceHead(QNode h, QNode nh){
if(h == head && unsafe.compareAndSwapObject(this, headOffset, h, nh)){
h.next = h; // forget old next help gc
}
}
/** 更新新的 tail 节点 */
void advanceTail(QNode t, QNode nt){
if(tail == t){
unsafe.compareAndSwapObject(this, tailOffset, t, nt);
}
}
/** CAS 设置 cleamMe 节点 */
boolean casCleanMe(QNode cmp, QNode val){
return cleanMe == cmp && unsafe.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
在TransferQueue中定义了QNode类来表示队列中的节点,QNode节点定义如下:
static final class QNode {
// next 域
volatile QNode next;
// item数据项
volatile Object item;
// 等待线程,用于park/unpark
volatile Thread waiter; // to control park/unpark
//模式,表示当前是数据还是请求,只有当匹配的模式相匹配时才会交换
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
...
isData,该属性在进行数据交换起到关键性作用,两个线程进行数据交换的时候,必须要两者的模式保持一致。
公平模式 TransferQueue transfer方法
-
若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node
到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配
timeout/interrupt awaitFulfill方法返回的是 node 本身
匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的) -
队列不为空, 且队列的 head.next 节点是当前节点匹配的节点,
进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue
/**
* Puts or takes an item
* 主方法
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanosecond
* @return
*/
@Override
E transfer(E e, boolean timed, long nanos) {
/**
* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for gurading against
* seeing uninitialized head or tail value. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicity interspersed
*
* 这个 producer / consumer 的主方法, 主要分为两种情况
*
* 1. 若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node
* 到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配
* timeout/interrupt awaitFulfill方法返回的是 node 本身
* 匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的)
*
* 2. 队列不为空, 且队列的 head.next 节点是当前节点匹配的节点,
* 进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue
*/
QNode s = null; // constrcuted/reused as needed
boolean isData = (e != null); // 1.判断 e != null 用于区分 producer 与 consumer
for(;;){
QNode t = tail;
QNode h = head;
if(t == null || h == null){ // 2. 数据未初始化, continue 重来
continue; // spin
}
if(h == t || t.isData == isData){ // 3. 队列为空, 或队列尾节点和自己相同 (注意这里是和尾节点比价, 下面进行匹配时是和 head.next 进行比较)
QNode tn = t.next;
if(t != tail){ // 4. tail 改变了, 重新再来
continue;
}
if(tn != null){ // 5. 其他线程添加了 tail.next, 所以帮助推进 tail
advanceTail(t, tn);
continue;
}
if(timed && nanos <= 0){ // 6. 调用的方法的 wait 类型的, 并且 超时了, 直接返回 null, 直接见 SynchronousQueue.poll() 方法,说明此 poll 的调用只有当前队列中正好有一个与之匹配的线程在等待被【匹配才有返回值
return null;
}
if(s == null){
s = new QNode(e, isData); // 7. 构建节点 QNode
}
if(!t.casNext(null, s)){ // 8. 将 新建的节点加入到 队列中
continue;
}
advanceTail(t, s); // 9. 帮助推进 tail 节点
Object x = awaitFulfill(s, e, timed, nanos); // 10. 调用awaitFulfill, 若节点是 head.next, 则进行一些自旋, 若不是的话, 直接 block, 知道有其他线程 与之匹配, 或它自己进行线程的中断
if(x == s){ // 11. 若 (x == s)节点s 对应额线程 wait 超时 或线程中断, 不然的话 x == null (s 是 producer) 或 是正真的传递值(s 是 consumer)
clean(t, s); // 12. 对接点 s 进行清除, 若 s 不是链表的最后一个节点, 则直接 CAS 进行 节点的删除, 若 s 是链表的最后一个节点, 则 要么清除以前的 cleamMe 节点(cleamMe != null), 然后将 s.prev 设置为 cleanMe 节点, 下次进行删除 或直接将 s.prev 设置为cleanMe
return null;
}
if(!s.isOffList()){ // 13. 节点 s 没有 offlist
advanceHead(t, s); // 14. 推进head 节点, 下次就调用 s.next 节点进行匹配(这里调用的是 advanceHead, 因为代码能执行到这边说明s已经是 head.next 节点了)
if(x != null){ // and forget fields
s.item = s;
}
s.waiter = null; // 15. 释放线程 ref
}
return (x != null) ? (E)x :e;
}else{ // 16. 进行线程的匹配操作, 匹配操作是从 head.next 开始匹配 (注意 队列刚开始构建时 有个 dummy node, 而且 head 节点永远是个 dummy node 这个和 AQS 中一样的)
QNode m = h.next; // 17. 获取 head.next 准备开始匹配
if(t != tail || m == null || h != head){
continue; // 18. 不一致读取, 有其他线程改变了队列的结构inconsistent read
}
/** producer 和 consumer 匹配操作
* 1. 获取 m的 item (注意这里的m是head的next节点
* 2. 判断 isData 与x的模式是否匹配, 只有produce与consumer才能配成一对
* 3. x == m 判断是否 节点m 是否已经进行取消了, 具体看(QNOde#tryCancel)
* 4. m.casItem 将producer与consumer的数据进行交换 (这里存在并发时可能cas操作失败的情况)
* 5. 若 cas操作成功则将h节点dequeue
*
* 疑惑: 为什么将h进行 dequeue, 而不是 m节点
* 答案: 因为每次进行配对时, 都是将 h 是个 dummy node, 正真的数据节点 是 head.next
*/
Object x = m.item;
if(isData == (x != null) || // 19. 两者的模式是否匹配 (因为并发环境下 有可能其他的线程强走了匹配的节点)
x == m || // 20. m 节点 线程中断或者 wait 超时了
!m.casItem(x, e) // 21. 进行 CAS 操作 更改等待线程的 item 值(等待的有可能是 concumer / producer)
){
advanceHead(h, m); // 22.推进 head 节点 重试 (尤其 21 操作失败)
continue;
}
advanceHead(h, m); // 23. producer consumer 交换数据成功, 推进 head 节点
LockSupport.unpark(m.waiter); // 24. 换线等待中的 m 节点, 而在 awaitFulfill 方法中 因为 item 改变了, 所以 x != e 成立, 返回
return (x != null) ? (E)x : e; // 25. 操作到这里若是 producer, 则 x != null, 返回 x, 若是consumer, 则 x == null,.返回 producer(其实就是 节点m) 的 e
}
}
}
我们梳理一下一般性的流程:
-
一开始整个queue为空, 线程直接封装成QNode, 通过 awaitFulfill 方法进入自旋等待状态, 除非超时或线程中断, 不然一直等待, 直到有线程与之匹配
-
下个再来的线程若isData与尾节点一样, 则进行第一步, 不然进行数据转移(步骤 21), 然后 unpark 等待的线程
-
等待的线程被唤醒, 从awaitFulfill方法返回, 最后将结果返回
公平模式 TransferQueue awaitFulfill
/**
* Spins/blocks until node s is fulfilled
*
* 主逻辑: 若节点是 head.next 则进行 spins 一会, 若不是, 则调用 LockSupport.park / parkNanos(), 直到其他的线程对其进行唤醒
*
* @param s the waiting node
* @param e the comparsion value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s of cancelled
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos){
final long deadline = timed ? System.nanoTime() + nanos : 0L;// 1. 计算 deadline 时间 (只有 timed 为true 时才有用)
Thread w = Thread.currentThread(); // 2. 获取当前的线程
int spins = ((head.next == s) ? // 3. 若当前节点是 head.next 时才进行 spin, 不然的话不是浪费 CPU 吗, 对挖
(timed ? maxTimeSpins : maxUntimedSpins) : 0);
for(;;){ // loop 直到 成功
if(w.isInterrupted()){ // 4. 若线程中断, 直接将 item = this, 在 transfer 中会对返回值进行判断 (transfer中的 步骤 11)
s.tryCancel(e);
}
Object x = s.item;
if(x != e){ // 5. 在进行线程阻塞->唤醒, 线程中断, 等待超时, 这时 x != e,直接return 回去
return x;
}
if(timed){
nanos = deadline - System.nanoTime();
if(nanos <= 0L){ // 6. 等待超时, 改变 node 的item值, 进行 continue, 下一步就到 awaitFulfill的第 5 步 -> return
s.tryCancel(e);
continue;
}
}
if(spins > 0){ // 7. spin 一次一次减少
--spins;
}
else if(s.waiter == null){
s.waiter = w;
}
else if(!timed){ // 8. 进行没有超时的 park
LockSupport.park(this);
}
else if(nanos > spinForTimeoutThreshold){ // 9. 自旋次数过了, 直接 + timeout 方式 park
LockSupport.parkNanos(this, nanos);
}
}
}
梳理逻辑:
- 计算timeout时间(若 time = true)
- 判断 当前节点是否是 head.next 节点(queue中有个dummy node 的存在, AQS 中也是这样), 若是的话就进行 spin 的赋值, 其他的节点没有这个需要, 浪费资源
- 接下来就是自旋, 超过次数就进行阻塞, 直到有其他线程唤醒, 或线程中断(这里线程中断返回的是 Node 自己)
说一下大致的操作。在transfer中,把操作分为两种,一种就是入队put,一种是出队take,入队的时候会创建data节点,值为data。出队的时候会创建一个request节点,值为null。
-
put和take操作都会调用该方法,区别在于,put操作的时候e值为数据data,take操作的时候e值为null
-
如果h==t也就是队列为空,或者当前队列尾部的数据类型和调用该方法的数据类型一致:比如当前队列为空,第一次来了一个入队请求,这时候队列就会创建出一个data节点,如果第二次又来了一个入队请求(和第一次也就是队列尾部的数据类型一致,都是入队请求),这时候队列会创建出第二个data节点,并形成一个链表。同理,如果刚开始来了request请求,也会入队,之后如果继续来了一个reqeust请求,也会继续入队!
-
满足2的条件,就会进入3,中间会有一些一致性检查这也是必须的,避免产生并发冲突。3会创建出一个节点,根据e值的不同,可能是data节点或者request节点。
-
把3中创建的节点通过cas方式设置到队列尾部去。
-
把tail通过cas方式修改成3中新建立的s节点
-
调用方法awaitFulfill进行等待,如果3中创建的是data节点,那么就会等待来一个reqeust节点,反之亦然!
6.1 放入队列之后就开始进行循环判断
6.2 终止条件是节点的值被修改,具体如果是data节点,那么会被修改成null,如果是request节点,那么会被修改成data值。这个修改是在第9步中由相对的请求(如果创建的是data节点,那么就由reqeust请求来进行修改,反之亦然)来做的。如果一直没有相对的请求过来,那么节点的值就一直不会被修改,这样就跳不出循环体!
6.3 如果没有被修改,那么就需要进入park休眠,等待第9步进行修改后再通过unpark进行唤醒,唤醒之后就会判断节点值被修改从而返回。 -
如果在插入一个节点的时候,不满足2的条件,也就是队列不为空并且尾部节点和当前要插入节点的类型不一样(这就代表来了一个相对请求),比如上图中的尾部是data节点,如果来了一个插入reqeust节点的请求,那么就会走到7这里
-
由于是队列,先进先出,所以会取队列里面的第一个节点,也就是h.nex
-
把8中取出的节点的值通过cas的方式设置成新来节点的e值,这样就成功的满足了6-2的终止条件
-
将head节点往后移动,这样就把第一个节点成功的出队。
-
每个节点都保存了对应的操作线程,将8中节点对应的线程进行唤醒,这样6-3处于休眠的线程就醒来了,然后继续进行for循环,进而判断6-2终止条件满足,于是返回
image.png
网友评论