put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// transferer托管进行阻塞式插入,一直等到有人取走为止
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
take
public E take() throws InterruptedException {
// 阻塞式读取,直到有人写入为止
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
transfer
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
// 传入的e是否为空来决定是入队还是出队的操作
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 如果栈顶节点不为空,且mode相同
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
// 将该mode相同的插入值包装成snode,作为新的栈顶
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 并且阻塞,并等待匹配的请求
// 如果被唤醒,那么m就是等待的匹配请求
SNode m = awaitFulfill(s, timed, nanos);
// 如果返回的匹配请求时自己本身,那么说明是被中断的,不是正常唤醒,那么返回null
// 将s从栈中弹出
if (m == s) { // wait was cancelled
clean(s);
return null;
}
// 到这里,说明匹配的真的来的,那么将这一对匹配请求进行出栈,head继续往下,等待下次匹配
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
// 根据类型来返回请求的值
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 到这说明mode不同,带来的意义是可进行匹配操作
// 先看看head是不是已经被别人匹配了,如果没有,继续往下
} else if (!isFulfilling(h.mode)) { // try to fulfill
// 如果head已经被cancel了,那么head往下,继续自旋
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
// 否则将e包装成snode,入栈为栈顶,且mode=FULFILLING|mode
// 表示该节点已经被配对,不能再被其他人匹配了
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 如果入栈成功,那么自旋先
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
// 如果发现match请求为null,那么前面入栈的栈顶也没有意义了,回滚并继续自旋
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
// 假如s->m->mn
SNode mn = m.next;
// 如果m跟s是一对匹配的请求
if (m.tryMatch(s)) {
// 那么弹出m和s,将head设为mn,继续下次匹配
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
// 如果不匹配的话,将s指向mn,跳过m
// 自旋后,来处理s是否和mn匹配
s.casNext(m, mn); // help unlink
}
}
// 到这里说明head被标识为匹配,那么看m和h是否匹配,匹配就head指向mn,准备下次匹配
// 不匹配就跳过m,继续h跟mn是否匹配,继续自旋处理
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
// 如果不匹配的话,将h指向mn,跳过m
h.casNext(m, mn); // help unlink
}
}
}
}
网友评论