一 SynchronousQueue
- 不存储消息,线程生产消息后休眠等待其他线程消费,被消费后生产者线程才继续往后处理。
1.1 实例化
- 实例化,公平则是按先进先出排序消费,非公平则不确定顺序消费
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
1.2 入队
- 入队函数,实际都是调用
transferer.transfer(e, true, unit.toNanos(timeout))
实现
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
1.3 出队
- 出队函数,实际都是调用
transferer.transfer(null, true, unit.toNanos(timeout))
实现,返回出队消息
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public E poll() {
return transferer.transfer(null, true, 0);
}
1.4 TransferQueue
1.4.1 队列存储节点结构
static final class QNode {
volatile QNode next; // next node in queue
// 数据存储,null表示需要数据,非null表示传输数据。
// null->非null表示获取到数据。
// 非null->null表示数据传输完成。
// item=QNode,表示节点被cancel
volatile Object item;
volatile Thread waiter; // 休眠等待的线程
final boolean isData;//true 生产者,false 消费者
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
//cas修改链表节点下一节点引用
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//cas修改节点数据
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//数据引用cas修改为节点引用,即节点取消处理
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
boolean isCancelled() {
return item == this;
}
//是否从链表中退出
boolean isOffList() {
return next == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
//cas修改时使用
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
1.4.2 实例化
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
1.4.3 基本操作
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
1.4.4 入队/出队函数transfer()
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);//e为null出队,e不为null入队
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 还未实例化
continue; // 自旋等待
if (h == t || t.isData == isData) { //队列为空,或队列节点模式相同
QNode tn = t.next;
if (t != tail) // 尾节点有并发修改,则重新获取
continue;
if (tn != null) { //tail不是真正的尾节点,则先更新tail节点再重新获取
advanceTail(t, tn);
continue;//不断循环直到tail为真正的尾节点
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)//若并发导致插入失败,则不为null
s = new QNode(e, isData);//首次执行则初始化节点,
if (!t.casNext(null, s)) //插入队尾,若有并发修改,则重头开始重新处理
continue;
advanceTail(t, s); // 更新tail,不关心结果,入队之前会保证tail更新成真正的尾节点
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // 节点被cancel
clean(t, s);
return null;
}
//节点s被消费,
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // 释放原head节点,更新s为head
if (x != null) // and forget fields
s.item = s;//cancel
s.waiter = null;//清空等待线程
}
return (x != null) ? (E)x : e;//返回传递的数据
} else { //模式不同,则节点消费
QNode m = h.next; // 从头节点开始
if (t != tail || m == null || h != head)
continue; // 有并发处理,则重头开始重新处理
Object x = m.item;
if (isData == (x != null) || // m已经被消费
x == m || // 节点被cancel
!m.casItem(x, e)) { // 有并发,消费失败
advanceHead(h, m); // m出队,处理下一节点
continue;
}
//m节点被消费,m出队,
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);//唤醒m节点的线程,进行后续处理
return (x != null) ? (E)x : e;//返回传输的数据
}
}
}
1.4.5 awaitFulfill()
//自旋次数,NCPUS是cpu核数
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
//1ms,最短休眠时间
static final long spinForTimeoutThreshold = 1000L;
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
//计算限时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//计算自旋次数,
// 非头节点不自旋。
// 头节点则自旋等待对应的生产者或消费者
// 配置超时时间的,maxTimedSpins = cpu核数小于2则不自旋,否则自旋32次
// 未配置超时时间的,自旋 maxTimedSpins * 16
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);//当前生产者或消费者被中断,则cancel节点
Object x = s.item;//获取节点数据
if (x != e)
//生产者节点则item被修改为null
//消费者节点则item被修改为生产数据
//cancel则item被修改为节点引用地址
return x;
if (timed) {//指定超时时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {//超时到达,cancel节点
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;//自旋
else if (s.waiter == null)
s.waiter = w;//配置等待线程
else if (!timed)//无超时时间,则一直休眠等待
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);//限时休眠,等待唤醒
}
}
1.4.6 clean()
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
//从头节点开始检查cancel节点,若cancel则释放节点
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)//队列已空,则返回
return;
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {//更新tail为真实的尾节点
advanceTail(t, tn);
continue;
}
if (s != t) { // 非尾节点,断开链表连接
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
//待删除节点是尾节点,cleanMe有待删除节点则处理删除,
// 否则放入cleanMe延迟处理删除
//cleanMe表示清理我的next节点
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;//待清理节点
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);//已清理完成
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}
1.5 TransferStack
- 非公平消费,新请求不入队,直接先消费模式不一样的head节点。
1.5.1 transfer
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // 链表为空,或和head模式相同,则入队
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled()) //清理cancelled节点
casHead(h, h.next); // pop cancelled node
else
return null;//直接返回null
} else if (casHead(h, s = snode(s, e, h, mode))) {//插入头节点
SNode m = awaitFulfill(s, timed, nanos);//休眠等待目标节点。request等待data,data等待request类型
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // 修改next节点为头节点
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { //模式不同,且head节点未被消费,则待插入节点尝试消费head
if (h.isCancelled()) //cancelled节点则清理
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//插入节点,尝试消费head
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // 链表为空,清理i
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {//尝试消费,消费成功,删除两个节点
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // 并发其他已消费,则只删除被消费的节点
s.casNext(m, mn); // help unlink
}
}
} else { // head已被消费,则删除head和消费head的next节点
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // 删除消费head的节点和head
else // lost match
h.casNext(m, mn); // 下一节点不是消费head的,则删除
}
}
}
}
二 LinkedTransferQueue
- 数据处理
private E xfer(E e, boolean haveData, int how, long nanos)
,所有入队,出队都调用这个函数
- e有值生产数据,haveData=true。e为null表示消费数据,haveData=false。
- how表示xfter的模式,有四种。
// for untimed poll, tryTransfer,节点成功消费或消费失败
private static final int NOW = 0;
// for offer, put, add。节点存储到队列中,可以后续处理
private static final int ASYNC = 1;
// for transfer, take。无不同模式则休眠或自旋等待。
private static final int SYNC = 2;
// for timed poll, tryTransfer。指定限时时间休眠等待,超时在处理失败。
private static final int TIMED = 3;
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))//参数冲突错误
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//遍历链表,进行处理
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
//(item != null) == isData 数据节点未消费
//item != p 数据节点未释放
if (item != p && (item != null) == isData) { // unmatched
if (isData == haveData)
break;// 和新节点模式一样,都是生产或消费数据
if (p.casItem(item, e)) { // 修改item值,消费节点,
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
//修改头节点,释放消费q.isMatched()的原头节点
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
// 节点数<=1,或q未被消费则不再继续处理
break;
}
LockSupport.unpark(p.waiter);//唤醒线程
return LinkedTransferQueue.<E>cast(item);//返回数据
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData);//节点入队列尾
if (pred == null)//有并发竞争失败,或有相反模式节点可消费
continue retry; // lost race vs opposite mode
if (how != ASYNC)//同步或限时则线程休眠等待
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // NOW或异步类型立即返回
}
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
if (p == null && (p = head) == null) {
if (casHead(null, s))//空链表
return s; // initialize
}
else if (p.cannotPrecede(haveData))
return null; // 模式相反且未被消费,返回null重新处理
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))//遍历直到尾节点,插入尾节点
p = p.next; // 插入失败则重新获取nexit节点,准备插入
else {//插入成功
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&//更新tail节点
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
if (item != e) { // 节点已被消费
// assert item != s;
s.forgetContents(); // 释放数据引用
return LinkedTransferQueue.<E>cast(item);//返回数据
}
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // 中断或超时则节点cancel
unsplice(pred, s);
return e;
}
//自旋等待或休眠等待
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
LockSupport.park(this);
}
}
}
网友评论