一、说明:SynchronousQueue是堵塞队列得一种,其特性是put操作和take操作必须是互相唤醒的,比如:put操作,必须take操作唤醒,take操作必须put操作唤醒,没有take操作,多个put操作都堵塞,等待一个一个take操作来一个一个唤醒,同理,take操作也一样,跟他的名字一样,同步队列,就好像,你是救我的药,我也是救你的药。
误区:很多人把其认为其没有容量,不存储元素,这是错的。
正解: SynchronousQueue的公平队列TransferQueue是一个单项链表,是会存储元素的。
二、源码分析
从构造方法入手。看代码注释。
无参构造方法,默认给得参数是false,也就是非公平队列。
非公平队列使用得是TransferStack, 公平队列使用得是TransferQueue。公平和非公平也就是put和take数据的顺序是否是公平的,也就是先放则先拿,后放则后拿。
/**
* Creates a {@code SynchronousQueue} with nonfair access policy.
*/
// 无参构造方法,默认给得参数是false,也就是非公平队列。
public SynchronousQueue() {
this(false);
}
/**
* Creates a {@code SynchronousQueue} with the specified fairness policy.
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
// 非公平队列使用得是TransferStack, 公平队列使用得是TransferQueue。
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
这里我们研究公平队列,也就是传参true。
TransferQueue类中有一个静态内部类Qnode,TransferQueue中存储得一个元素就是一个Qnode,Qnode有next,说明是一个单项链表
static final class TransferQueue<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual queue algorithm, differing,
* among other ways, by using modes within nodes rather than
* marked pointers. The algorithm is a little simpler than
* that for stacks because fulfillers do not need explicit
* nodes, and matching is done by CAS'ing QNode.item field
* from non-null to null (for put) or vice versa (for take).
*/
/** Node class for TransferQueue. */
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
TransferQueue中还有两个变量。1、head,头节点。2、tail,尾节点。
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
transient volatile QNode cleanMe;
还有三个方法。方法作用看代码注释。
advanceHead:cas替换,把TransferQuene的旧头节点替换为新节点,旧头节点得next指向旧头节点自己。
advanceTail:cas替换,把TransferQuene的旧尾节点替换为新节点。
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
// cas替换,把TransferQuene的旧头节点替换为新节点,旧头节点得next指向旧头节点自己。
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
/**
* Tries to cas nt as new tail.
*/
// cas替换,把TransferQuene的旧尾节点替换为新节点。
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
/**
* Tries to CAS cleanMe slot.
*/
// 同上
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
接下来跟着SynchronousQueue的put方法走看源码。
调用构造方法的时候,就创建了一个dummy node(假节点),TransferQueue的头和尾指针都指向这个节点。也就是这种结构:head(tail)(dummy)
// 测试方法
@Test
public void testPut() throws InterruptedException {
SynchronousQueue<String> queue = new SynchronousQueue<>(true);
queue.put("hello");
}
// SynchronousQueue的构造方法,我们传入了true,所以使用的queue是TransferQueue。
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
// 调用构造方法的时候,就创建了一个dummy node(假节点),TransferQueue的头和尾指针都指向这个节点。
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
static final class QNode {
// next 指针
volatile QNode next; // next node in queue
// 实际保存的元素
volatile Object item; // CAS'ed to or from null
// 锁和唤醒的线程
volatile Thread waiter; // to control park/unpark
// 是否是数据(put则为true,take则会false)
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
然后看起put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
// e:put的元素。timed:是否具有超时机制。nanos:超时时间。
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// put操作这里e不为null,isData为true
boolean isData = (e != null);
// 自旋
for (;;) {
QNode t = tail;
QNode h = head;
// 还没有初始化,则continue重新来过
if (t == null || h == null) // saw uninitialized value
continue; // spin
// h == t则说明还没有存入元素。这是为true,进入。进入这个if就会堵塞元素,不管take还是put。
if (h == t || t.isData == isData) { // empty or same-mode
// tn为null。(顺便说一句:next变量是volatile修饰,多线程可见,所以下面这个复制操作是线程安全的。)
QNode tn = t.next;
// 其他线程把尾节点改变了,则再旋一次
if (t != tail) // inconsistent read
continue;
// tn为null
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
// 无超时机制
if (timed && nanos <= 0) // can't wait
return null;
// 创建put操作存放数据的新节点,isData为true。
// 如果没有元素,take操作进入这里,也创建了一个新node,也就是说take操作堵塞的时候,也会创建节点,
if (s == null)
s = new QNode(e, isData);
// cas替换,从:head(tail)(dummy)到head(tail)(dumy)->S
// cas替换,尾节点的next指向新建立的s节点。失败则再旋一次
if (!t.casNext(null, s)) // failed to link in
continue;
/*cas替换,把TransferQuene的旧尾节点t替换为新节点s。
至此,变成了head(dumy)=>s(tail)
*/
advanceTail(t, s); // swing tail and wait
// 下面这个方法调用就是具体的堵塞调用,看下一段代码分析
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
/**
* Spins/blocks until node s is fulfilled.
*
* @param s the waiting node
* @param e the comparison value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
// s是新节点,e是put的真正数据,timed是是否超时,nanos是超时时间
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
// deadLine为0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次数,第一次put的时候,链表结构为:head(dumy)=>s(tail),所以
// head.hext == s,timed为不超时false,所以spin=maxUntimedSpins=512,
// 如果第二个put堵塞,则结构为:head(dumy)=>s=>s1(tail),而head.next和s1(新put的元素是s1)不相等,所以,spin=0直接堵塞。
//(为什么会自旋,我估计是为了,高并发下,take操作
// 和put操作很可能再极短时间进行,这样的话,就不需要唤醒线程和堵塞线程)
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 当前线程准备中断,则s节点的item指向s节点自己
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
// 这里为false,只有当其他线程把这个元素替换了,比如put堵塞在这
// 里的时候,take就会把这个元素替换掉,然后put唤醒的时候就能直接return了。
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 这里spins初始为512,一直递减到0
if (spins > 0)
--spins;
else if (s.waiter == null)
// node节点和waiter和当前线程关联上,为了公平的唤醒。
s.waiter = w;
else if (!timed)
// 锁住当前线程。设置thread的block变量为parkBlocker指向的对象transferQueue。
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
再来看take方法
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// isData为false
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 第一put后结构变成了:head(dumy)=>s(tail)
/* 所以这里h不等于t,t.isData为true,
*所以这里不成立,走else块。只要不堵塞,都会走else块。put操作如
果不堵塞,也会走else块。
*/
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
//head(dumy)=>s(tail)
// m就是之前put的哪个节点
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
/*
isData=(e!=null),这是take操作,所以e为null,所以isData为false,x为之前put进去的值,为非null
x == m说明已经取消了,之前put操作的时候,
*awaitFulfill方法里,如果当前线程准备中断,
*就会调用qnode的tryCancel方法,让qnode的next指向自己,代表
* 这个节点取消了。
head(dumy)=>s(tail)
*!m.casItem(x, e):直接替换item的值,
这样,在take方法堵塞在awaitFulfill方法里的时候,
这里直接把之前take方法创建的node的item改掉,
然后唤醒take的线程,然后take操作获取到这个新值了和它之前的值不一样,则直接跳出循环,不堵塞。
*/
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
// 变成了head(s)(tail)
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
SynchronousQueue还有offer方法,poll方法。add方法。
offer方法,不会堵塞,可以存进去,则存进去(poll或take操作正在堵塞等着获取元素),否则,直接返回false。
poll方法可以有超时时间,和take差不多,take没有超时时间。
add方法,调用的就是offer方法,不过add方法,添加不进去,则直接报错。
总结
新建TransQueue初始化结构为:head(dumy)(tail)
第一个次put后:head(dumy)=>s(tail)
第二个次put后:head(dumy)=>s=>s1(tail)
第一次put后紧接着take后:head(s)(tail)
链表中没有put过的节点,则自旋512次,有,则直接堵塞
put和take都会存储元素,take存储的元素item为null,再堵塞之后,put操作会cas替换这个元素的item,然后唤醒take操作,获取这个新元素。
网友评论