概述
引入
在进行多线程编程时经常会有进行线程间的数据传递需求,一种是通过定义一个线程安全的共有变量实现,也可以通过已经封装好的类来进行数据传递。我们主要介绍BlockingQueue
的一个子类——SynchronousQueue
。
摘要
因为在BlockingQueue
中介绍了阻塞队列的使用注意事项,本文主要介绍SynchronousQueue
的实现原理。要点如下:
- 源码解读
- 内部接口类
Transfer
的定义及思考 - 内部类
TransferStack
实现原理解析 - 内部类
TransferQueue
实现原理解析
- 内部接口类
- 整体思路梳理
- 类使用场景简介
- 总结
类介绍
类定位
BlockingQueue
是一个线程安全的队列,线程安全的集合有很多,比如Collections
下面通过装饰器模式搞的那一大堆线程安全的类。BlockingQueue
的特点是它针对队列中的队满入队和空队列出队情况进行了更方便的封装,尤其是阻塞等待的策略,使其在进行线程协调的场景中使用更加方便【因为多线程操作线程安全队列时我们没必要关心队列中的元素数量,取值的关注取值,放值的关注放值即可】
而SynchronousQueue
在BlockingQueue
的实现中显得更为偏激——它虽然进行了线程角色【存/取】的区分和数据、线程的缓存【缓存数据是用来传递,缓存线程是用来控制阻塞唤醒】,但是这些缓存队列/栈都是借助内部类实现时用的,类本身并没有维护任何队列,所以SynchronousQueue
只是协调数据交换线程之间的关系,同步队列中没有任何容量,同步队列长度啥的都没有
注意
- 此队列中的对象不能为空
- 此队列也提供了公平、非公平两种模式,非公平模式使用栈,公平模式使用队列。
源码解读
常量值的确定
/** CPU 核数【和自旋数量有关】 */
static final int NCPUS = Runtime.getRuntime().availableProcessors();
/**
* 限时操作阻塞前自旋的次数,单核就不循环等待了,直接阻塞
*/
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
/**
* 不限时操作阻塞前自旋的次数,因为限时操作要判断时间是否超时,这样会占用一定时间,为了平衡,不限时操
* 作就多循环几圈
*/
static final int maxUntimedSpins = maxTimedSpins * 16;
/**
* 自旋阈值,等待时常超过此值就阻塞等待,不再自旋
*/
static final long spinForTimeoutThreshold = 1000L;
Transferer
类介绍
此类定义了SynchronousQueue
内部类的统一api,用于处理数据的传输。
类实现思路介绍
传输方法只定义了一个,通过入参的是否为空来确定操作的意图是存还是取,不管是存还是取,当一个存线程和一个取线程配对时,双方的transfer
方法返回的对象都是存线程传进去的对象。这样,就完成了对线程间变量 的传递
源码分析
abstract static class Transferer<E> {
/**
* 子类实现此方法时根据入参进行数据存/取操作
*
* @param e 非空: 进行数据存操作
* 空: 进行取操作
* @param timed 操作等待时常是否有要求
* @param nanos 如果等待时常有要求,要等待的纳秒数
* @return 非空: 存/取操作完成并返回双方传输的数据
* 空: 存/取操作失败,可能是超时或者线程中断。可以通过 Thread.interrupt 进行检测
*/
abstract E transfer(E e, boolean timed, long nanos);
}
TransferStack
类介绍
此类实现了上面的Transfer
接口,通过使用栈对操作线程及数据进行缓存,从而实现先进后出的效果,此模式是非公平的,但是相应的,数据吞吐量较大。
类实现思路介绍
其实核心方法就是一个transfer
。思路如下:
- 此模式采用栈进行信息缓存,定义了一个
SNode
可以同时缓存传输对象、线程、操作角色【存/取】、是否已配对- 新来一个对象时:
- 如果可以入栈等待【栈空或者栈顶是相同角色的东西】就入栈然后“等待-等待结束-善后”
- 如果不能入栈【栈顶的可以互补】直接搞起
- 如果不能入栈【栈顶的已经是配对过的,出栈一对,然后重新循环】
源码分析
/** 双栈????? */
static final class TransferStack<E> extends Transferer<E> {
/* SNode的模式 */
/** 和配对无关的节点模式 */
/** 消费者节点【int 末位 0】 */
static final int REQUEST = 0;
/** 生产者节点【int 末位 1】 */
static final int DATA = 1;
/** 判断节点是否已经配对【int 右数第二位 0:未配对 1:配对】 */
static final int FULFILLING = 2;
/** 判断节点是否配对 */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
/** SNode 定义 */
static final class SNode {
volatile SNode next; // 栈中的下一个节点
volatile SNode match; // 此节点配对的节点
volatile Thread waiter; // 此节点对应的线程,用来控制阻塞/唤醒
Object item; // 传递的数据【生产者线程的节点有值】
int mode; // 此节点的模式 【和配对无关,只是标识生产者/消费者】
// 这里不使用volintile的原因,我们在后面详细介绍JMM后再回头看
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* 尝试将入参的节点和本节点配对
*
* @param s 用来配对的节点
* @return 如果配对成功则返回true
*/
boolean tryMatch(SNode s) {
// 此节点还没配对且配对成功,就唤醒此节点对应的线程
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
// 配对失败,看看是不是因此此节点已经和入参节点配对过了
return match == s;
}
/**
* 通过将配对以引用指向自己来标识取消操作
*
* 使用这种约定是为了方便垃圾回收
* 毕竟在 match = null 我们赋予的意义是还没配对,没别的方法了
*/
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe 日常使用
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** 栈顶 ,没有假节点 */
volatile SNode head;
boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
/**
* 为了减少重新创建节点的次数,减少时间开销而使用的方法
*/
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
/*
* 情况分三种:
*
* 1. 如果栈为空或者栈顶明显的和此节点是同一个模式的,就入栈等待,根据返回的结果再做善后
*
* 2. 如果明显的栈顶是另一个模式的点,且尝试配对成功,那么原栈顶的match已经完成记录,不用再
* 管,此节点模式设置为此模式的已配对状态即可,然后继续入栈。然后这两个节点一起出栈并返回
* 【最后一步可以不做,因为第三种情况就是专门做这个的】
*
* 3. 如果栈顶是已配对节点,那么出栈一对节点然后重新循环
*/
SNode s = null; // 可能循环操作多次,所以这个节点就循环利用了
int mode = (e == null) ? REQUEST : DATA; // 根据入参获得节点的模式生产/消费者
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // 情况1,入栈等待
if (timed && nanos <= 0) { // 等不了
if (h != null && h.isCancelled()) // 如果可以的话就把栈顶的节点出掉
// 其实我们可以不做这一步的,毕竟我们还没入栈,可能是考虑到多线程
// 吧,看到有垃圾随手清了
casHead(h, h.next); // 无效节点出栈
else // 栈顶节点不用清除,那就直接返回吧
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {// 可以等
SNode m = awaitFulfill(s, timed, nanos); // 等
if (m == s) { // 此节点已被取消
clean(s);
return null;
}
if ((h = head) != null && h.next == s) // 此节点正好是栈顶下一个节点
// 结合上面此节点已配对,说明此节点和栈顶节点配对的,直接出栈一对节点
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item); // 返回传递的对象
}
} else if (!isFulfilling(h.mode)) { // 情况2,配对
if (h.isCancelled()) // 被取消了,没法配对了
casHead(h, h.next); // 出栈重新循环看看是哪种情况
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 此节点以 已配对 的模式入栈
for (;;) { // 循环直到配对
SNode m = s.next; // 从新栈顶的下一个节点开始【也就是老栈顶】
if (m == null) { // 没了?可能是多线程的锅,
casHead(s, null); // 把本节点出栈吧
s = null; // ????没必要吧
break; // 重新循环
}
SNode mn = m.next;
if (m.tryMatch(s)) { // 配对成功
casHead(s, mn); // 俩节点一起出栈
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // 配对失败
// 结合上面:
// 1. h 和 s 是互补的
// 2. 使用 cas ,说明 s 入栈时中间没有节点加塞
// 3. match() 失败
// 综上,说明一个问题: m 过期了......
// 其实这里也有一个极端情况,栈中只有一对节点,后一个节点入栈后发现前
// 一个节点过期了,然后 next 变成了 null , 正好就是上面我们纠结的
// m == null
s.casNext(m, mn);
}
}
} else { // 栈顶是已经配对节点
SNode m = h.next;
if (m == null) // 和上面的 m== null 一个道理
casHead(h, null); // 清空栈
else {
SNode mn = m.next;
if (m.tryMatch(h)) // 找到一对
casHead(h, mn); // 出栈
else // 他俩不是一对,和上面的分析差不多,过期了呗
h.casNext(m, mn); // 把节点清除出去
}
}
}
}
/**
* 等待操作
*
* @param s 要等的节点
* @param timed
* @param nanos
* @return 配对的节点,如果是s,表示节点被取消
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
/*
* 等待的思路:
* 1. 设置 waiter 然后自旋n次
* 2. 还没等到,而且可以继续等,那就根据情况进行线程阻塞
*/
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted()) // 看线程是否被打断,根据情况取消
s.tryCancel();
SNode m = s.match;
if (m != null) // 已经配对或者取消,返回结果
return m;
if (timed) { // 如果有时间限制,则看情况取消
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0) // 还需要循环等待就等
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null) // 多走了一次循环用来设置 waiter ..... 感觉没必要
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
/**
* 如果 s 节点需要自旋等待,就返回 true
*
* 判断节点是否要自旋的依据:
* 根据栈的情况进行猜测此节点是否很容易就配对退出
* 因为线程的阻塞唤醒有较大的开销,所以在预判等待时间较少时就使用自旋
*/
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
// 节点为栈顶,没得说,肯定容易满足
// 栈空,这里不是条件,我的理解是这里是为了利用或的短路来防止下一个条件出现空指针
// 栈顶已配对,说明要有一批节点出栈了,可能很快就轮到 s 了
}
/**
* s 节点从栈中出来,如果不能常规的从栈中出来,就遍历修改 next 是其出来
*/
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
// 找到 s 后面有效的节点
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// 从栈顶开始清除废节点
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// 没清除完,表示在 s 和栈顶之间有有效节点,那就用极端方法去修改 next 了
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
// Unsafe 日常操作
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferStack.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
} catch (Exception e) {
throw new Error(e);
}
}
}
TransferQueue
类介绍
此类实现了上面的Transfer
接口,通过使用队列对操作线程及数据进行缓存,从而实现先进先出的效果,此模式是相对公平的。
类实现思路介绍
和上面的TransferStack
目的相似,但是在代码风格上不同,很明显的一个因素就是在表示节点的状态时没有使用一致的标识。transfer
方法是最重要的方法。
源码分析
/** 双队列???? */
static final class TransferQueue<E> extends Transferer<E> {
/** 节点定义 */
static final class QNode {
volatile QNode next; // 后继
volatile Object item; // 用来传递的数据
volatile Thread waiter; //线程
final boolean isData; // 用来区分生产者/消费者
// 【这里赘余了不是,item就可以做到的。上面的 SNode 我没话说是因为人家 mode 不仅标识
// 生产者和消费者,还标识了是否已经配对,没办法所以加了一个变量】
// 这里少了一个 match 字段,因为配对算法的不同,后面详述
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* 取消,
* QNode 是将 item 设置成自己
* SNode 是将 match 设置成自己
* 不同的约定而已,没啥
*/
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
boolean isCancelled() {
return item == this;
}
/**
* 判断是否脱离了链表,通过将 next 指向自己,
*
* 没的选,因为:
* null 用来标识队尾了
*/
boolean isOffList() {
return next == this;
}
// 日常 Unsafe
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** 队头 */
transient volatile QNode head;
/** 队尾 */
transient volatile QNode tail;
/**
* 指向一个已经过期的节点的前驱节点,因为这个节点是队尾,所以不好清理,就先标记了
*/
transient volatile QNode cleanMe;
TransferQueue() {
QNode h = new QNode(null, false); // 假队头
head = h;
tail = h;
}
/**
* 设置队头
*/
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
/**
* 设置队尾
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
/**
* 设置 cleanMe
*/
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
/**
* 存/取数据
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
/*
* 情况分两种:
* 1. 不能立刻配对,那就入队等待
* 2. 可以立刻配对,那就出队配对
*
*/
QNode s = null; // 重用
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 未初始化,等待完成初始化
// 【这里我看不懂注释,我们留到后面,阅读了JMM再看吧】
// This never happens in current SynchronousQueue, but could if
// callers held non-volatile/final ref to the transferer.
continue;
if (h == t || t.isData == isData) { // 不能立刻配对,进队
// 注意,这里判断 h.isData/t.isData 都一样,因为队列中的 isData 都是一样的
// 【结合我们设计的队列来看】
QNode tn = t.next;
if (t != tail) // 队列已被改变,重新判断
continue;
if (tn != null) { // 队尾指向不准确,更新队尾
// 这里做的还可以,因为在队尾插入节点分两步: 设置 next,设置 tail
// 它只认 next ,tail 配合 next 做修正
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 不能等,直接返回 null ,表示失败
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // 进队失败,说明没竞争到,重新判断
continue;
advanceTail(t, s); // 修正 tail ,修正失败也没关系,因为可能是
// 其他节点入队发现不一致,更新了 tail
Object x = awaitFulfill(s, e, timed, nanos); // 阻塞等待
if (x == s) { // 被取消了
clean(t, s);
return null;
}
if (!s.isOffList()) { // 没被取消,但是配对了,结合“先进先出”的规则
// 说明它排上了,修改队头,出队
advanceHead(t, s); // 让它做队头的假节点
if (x != null) // 再次确认 s 是配对了??????
s.item = s; // 是它无效化
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // 可以配对,搞起搞起!!!
QNode m = h.next; // 从队头拿个节点
if (t != tail || m == null || h != head)
continue; // 队列已改变,重新读取
Object x = m.item;
// 和取出的节点配对
if (isData == (x != null) || // 队头的 isData 和我们的一样
x == m || // 队头的节点已被取消【或者是配对后它自己
// 设置成无效化了】
!m.casItem(x, e)) { // 竞争队头节点失败
advanceHead(h, m); // dequeue and retry
continue;
}
// 配对成功,出队
advanceHead(h, m); // successfully fulfilled
// 唤醒队头节点的线程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
/**
* 等待
*
* @param s 要等待的节点
* @param e 要传递的值
* @param timed
* @param nanos
* @return 配对对象的 item ,如果取消了,就返回 s
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* 和 TransferStack.awaitFulfill 一样 */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted()) // 中断就取消
s.tryCancel(e);
Object x = s.item;
if (x != e) // item 已经修改。或者是配对完成设置成了配对的item,
//或者是被取消了设置成了自己 s
return x;
if (timed) { // 如果有时间限制就根据情况取消
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null) // 设置线程
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
/**
* 清除已经取消的节点
*/
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
/*
* 如果删除不了就把这个节点的前驱标记
*/
while (pred.next == s) {
QNode h = head;
QNode hn = h.next;
if (hn != null && hn.isCancelled()) { // 先从队头清理一波
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h) // 队空
return;
QNode tn = t.next;
if (t != tail) // 队列变化,重新读取
continue;
if (tn != null) { // 队尾不一致,根据 next 进行修正
advanceTail(t, tn);
continue;
}
if (s != t) { // 要删除的节点不是队尾,直接删
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
// 要删除的节点是队尾
QNode dp = cleanMe; // dp --> delete presuccess
if (dp != null) { // 旧的 cleanMe 有指向,先清除旧的
QNode d = dp.next; // d --> delete
QNode dn; // dn --> delete next
// 前面一堆或,可以理解为 原来要删除的节点做一些预判断
// 防止要删除的节点已经被删掉了
if (d == null ||
d == dp ||
!d.isCancelled() ||
(d != t &&
(dn = d.next) != null &&
dn != d &&
dp.casNext(d, dn))) // 做删除操作
casCleanMe(dp, null); // 清除 cleanMe
if (dp == pred) // 旧的 cleanMe 标记的就是我们要新标记的节点,直接返回即可
return;
} else if (casCleanMe(null, pred)) // 标记,返回,等待后续清理
return;
}
}
// 日常 Unsafe
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long cleanMeOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) {
throw new Error(e);
}
}
}
使用示例
核心逻辑介绍
其实我们主要分析内部类的实现即可,TransferStack
和TransferQueue
使用的思路是两种,当然我说的不是栈和队列的区别:
TransferStack
配对操作是时机合适时通过抢先在栈顶压一个配对节点来搞定的,当然,进栈后顺着栈的链表慢慢找配对节点TransferQueue
配对操作是时机合适时通过抢先将合适的节点出队来实现抢占节点配对的
这两者也不好说哪个更好,只是两种思路而已。
使用思路
没啥要特别说的,就原来的“生产者——消费者”就是最好的例子。使用请参见BlockingQueue
的相关介绍。
实现原理解析
上面说的很明白了。
问题
-
在使用栈、队列作为内部类的实现时,注释中提到了
Dual
一词,为什么是双栈、双队列??? -
在阅读
TransferQueue
时,在循环中它一直在检测队列初始化,这里没太看懂,后面学习了JVM后尝试进行一些解读
扩展
BlockingQueue
还有其他的一些子类:ArrayBlockingQueue
,LinkedBlockingDeque
。先不急着读吧,感觉大同小异,后面需要时再说。
网友评论