以下内容是 Condition jdk8 版本的官方翻译
Condition 将对象监视方法(wait,notify 和 notifyAll)分解为不同的对象,从而通过与任意 Lock 实现结合使用,从而使每个对象具有多个等待集。如果 Lock 替换了 synchronized,而 Condition 替换了Object监视器方法的使用。
Condition (也称为条件队列或条件变量)为一个线程暂停执行(“等待”)直到另一线程通知某些状态条件现在可能为真提供了一种方法。由于对该共享信息的访问发生在不同的线程中,因此必须对其进行保护,因此某种形式的锁与该 Condition 相关联。等待条件提供的关键属性是它自动释放关联的锁并挂起当前线程,就像Object.wait一样。
Condition 实例从本质上绑定到锁。要获取特定 Lock 实例的 Condition 实例,请使用其 newCondition()
方法。
例如,假设我们有一个有界缓冲区,它支持 put 和 take 方法。如果尝试在空的缓冲区上执行提取操作,则线程将阻塞,直到有可用项为止。如果尝试在完整的缓冲区上进行放置,则线程将阻塞,直到有可用空间为止。我们希望将 put 和 take 放在单独的等待集中,以便我们可以使用仅当缓冲区中的 item 或空间可用时才通知单个线程的优化。这可以使用两个Condition 实例来实现。(画外音:这个好像也是阻塞队列(BlockQueue)的基础)
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
Condition 实现类可以提供与对象监视器方法不同的行为和语义,例如,保证通知的顺序,或者在执行通知时不需要锁定。如果实现提供了这种特殊的语义,则实现必须记录这些语义。
请注意,Condition 实例只是普通对象,它们本身可以用作 synchronized 的目标,并且可以调用自己的监视器等待和通知方法。获取 Condition 实例的监视器锁或使用其监视器方法与获取与该 Condition 相关联的锁或使用其等待和信令方法没有指定的关系。建议避免混淆,除非在 Condition 的实现类中,否则不要以这种方式使用 Condition 实例。
除非另有说明,否则为任何参数传递null值都会导致引发NullPointerException。
实现类注意事项
当等待条件时,通常会允许“虚假唤醒”,作为对底层平台语义的让步。这对大多数应用程序几乎没有实际影响,因为应该始终在循环中等待一个条件,测试等待状态 state 是否满足条件。一个实现类可以自由地消除虚假唤醒的可能性,但是建议应用程序程序员始终假定它们会发生,因此总是在循环中等待。
Condition 等待的三种形式(可中断,不可中断和定时)在它们在某些平台上的实现容易程度和性能特征上可能有所不同。特别是,可能很难提供这些功能并维护特定的语义,例如排序保证。此外,中断挂起线程的能力可能并不总是在所有平台上都可行。
因此,不需要实现为所有三种等待形式定义完全相同的保证或语义,也不需要支持挂起线程的中断。
需要一个实现类来清楚地记录每种等待方法提供的语义和保证。当实现类确实支持挂起线程中断时,它必须遵守此接口中定义的中断语义。
由于中断通常意味着取消,并且通常不经常进行中断检查,因此与正常方法返回相比,实现类可能更喜欢对中断做出响应。即使可以证明中断是在另一个可能解除线程阻塞的操作之后发生的,也是如此。实现类应记录此行为。
Condition 定义
public interface Condition {
//使当前线程等待,直到发出信号或被中断为止。
void await() throws InterruptedException;
//使当前线程等待,直到发出信号。中断不抛出异常
void awaitUninterruptibly();
//使当前线程等待,直到发出信号或中断它,或者经过指定的等待时间。
long awaitNanos(long nanosTimeout) throws InterruptedException;
//使当前线程等待,直到发出信号或中断它,或者经过指定的等待时间。
//和 awaitNanos 一样,只是可以指定时间单位
boolean await(long time, TimeUnit unit) throws InterruptedException;
//使当前线程等待,直到发出信号或中断它,或者经过指定的期限。
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个等待线程。
//如果有任何线程在这种情况下等待,则选择一个线程进行唤醒。然后,该线程必须重新获取锁,然后才能从等待返回。
void signal();
//唤醒所有等待的线程
void signalAll();
}
它的主要实现类是 ConditionObject in AbstractQueuedLongSynchronizer。
首先我们看下 ConditionObject 的结构。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
//此条件队列的第一个 Node
private transient Node firstWaiter;
//此条件队列的最后一个 Node
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
...
...
}
以上是 ConditionObject 的构造方法和两个成员变量,也容易理解。
接下来一个个的看下各个实现方法:
await:使当前线程等待,直到发出信号或被中断为止。
public final void await() throws InterruptedException {
if (Thread.interrupted())//如果有中断信号,则抛出中断异常
throw new InterruptedException();
Node node = addConditionWaiter();//把线程放入条件等待队列的最后,并返回 Node
long savedState = fullyRelease(node);//释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//判断 Node 是否被放入同步队列
LockSupport.park(this);//阻塞,等待被唤醒
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
//判断是否被中断,这里需要判断是否是singal 引起的唤醒还是中断引起的唤醒,所以 checkInterruptWhileWaiting 会返回3中状态。
//0:没有中断
//1:唤醒后被中断
//-1:唤醒前被中断
//需要注意的是:这里的中断后,Node 还是会被放入同步队列
break;
}
//获取锁
//情况1:acquireQueued(node, savedState) == false,interruptMode 不变
//情况2:acquireQueued(node, savedState) == true,interruptMode == 0 变成 1
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//如果不是最后一个 Node,则整理整个条件等待队列
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)//中断处理
reportInterruptAfterWait(interruptMode);
}
//0:没有中断
//1:唤醒后被中断
//-1:唤醒前被中断
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
//这个是 AQS 的方法
//取消节点后,如有必要,转移节点至同步队列。如果线程在发出信号之前被中断,则返回 true。
final boolean transferAfterCancelledWait(Node node) {
//线程在发出信号之前被中断,则返回 true。此时可能线程也被唤醒,如果 CAS 竞争成功,则入队
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);//加入同步队列
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
//线程在发出信号之后被中断,此时可能正在入队
while (!isOnSyncQueue(node))
Thread.yield();//让出CPU
return false;
}
awaitUninterruptibly:使当前线程等待,直到发出信号。中断也不抛出异常,继续等待。
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
可以看到 awaitUninterruptibly 比 await 简单很多,因为它会忽略中断抛出异常这个选项,无论时唤醒前还是唤醒后中断做相同处理。
awaitNanos:使当前线程等待,直到发出信号或中断它,或者经过指定的等待时间。
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())//如果中断,抛出异常
throw new InterruptedException();
Node node = addConditionWaiter();//把当前线程加入到条件等待队列
int savedState = fullyRelease(node);//释放锁
final long deadline = System.nanoTime() + nanosTimeout;//计算出等待结束的时间点
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//Node 是否转移到同步队列
if (nanosTimeout <= 0L) {//nanosTimeout <= 0,直接放入同步队列,不需要唤醒,并退出循环
transferAfterCancelledWait(node);
break;
}
//传入的时间 >= 1ms,则线程挂起 nanosTimeout 。
//如果 nanosTimeout 小于1ms,线程不会阻塞,直到 nanosTimeout <= 0,退出循环
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//如果被中断,则退出循环。
break;
nanosTimeout = deadline - System.nanoTime();//唤醒后,重新计算时间差
}
//以下和 await 方法一模一样
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
从代码分析看,等待时间结束和正常被别的线程唤醒都有可能转移到同步队列。但一般超时等待不要使用唤醒,唤醒后还是会等待,直到时间超时。
await(long time, TimeUnit unit):和 awaitNanos 一模一样,只是多了个时间单位,还有会返回一个 boolean 值,false:时间超时退出循环,true:其他原因获取退出循环,例如中断,被别的唤醒。
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);//转换成纳秒
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);//就算执行到这里也有可能被别的线程唤醒,此时 timeout = false
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;//返回 boolean 值
}
awaitUntil:使当前线程等待,直到发出信号或中断它,或者经过指定的期限。
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);//使用 parkUntil
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
- 和 awaitNanos 和 await(long time, TimeUnit unit) 不同的是 awaitUntil 使用了 LockSupport 的 parkUntil 而不是 parkNanos,其他操作和另外两个方法一样。
- 返回一个 boolean 值。
signal:唤醒一个等待线程。采用的唤醒方式是,按队列排序唤醒第一个。FIFO
public final void signal() {
if (!isHeldExclusively())//判断当前线程是否获取锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);//遍历节点
}
private void doSignal(Node first) {
do {
//把第一个的下一个置为 第一个,如果整个条件队列只剩下一个,则把 lastWaiter 置为 null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//如果 transferForSignal 返回 false 且下一个 Node 不为 null,则继续唤醒下一个 Node,直到成功唤醒或没有 Node 为止
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//翻译:如果CAS失败,说明线程被中断,Node 由自己的线程放入同步队列
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);//放入同步队列,返回前继 Node
int ws = p.waitStatus;获取前继 Node 的 waitStatus
//如果前继 Node 被取消或 CAS 设置前继 Node 的 waitStatus = SIGNAL 失败则立刻唤醒 Node
//其实我觉得直接唤醒也没问题,只是这样做更节省CPU资源吧
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
为什么前继 Node 被取消或 CAS 设置前继 Node 的 waitStatus = SIGNAL 失败,需要立刻唤醒 Node?
我的理解是出现这种情况说明同步队列发生了一些错误,唤醒后让线程尝试获取锁,也可以在shouldParkAfterFailedAcquire 方法种修复这种错误,比如前继 Node 被取消,shouldParkAfterFailedAcquire 可以把 Node 前面已取消的节点全部剔除掉。
signalAll:唤醒全部。按队列顺序逐一唤醒。
public final void signalAll() {
if (!isHeldExclusively())//判断当前线程是否获取锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;//去除引用,帮助GC
transferForSignal(first);//加入同步队列
first = next;//把下一个当作第一个
} while (first != null);
}
好了,到这里 Condition 基本讲完了。以上都是个人理解,如果由不对的地方,请提醒我纠正。
网友评论