SynchronousQueue应该来说算是阻塞队列。一个是因为Implements BlockingQueue。另外一个原因是通过LockSupport.park()/unpark()来挂起/恢复线程。但是假如说生产消费者两者近似同步的时候,注意这个地方不一定生产消费速度都很快的时候才能发挥这个队列的最佳性能,而是近似同步的时候。进来之后自转一下,就直接消费掉了,那么会更快一些。 那这种情况实际并不算是阻塞,因为cpu还在执行指令。当达到最大的maxUntimedSpins=32*16 次之后,才阻塞。在这之前就消费掉的话,应该是个无锁非阻塞的队列。
SynchronousQueue分为公平和非公平,默认情况下采用非公平性( (Lifo) stack)访问策略,当然也可以通过构造函数来设置为公平性((Fifo) queue)访问策略(为true即可)。
首先是SynchronousQueue的UML类图:
image.png
当SynchronousQueue构造方法传true的时候,采用的是dual-queue.默认是dual-stack。
什么叫dual-data-structure ? dual stack ;dual queue
至于什么是dual-stack什么是dual-queue。请看下面的论文:
https://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf
下面的截图是从论文中截图出来的。
首先register a request 。然后insert reservation。这里为什么不是data呢? 因为虽然producer走在consumer前面,但是第一个consumer走在第二个producer前面。所以可以是插入的data,也可能是hand-off partner。总而言之就是这个数据结构,既可以存储数据data,也可以是请求数据的线程。
然后,我们看其他的blockingQueue 。都是队列一端放入数据,一端消费数据。总而言之队列是数据的容器。
队列就像一个总线(BUS),很多数据往这里过,但是数据的寄件人sender和收件人receiver是不确定的
。但是这个SynchronousQueue不仅有数据,还有线程。 而且如果是公平模式,sender和recevier是固定的,就像一个管道,从数据的producer传送到consumer中。一个个都匹配的。
接下来我们看这个synchronousQueue什么样一个效果:
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueTest {
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>(true);
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
synchronousQueue,1000);
new Thread(queueProducer).start();
SynchronousQueueProducer queueProducer2 = new SynchronousQueueProducer(
synchronousQueue,2000);
new Thread(queueProducer2).start();
SynchronousQueueProducer queueProducer3 = new SynchronousQueueProducer(
synchronousQueue,4000);
new Thread(queueProducer3).start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
synchronousQueue,6000);
new Thread(queueConsumer1).start();
SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
synchronousQueue,8000);
new Thread(queueConsumer2).start();
}
}
class SynchronousQueueProducer implements Runnable {
protected BlockingQueue<String> blockingQueue;
private int sleepTime ;
final Random random = new Random();
public SynchronousQueueProducer(BlockingQueue<String> queue,int sleepTime) {
this.blockingQueue = queue;
this.sleepTime = sleepTime;
}
@Override
public void run() {
try {
Thread.sleep(sleepTime);
String data = UUID.randomUUID().toString();
System.out.println(Thread.currentThread().getName()+"Put: " + data);
blockingQueue.put(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class SynchronousQueueConsumer implements Runnable {
protected BlockingQueue<String> blockingQueue;
private int sleepTime ;
public SynchronousQueueConsumer(BlockingQueue<String> queue,int sleepTime) {
this.blockingQueue = queue;
this.sleepTime = sleepTime;
}
@Override
public void run() {
try {
Thread.sleep(sleepTime);
String data = blockingQueue.take();
System.out.println(Thread.currentThread().getName()
+ " take(): " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出结果:
Thread-0Put: be102704-9b87-40e6-8654-7433b3a8ebc5
Thread-1Put: b9819edc-7852-4a73-b47e-f9ca162986bf
Thread-2Put: ca98c7e8-26f8-4066-8427-bfd0469699b4
Thread-3 take(): be102704-9b87-40e6-8654-7433b3a8ebc5
Thread-4 take(): b9819edc-7852-4a73-b47e-f9ca162986bf
我们发现数据是FIFO,先入先出,最后一个Thread-2,等待线程与之匹配。
接下来,我们调试源码,看看内部原理如何:
为了调试方便,我们改动main方法如下:
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>(true);
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
synchronousQueue,1000);
new Thread(queueProducer).start();
SynchronousQueueProducer queueProducer2 = new SynchronousQueueProducer(
synchronousQueue,20000);
new Thread(queueProducer2).start();
SynchronousQueueProducer queueProducer3 = new SynchronousQueueProducer(
synchronousQueue,40000);
new Thread(queueProducer3).start();
// SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
// synchronousQueue,6000);
// new Thread(queueConsumer1).start();
//
// SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
// synchronousQueue,8000);
// new Thread(queueConsumer2).start();
}
刚开始构造方法new TransferQueue<E>();生成一个QNode对象。QNode h = new QNode(null, false);item是空的,而且不是数据。刚开始是一个dummy node 。或者说是哨兵节点。head和tail都指向这个dummy node。
E transfer(E e, boolean timed, long nanos) {
// 基本算法是循环尝试,执行下面两个步中的,其中一个:
/**
* 1.如果队列为空,或队列中为相同模式的节点,尝试节点入队列等待,
* 直到fulfilled,返回匹配元素,或者由于中断,超时取消等待。
* 2.如果队列中包含节点,transfer方法被一个协同模式的节点调用,
* 则尝试补给或填充等待线程节点的元素,并出队列,返回匹配元素。
* 在每一种情况,执行的过程中,检查和尝试帮助其他stalled/slow线程移动队列头和尾节点
* 如果调用持有transferer的non-volatile/final引用,
* 可能出现这种情况。一般在循环的开始,都要进行null检查,检查过程非常快,不用过多担心
* 性能问题。
*/
QNode s = null; // constructed/reused as needed
// //如果元素e不为null,则为DATA模式,否则为REQUEST模式
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// //如果队列头或尾节点没有初始化,则跳出本次自旋
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
////如果t不是队尾,非一致性读取,跳出本次自旋
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
// //如果t的next不为null,设置新的队尾,跳出本次自旋
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
// //自旋或阻塞直到节点被fulfilled
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
// //如果s指向自己,s出队列,并清除队列中取消等待的线程节点
clean(t, s);
return null;
}
// //如果s节点已经不再队列中,移除
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
// //如果队列不为空,且与队头的模式不同,及匹配成功
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
// //如果h不为当前队头,则返回,即读取不一致
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
// //如果匹配节点元素不为null,则返回x,否则返回e,即take操作,返回等待put线程节点元素,
//put操作,返回put元素
return (x != null) ? (E)x : e;
}
}
}
最后是这样,head --> dummy node(next) ==>Qnode1(item1,waiter:THread-0,next)===>Qnode2(item2,waiter:Thread-1,next)==>Qnode3(item3,waiter:Thread-2,next)==>null;
tail ===>Qnode3(item3,waiter:Thread-2,next)==>null;
image.png
如果源码是这样的.
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>(true);
// SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
// synchronousQueue,1000);
// new Thread(queueProducer).start();
//
// SynchronousQueueProducer queueProducer2 = new SynchronousQueueProducer(
// synchronousQueue,20000);
// new Thread(queueProducer2).start();
//
// SynchronousQueueProducer queueProducer3 = new SynchronousQueueProducer(
// synchronousQueue,40000);
// new Thread(queueProducer3).start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
synchronousQueue,1000);
new Thread(queueConsumer1).start();
SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
synchronousQueue,20000);
new Thread(queueConsumer2).start();
SynchronousQueueConsumer queueConsumer3 = new SynchronousQueueConsumer(
synchronousQueue,40000);
new Thread(queueConsumer3).start();
}
image.png
head --> dummy node(next) ==>Qnode1(item=null,waiter:THread-0,isData=false;next)===>Qnode2(item=null,waiter:Thread-1,isData=false;next)==>Qnode3(item=null,waiter:Thread-2,next)==>null;
tail ===>Qnode3(item=null,waiter:Thread-2,next)==>null;
所以当都是消费者,或者都是生产者的时候还是可以形成队列的。下面我们来看看如何配对。
image.png
m 就是match。
producer unpark之后,走这段代码。
image.png
TransferStack和TransferQueue差不多。消费的过程不太一样。
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
synchronousQueue,1000);
new Thread(queueProducer).start();
SynchronousQueueProducer queueProducer2 = new SynchronousQueueProducer(
synchronousQueue,3000);
new Thread(queueProducer2).start();
SynchronousQueueProducer queueProducer3 = new SynchronousQueueProducer(
synchronousQueue,5000);
new Thread(queueProducer3).start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
synchronousQueue,25000);
new Thread(queueConsumer1).start();
}
还没消费的时候,produce了3个数据之后是这样的。
image.png
head ==> Thread-2 ===>Thread-1 ==> Thread -0 。栈顶head指向最后一个producer 。
然后开始消费
casHead(h, s=snode(s, e, h, FULFILLING|mode)) FULFILLING=2 ,mode =0 (REQUEST) 按位或=2
head ==>StackNode(mode=2,next)==> Thread-2 ===>Thread-1 ==> Thread -0 。
接着,执行tryMatch .mode=2的节点和栈里面mode=1的最上面的节点,一起消失。head重新指向,接下来的mode=1的节点。如图:
image.png
tryMatch方法中,m.tryMatch(s) ,m代表是栈顶的下一个元素,s代表当前栈顶元素,是m的match,所以调用Unsafe的compareAndSwapObject,把match设置成s节点。
image.png
网友评论