博客思路介绍
concurrent
框架的思路在AQS
中有不少体现。所以我们打算着重记录一下,记录的思路如下:
- 引入及介绍
AQS
队列通用的方法 - 介绍
AQS
预提供的各种和锁获得、释放相关的方法;及暴露出来的用来重写的方法。 - 介绍队列相关的监控方法
- 介绍
Condition
相关的方法 - 扩展、总结及展望
本文主要介绍AQS
预提供的方便实现Condition
的方法,AQS
还提供了Condition
的一个实现类等待子类根据各自的情况去完善。
支持条件同步队列的方法
判断节点是否在同步队列中
源码
/**
* 如果一个最初放在条件同步队列上的节点现在正在同步队列上等待重新获取就返回true
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
// 如果位于条件同步队列中或者不在任何队列中【废节点】就返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 它一定在队列中,结合上面的条件,说明是在同步队列中
if (node.next != null)
return true;
/*
* 我们上面还有一种情况没考虑:enq() 进队进一半,所以我们从队尾往前再找一
* 遍确认一下
*/
return findNodeFromTail(node);
}
思路
之前我们介绍过一个public final boolean isQueued(Thread thread)
方法,用来判断线程是否在队中。
这里我们只判断给定的节点是否在同步队列中,要求
- 是此节点
- 同步队列
判断节点是否在同步队列中【实现】
源码
/**
*
* 只被 isOnSyncQueue 调用
* @return 找到了就返回 true
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
思路
之前我们碰到过一个例子,就一个地方用,但是还是给抽出来了一个函数。唉,知道啥意思就行,别计较太多。
节点从条件(Condition
)同步队列转换到同步队列
源码
/**
* 将一个节点从条件同步队列转到同步队列
* @param node the node
* @return 转化成功就返回 true (如果失败,一般情况下是节点被取消了)
*/
final boolean transferForSignal(Node node) {
/*
* 如果设置 ws 失败了, 估计是被取消了,然后 ws 被设置成了 CANCELLED
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 将node重新进入同步队列,并将他的前驱设置成 SIGNAL ,如果设置失败就唤醒此节点重新同步
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
思路
为什么要重新进一次队列
条件同步队列和同步队列在数据结构上是两个队列
取消等待后将节点移动到同步队列
源码
/**
* 条件同步队列中取消等待的节点转移至同步队列中。
*
* @param node the node
* @return 如果节点被取消了就返回 true
*/
final boolean transferAfterCancelledWait(Node node) {
/**
* 如果可以正常修改状态的话,直接修改状态重新进队列
*/
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* 反查确认一下
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
思路
感觉这个transferAfterCancelledWait
和transferForSignal
的最大区别就是加入同步队列后要不要手动设置一下前驱,让前驱唤醒自己
释放节点,返回同步状态
源码
/**
* 唤醒节点,返回节点之前的状态。唤醒此节点后将此节点作废(取消)
*
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
思路
条件同步队列检测方法
判断当前AQS
是决定Condition
的安排
源码
/**
* 查询入参的 ConditionObject 是否是由当前的 AQS 锁得出的
*
* @param condition the condition
* @return {@code true} if owned
* @throws NullPointerException if the condition is null
*/
public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}
思路
判断条件同步队列中是否有阻塞线程
源码
/**
* 如果条件同步队列中有等待者就返回 true, 本方法返回的结果用于检测队列情况,不要用此结果来做一些队列
* 操作的判断。
*
* 从实现上来看,本方法仅作了非法前置条件检查,实现上是依赖入参的 hasWaiters() 方法的
*
* @param condition the condition
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if exclusive synchronization
* is not held
* @throws IllegalArgumentException if the given condition is
* not associated with this synchronizer
* @throws NullPointerException if the condition is null
*/
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
思路
获得条件同步队列的长度
源码
/**
* 返回条件同步队列中等待的线程的数量的估计,此估计仅用于队列状态的统计和监控,不要用作队列操作的判断
* 依据。
*
* 依赖入参的 ConditionObject 的方法实现
*
* @param condition the condition
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if exclusive synchronization
* is not held
* @throws IllegalArgumentException if the given condition is
* not associated with this synchronizer
* @throws NullPointerException if the condition is null
*/
public final int getWaitQueueLength(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitQueueLength();
}
思路
获得条件同步队列中所有线程
源码
/**
* 依赖入参的 ConditionObject 的方法实现,功能啥的没啥说的
*
* @param condition the condition
* @return the collection of threads
* @throws IllegalMonitorStateException if exclusive synchronization
* is not held
* @throws IllegalArgumentException if the given condition is
* not associated with this synchronizer
* @throws NullPointerException if the condition is null
*/
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}
思路
内部类ConditionObject
类基本介绍
ConditionObject
是Condition
接口的一个实现类,他和一个AQS
相关联,是用来作为一个Lock
的实现的。
我们这里主要介绍机制的运作,不根据Lock
和Condition
的使用去专门介绍,我们后面解读具体的实现类时在介绍那些东西。
这个类实现了序列化和反序列化接口,但是所有的字段都是transient
,反序列化得到的对象字段都是空的。
内部方法概览
ConditionObject
实现了
-
Condition
接口定义的所有方法。【wait/signal
那一大堆各种各样的方法】 - 条件队列的一些基本操作方法
总体比较及总结
通过阅读ConditionObject
的源码,我们知道在ConditionObject
中维护了一个条件同步队列,并实现了此队列的插入删除取消等方法,以及将节点从CondititonObjcet
的条件同步队列转移至外面AQS
类的同步队列。
ConditionObject
类采用的是单向队列没有很多难懂的东西,整体较简单。
源码
public class ConditionObject implements Condition, java.io.Serializable {
/**
* 自定义 serialVersionUID 方便控制升级时的兼容性问题
*/
private static final long serialVersionUID = 1173984872572414699L;
/** 相当于 AQS 的header ,只是没有假的头部节点,而且时单向队列*/
private transient Node firstWaiter;
/** 相当于 AQS 的tail */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
// 内部实现,主的逻辑实现
/**
* 入队列,相当于 AQS 的 addWaiter()
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果明显的发现队列中有取消的点,先清除一遍
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 然后就是一样的套路了
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* 唤醒一个等待的线程,如果队列中没线程,就啥都不做
* @param 队列中的第一个等待的线程
*/
private void doSignal(Node first) {
// 我们一般不修改入参,但是这里改了就改了吧,不追究
// 这里把 first 直接当遍历找有效等待节点的中间变量用了
do {
// 头部标记向后移动
if ( (firstWaiter = first.nextWaiter) == null)
// 如果 first 是最后一个节点,把头尾指向记成null
lastWaiter = null;
//将 first 置空摘下来
first.nextWaiter = null;
// 如果 transferForSignal(first) 将线程从条件同步队列放到同步队列成功,就退出循环
// 如果失败了则尝试转移下一个节点
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* 唤醒队列中所有等待的线程,相当于直接把条件同步队列清空
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
// 先清空标记,防止一个一个清时再有进队请求
lastWaiter = firstWaiter = null;
// 这个代码很简单,就是一直搬运,把节点遍历一遍,全搬到同步队列中
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
/**
* 将取消的等待节点从条件同步队列中移除。这个方法在
* 1. 等待节点的取消时调用
* 2. 新进节点发现明显的队列中有取消的节点时调用【队未节点为 CANCELLED】
*
* 此方法可以在没有线程唤醒的情况下清除垃圾【考虑条件同步队列,唤醒的速度要比同步队列慢】
*
*
*/
private void unlinkCancelledWaiters() {
// 顺序遍历,摘掉所有已取消的节点
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 实现的公共方法
/**
* 把等待时间最长的线程唤醒来竞争锁
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* 唤醒所有线程来竞争锁
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* 不可打断式等待:
*
* 我们先将线程入队条件同步队列,然后将线程从同步队列中释放出来,然后阻塞线程,直至线程被放回到同
* 步队列中
*
* ConditionObject 的思路很简单,只是安排获得锁和线程的阻塞问题,如果线程被唤醒,不管,直接移
* 动到 AQS 中, AQS 会协调线程的执行。
*/
public final void awaitUninterruptibly() {
// 线程入条件同步队列
Node node = addConditionWaiter();
// 将线程从同步队列中释放出来
int savedState = fullyRelease(node);
boolean interrupted = false;
// 阻塞线程,只要节点还在条件同步队列中,就表示线程还在等待锁
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
// 线程从条件同步队列转移至同步队列了,直接调用同步队列的排队获得锁即可
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
/*
* 在可打断的阻塞中,我们需要判断如何处理中断。是抛出异常还是记录一下继续执行
*/
/** 记录一下继续执行 **/
private static final int REINTERRUPT = 1;
/** 抛出异常 */
private static final int THROW_IE = -1;
/**
* 在等待是检测中断
*
* 返回0表示没有中断发生
*
* 这里如果线程被打断过,我们尝试将节点从条件同步队列移动至同步队列。
* 如果移动成功,表明该线程在唤醒前移动了,返回 THROW_IE
* 如果移动失败,表明线程在移动前唤醒了,返回 REINTERRUPT
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* 根据 interruptMode 做对应的操作
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
/**
* 实现阻塞
*
* 1. 如果线程被中断,直接抛出异常
* 2. 保存锁的状态
* 3. 将其从同步队列释放并转移至条件同步队列
* 4. 在条件同步队列中阻塞直至被中断或者获得锁,然后转移至同步队列中
* 5. 在同步队列中排队唤醒线程
* 6. 如果在4中线程被打断了,根据 interruptMode 做对应的响应
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 因为已经是获得锁的状态,所以不用移除,直接往条件同步队列加就行
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 自旋检测是否发生了中断, 这里使用自旋是
// 没发生返回 0
// 发生了根据将线程搬运回同步队列的结果返回对应的值。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 在同步队列中唤醒成功
// 这里还进行了中断状态的防呆处理,应该是怕子类在复写方法时出现 THROW_IE REINTERRUPT
// 之外的状态导致 reportInterruptAfterWait() 出错吧
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 我们将节点转移至同步队列时并没有将其从条件同步队列删除
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 传递打断状态
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 实现限时阻塞
*
* 1. 如果线程被中断,直接抛出异常
* 2. 保存锁的状态
* 3. 将其从同步队列释放并转移至条件同步队列
* 4. 在条件同步队列中阻塞直至时间到达或者获得锁或者被中断,然后转移至同步队列中
* 5. 在同步队列中排队唤醒线程
* 6. 如果在4中线程被打断了,根据 interruptMode 做对应的响应
*
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* 实现限时阻塞
*
* 1. 如果线程被中断,直接抛出异常
* 2. 保存锁的状态
* 3. 将其从同步队列释放并转移至条件同步队列
* 4. 在条件同步队列中阻塞直至时间到达或者获得锁或者被中断,然后转移至同步队列中
* 5. 在同步队列中排队唤醒线程
* 6. 如果在4中线程被打断了,根据 interruptMode 做对应的响应
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* 实现限时阻塞
*
* 1. 如果线程被中断,直接抛出异常
* 2. 保存锁的状态
* 3. 将其从同步队列释放并转移至条件同步队列
* 4. 在条件同步队列中阻塞直至时间到达或者获得锁或者被中断,然后转移至同步队列中
* 5. 在同步队列中排队唤醒线程
* 6. 如果在4中线程被打断了,根据 interruptMode 做对应的响应
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 监测用
/**
* 如果本条件同步队列对应的同步队列为入参 AQS ,返回true
*
* @return {@code true} if owned
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
/**
* 查询条件同步队列中是否有等待的线程【因队列在变,不保证准确】
*
* 是 AQS 中的 hasWaiters(ConditionObject) 方法的实现
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
/**
* 查询条件同步队列的长度【因队列在变,不保证准确】
* 是 AQS 中的 getWaitQueueLength(ConditionObject) 方法的实现
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
/**
* 返回条件同步队列中的所有线程【因队列在变,不保证准确】
* 是 AQS 中的 getWaitingThreads(ConditionObject) 方法的实现
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
总结
本文介绍了AQS
中支持同步队列的方法和条件同步队列的方法。
AQS
预提供的条件同步队列的实现很简单,就是一个单向队列而已,我们猜测可能是用到锁的地方不是很普遍,没必要为了队列频繁变化而做出双向队列那么复杂的结构设计。
AQS
的逻辑上的东西到这里就结束了。
网友评论