同步队列:它继承了一般的AbstractQueue和实现了BlockingQueue接口。它与其它的BlockingQueue最大的区别就在它不存储任何数据,它的内部是一个栈(非公平)或者一个队列(公平策略)用来存储访问SynchronousQueue的线程,而访问它的线程有消费者和生产者,对应于方法put和take。当一个生产者或者消费者试图访问SynchronousQueue的时候,如果找不到与之能够配对的消费者或者生产者,则当前线程会阻塞,直到对应的线程将其唤醒,或者等待超时,或者中断。
以下是它的put和take方法的实现,不管是put还是take,其核心都是调用transferer对象的transfer方法,所以要弄明白SynchronousQueue,就需要弄清楚Transferer。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
我们先从整体上来看一下SynchronousQueue的内部结构:
类结构图
从类图上可以看出,SynchronousQueue内部引用了一个Transfer,而Transfer的实现有两种,一个是TransferStack,一个是TransferQueue.
今天我们分析的重点对象是TransferStack.
1.先来看下TransferStack是如何初始化的:
//SynchronousQueue的构造方法
public SynchronousQueue(boolean fair) {
//如果初始化为非公平策略,则使用TransferStack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
2.仔细看一下TransferStack的实现:
static final class TransferStack<E> extends Transferer<E> {
//代表本次transfer的操作是获取数据
static final int REQUEST = 0;
//代笔本次transfer的操作是放入数据
static final int DATA = 1;
//代表节点正在配对
static final int FULFILLING = 2;
//判断节点目前是否在匹配中,3&2 == 2(匹配中)
//2&2 == 2(匹配中) 1&2==0 , 0&2==0 (等待匹配)
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
/** 内部类,栈节点*/
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // 配对的节点,如果未配对则未空
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
尝试配对
s:被配对的节点
配对的逻辑为:通过CAS设置节点的match属性,如果能设置成功,则说明配对成功,配对成功后,再通过LockSupport.unpark(w);
将其对应的等待线程唤醒。
这个方法比较简答,就不再累述
**/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
/**
* Tries to cancel a wait by matching node to itself.
如果一个节点等待超时,或者线程被中断,则需要取消节点的等待,而取消等待的标志就是将match指向自己
*/
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
//Unsafe机制, 不懂的就不要看juc包了,先搞懂Unsafe再看
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** The head (top) of the stack */
//栈顶节点
volatile SNode head;
//重新CAS栈顶节点,一般都用在栈顶元素出栈的时候
boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
//构造栈节点
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
//关键方法transfer, 通过e是否为空来判断本次操作是获取元素还是放入元素
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA; //e为空, 表示该次请求为获取元素,e不为空则是生产放入元素
for (;;) {
SNode h = head;
//①当头节点为空,则新加入节点直接入栈,然后进入等待,当头节点不为空,并且本次操作的模式和头节点模式一样,则继续进入栈等待,否则去到代码②
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait //不需要等待
if (h != null && h.isCancelled()) //头节点已经中断或者超时,重新设置头节点
casHead(h, h.next); // pop cancelled node
else
return null; //由于不能等待,而头节点与本次入队列的节点操作模式相同,所以直接返回空(如果可以等待的话会进入自旋)
} else if (casHead(h, s = snode(s, e, h, mode))) { //新节点入栈
SNode m = awaitFulfill(s, timed, nanos); //等待该节点被配对或者超时中断,详情见下面的方法具体解释。
if (m == s) { // wait was cancelled
clean(s); //清理从head节点到s.next节点之间的“取消”节点
return null; //返回null,线程被中断或者节点等待超时,调用者(上层应用)能处理线程中断逻辑。
}
//s节点从awaitFulfill中出来,说明是配对成功节点将其唤醒,而唤醒的节点一定是栈顶节点(见代码②逻辑,因为每次只允许有一个节点进入配对状态,
//进入配对状态后,其它节点无法加入栈,只有等到配对成功,重新casHead后其它节点才能入栈,所以这里直接判断头节点)
if ((h = head) != null && h.next == s) //因为匹配线程那边也可能在调用casHead,所以这里h.next == s判断一下head是否已经被重置
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item); //如果是消费者则返回匹配上的m.item, 如果是生产者,则返回生产的节点的item
}
}
//②
//本次操作模式和头节点模式不相等,先检查头节点模式是否为匹配中。
//头节点模式为 3或者2时为匹配中,这时任何新增的节点都无法入栈,因为任何新增操作的模式都不等于3或者2,所以在这次匹配完成之前,head节点是不会变化的。
else if (!isFulfilling(h.mode)) {
//头节点模式为:0或者1
//head节点已经取消,所以将head指向下一个节点,head出栈
if (h.isCancelled())
casHead(h, h.next); // pop and retry
//②-1
//如果头节点模式为0或1,则本次操作的节点入栈,入栈的模式为:2或者3, ps(这个时候头节点判断isFullfilling则返回true),
//所以一旦有节点在匹配中,则其它新增节点都会直接去到代码③
//FULFILLING=2, 2|0 == 2, 2|1==3
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
//如果新的头节点后面没有等待节点了(等待节点有可能被中断或者超时出栈了),则清空栈,自旋重新开始
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
//②-2如果下一个节点不为空,则尝试匹配
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m //匹配成功,则弹出头节点和等待节点
//如果本次操作是获取数据,则返回匹配上的生产数据的item,相反, 如果本次操作是生产数据,则直接返回自己生产的数据。
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
//如果匹配失败(理论上一次只有一个节点能够进行匹配,见②-1,所以理论上不应该会匹配失败,但是为什么会失败呢?
//因为匹配的m还有可能超时或者被中断),所以如果匹配失败,弹出m节点。
s.casNext(m, mn); // help unlink
}
}
}
//③如果头节点不为空,并且本次操作的模式和头节点的模式不相同,并且头节点是正在匹配
else { // help a fulfiller
SNode m = h.next; // m is h's match
//当②中新的匹配节点进入后,有可能等待节点超时或者被中断了,即waiter is gone
if (m == null) // waiter is gone
//删除匹配节点,栈清空重新开始,因为最外层有一个自旋,所以又会重新安置这个节点。
casHead(h, null); // pop fulfilling node
else {
//这一段代码逻辑与②-2相同
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
//让节点进入等待(通常发生在节点入栈时发现没有对应的匹配者)
//根据设置的是否等待超时条件,让当前线程进入等待配对的自旋逻辑中,只有当配对成功,或者线程中断才会退出。
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted()) //当前线程已经中断,则取消当前节点的配对等待
s.tryCancel();
SNode m = s.match;
if (m != null) //当前节点已经配对成功,或者取消(m==s)
return m;
if (timed) { //有超时设置
nanos = deadline - System.nanoTime();
if (nanos <= 0L) { //已经超时
s.tryCancel(); //取消s节点的等待
continue;
}
}
//不超时(一直等待),或者设置超时时间后还没有超时。
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0; //减少循环次数
else if (s.waiter == null) //不再自旋,设置线程
s.waiter = w; // establish waiter so can park next iter
else if (!timed) //如果设置的是一直等待,则睡眠当前线程
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold) //如果是有时间的等待,但是如果还剩下小于1秒的时间,则不再park,直接继续自旋
LockSupport.parkNanos(this, nanos);
}
}
//判断是否还需要继续自旋还是睡眠
//这里要注意的是该方法只在awaitFulfill方法被调用,而awaitFulfill只在入栈元素和栈顶元素操作模式一致时(者说明栈顶元素不是匹配中状态,见代码②处的说明)
boolean shouldSpin(SNode s) {
SNode h = head;
//这个逻辑尤其是h==null确实没看明白什么时候会出现h!=s然后h == null,因为s入栈时head会指向s,就算s在自旋过程中有新的head节点加入,那h也不会为null。 先打???
//但是如果我来写这个方法的话, 我我会写一个shouldNotSpin,如下
return (h == s || h == null || isFulfilling(h.mode));
}
//s不在栈顶,并且栈顶元素不在匹配中,则栈顶以后的元素可以暂时不用自旋,这样逻辑就很容易说得通
//因为栈顶元素还没有进来匹配者,说明栈顶元素和s一样属于等待者(前者都还没被匹配,s自然不会被匹配,所以先睡一会儿)
boolean shouldNotSpin(Snode s){
Snode h = head;
return (h!=null && h!=s && !isFulfilling(h.mode));
}
//清理节点出栈
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
//这里为什么要尝试下一个,而不到下下一个????
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
//清理头节点到past节点之间,从头节点开始连续的“取消”状态的节点(主要是清理头节点)
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
//清理头节点到past节点之间,跳跃的被“取消”的节点(主要是清理头节点与past中间的节点)
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferStack.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
} catch (Exception e) {
throw new Error(e);
}
}
}
核心逻辑就是以上的代码,只要记住:SynchronousQueue不存储实际的数据,它的栈或者队列中存储的是生产者和消费者节点,最开始进入栈的节点就成为了等待者(这里不管是生产还是消费),而后面进入的节点都需要根据操作的mode来判断是继续入栈等待还是入栈后立即进行匹配。
网友评论