之前写了SynchronousQueue的源码解析,其内部实现有两个数据结构:一个是栈,一个是FIFO队列,在之前的文章中主要分析了栈(非公平模式)的实现思路而没有分析队列的实现,其实是因为JDK里面本身有LinkedTransferQueue这个队列的实现,所以今天单独来说下这个队列的实现思路,主要目的方便自己以后快速的回顾。
首先我们回顾一下SynchronousQueue里面的TransferStack的实现方式:
1.栈中节点通过0,1,2来标记节点是获取数据还是生产数据,或者是正在匹配。
2.栈中的节点称为waiter(等待者),等待其它线程与其匹配。
3.匹配过程是将匹配节点入栈,并设置节点操作模式为FULFILLING|mode让后面的匹配节点和等待节点都不能入栈,即达到了匹配动作的线程安全。
LinkedTransferQueue的实现在逻辑上与TransferStack大致相同,不同点在于队列是FIFO的,所以匹配每次操作的都是从头节点开始寻找,直到找到一个未匹配的节点,而入队列的节点是从队尾进入,与队头不冲突,所以在实现上,不会需要像栈一定,将匹配中的节点压入栈。
LinkedTransferQueue的实现也没有像SynchronousQueue里面的实现那样,在transfer(xfer)方法的处理上也有一些不同。在SynchronousQueue的transfer方法上只有3个参数,方法实现是通过是否传入元素e来判断本次操作是获取还是生产,而在LinkedTransferQueue的xfer方法的实现上,还需要手动的传入一个haveData参数来表名本次的操作。(xfer是jdk1.7写的,transfer方法是1.8写的,可以看出作者后面也发现这个参数真没必要)。
先说说总体首先思路吧:
1.一次操作,先从队列头开始检查,看节点是否已经匹配,并且模式是否与本次操作相匹配,如果模式匹配,则进行配对CAS操作。
2.如果头节点已经被匹配,则循环查找下一个节点
3.如果队列为空,或者操作模式与队列中的节点的模式一样,则本次操作构造为节点进入队列成为等待者
4.等待者在队列中睡眠或者自旋,当有调用者将其唤醒后匹配成功出队列
接下来是对代码的详细说明:
private E xfer(E e, boolean putData, int how, long nanos) {
//判断参数操作模式和e是否匹配,如果是put,则e不为空
if (putData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//代码1:从头节点开始遍历,直到队列为空
for (Node h = head, p = h; p != null;) { // find & match first node
boolean nodeIsPutData = p.isData; //头节点操作模式(false为消费,true为生产)
Object item = p.item; //头节点数据
//代码2
//代码2-1:如果头节点的数据不等于自己(节点未被匹配,见awaitMatch方法代码1)
//(item != null) == nodeIsPutData 表明节点的操作模式要与节点的数据对应,这样节点才是正常的
if (item != p && (item != null) == nodeIsPutData) { // unmatched //如果头节点已经被匹配,则去到代码3,继续循环下一个节点
if (nodeIsPutData == putData) // can't match //如果头节点的模式和本次操作的模式一样,则不能匹配,所以跳出循环来到代码4(入队列)
break;
if (p.casItem(item, e)) { // match //匹配成功
//n1(head: h)->n2->n3->n4->n5(p)->n6->n7(tail) 假如找到n5才匹配上,则说明n1,n2,n3,n4都已经
//
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
//将头节点指向p节点(p为尾节点)或者指向p.next,从而p之前的那些节点出队列
//这里为什么不是直接指向p.next?为什么有可能指向p,p不是已经被匹配了么 ?
//因为如果让head指向null的话还需要cas改变tail,两个cas没办法保证原子性),否则指向q的下一个节点。这里在代码3处得到补偿
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext(); //将next指向自己,这里印证了代码3
break; //
} // advance and retry
//重新判断队列里面是否至少还有2个节点,并且第二个是已经被匹配了的,这种情况,再次进入循环,尝试将已经匹配的节点出队列。
//if((h = head) != null && (q = h.next) != null && q.isMatched()){
// continue;
//}else break;
if ((h = head) == null ||
(q = h.next) == null ||
!q.isMatched())
break; // unless slack < 2
}
//代码2-2
//唤醒p节点,节点从awaitMatch方法中醒来,然后返回配对的值
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
//代码3,如果p!=p.next(p没有出队列),则取下一个节点继续尝试,否则从“头”开始
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
//代码4:如果入队节点模式与等待者相同,或者队列中已经没有待匹配的等待者
//NOW代表不等待的poll操作,除此之外
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, putData); //本次操作构造节点
Node pred = tryAppend(s, putData); //尝试入队列,详情见tryAppend方法
if (pred == null)
continue retry; //入队失败,从新尝试 // lost race vs opposite mode
//ASYNC代表offer, put, add
//如果入队列成功,并且是take和timed poll方法,则进入等待
if (how != ASYNC) //SYNC / TIMED
//让当前节点进入睡眠状态,等待配对者将其唤醒
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
/**
将本次操作封装成队列节点入队
入队列逻辑:
1.判断队列是否为空,为空则直接入
2.从队尾入手,判断队尾节点的状态是否与本次操作相同,如果不同则不能入队列
2.1.如果能入队列,则判断循环中的p节点是否还是尾节点,如果不是,则继续往下找,直到尾节点
3.如果p是尾节点,则cas将s节点入队列,如果cas失败,则p重新定位为next,又重新尝试
4.如果入队成功
**/
private Node tryAppend(Node s, boolean putData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
//1.队列为空,将当前节点设置为头节点
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
}
//2.来到这里说明队列不为空,此时理论上p!=null
//p节点未被匹配,并且模式与putData相反,则说明s节点无法接入队列
else if (p.cannotPrecede(putData))
return null; // lost race vs opposite mode
//2.1 如果已经有新节点入队列了,或者队尾出队了(p.next == p)
else if ((n = p.next) != null) // not last; keep traversing
// tail发生了变化,则指向新的tail(u),否则看老的tail节点是否出队列,如果未出,则让p指向p.next,如果已出队列,则说明队列空了,则让p指向null重新尝试
//让t每次都指向最新的tail,而如果p!=t,则说明p不是尾节点, 如果t已经等于了tail,则单独设置p(如果p!=p.next,则p就指向next,否则p指向null,重新从head开始)
//如果t不等于tail,则将t指向tail
//这里为什么要这么复杂? 因为有代码3.可知,节点入队的保证是casNext, 如果保证线程安全的入队,不能只拿到tail就append,因为有可能老的tail已经不是队尾了,
//所以必须一直遍历到p.next等于null才能入队列,并且如果发现p已经出队列,则将p指向null,然后代码来到1.处,从“头”开始往后遍历。
//为什么从头开始能保证线程安全呢? 因为节点出队列是casHead来保证的。
p = (p != t) && t != (u = tail) ?
(t = u) :
((p != n) ? n : null; )// restart if off list
//3.入队列
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
//4.如果入队成功返回前节点
else {
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null &&
(s = s.next) != null &&
s != t);
// advance and retry
}
return p;
}
}// for over
}
//返回true,说明调用者节点不能作为前继节点
final boolean cannotPrecede(boolean haveData) {
boolean d = isData; //
Object x;
//如果调用者操作模式和本次操作模式不相同
//并且调用者没有被匹配 x = item) != this && (x != null) == d
return d != haveData && (x = item) != this && (x != null) == d;
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; //让出CPU的次数 // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
//自旋方法:睡眠节点等待超时,被中断,或者匹配成功才会返回。
for (;;) {
Object item = s.item;
//代码1:当节点数据放生变化时,说明节点已经被匹配,返回匹配值
if (item != e) { // matched
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
//如果当前线程被中断,或者线程等待超时(timed && nanos <= 0)并且将自己的item指向自己(在代码2-1处可以看到,判断节点是否被匹配,是通过判断item是否等于自己)
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}
//初始化spins==-1,当其小于0时,
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
//自旋次数大于0,继续
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); //随机让出CPU时间 // occasionally yield
}
//自旋完成,准备进入睡眠,先设置等待线程
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
//如果有时间等待限制,判断是否超过等待时间,如果没有超过等待时间,则让节点睡眠(一定时间,节点超时或者被其它节点唤醒)
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else { //如果没有时间限制,则直接将线程睡眠
LockSupport.park(this);
}
}
}
/**
通过前节点和当前节点的操作模式来判断自旋次数
**/
private static int spinsFor(Node pred, boolean haveData) {
//如果是多核心CPU,并且前继节点存在,则根据前继节点的情况设置自旋次数。
if (MP && pred != null) {
if (pred.isData != haveData) // phase change //没看懂为什么前节点的模式和本次操作模式会不相等(因为节点都已入队列,说明前节点和本节点类型应该一定相等才对)
return FRONT_SPINS + CHAINED_SPINS;
if (pred.isMatched()) // probably at front //如果前节点以及被匹配,那么自己就是最前面的节点了,那就使用前节前的自旋次数定义
return FRONT_SPINS;
if (pred.waiter == null) // pred apparently spinning //如果前节点的waiter等于空,说明前节点还在自旋,则使用较小的CHAINED_SPINS作为自旋次数
return CHAINED_SPINS;
}
return 0;
}
最后再总结一下:
1.队列中的节点称为等待者,等待者首先会自旋一会儿来判断自己是否被匹配上,自旋完了还没匹配上就线程睡眠。
2.判断一个节点是否被匹配,直接判断item跟isData模式上是否能匹配,但是代码里面多加了一个item是否指向自己的判断(我觉得没必要),因为匹配成功的条件就是p.casItem(item, e),见代码2-1
3.判断节点是否出队列,需要判断节点以及节点的next是否指向自己。
4.等待着发现自己被匹配上了,会将自己节点的内容清空:即通过forgetContents方法将item指向自己,将线程指向null.
由于个人能力有限,代码的理解上可能会存在一定的错误,还请各位指正,谢谢。
网友评论