Conditon接口与示例
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。
Condition的使用方式比较简单,需要主语在调用方法前获取锁,如下所示:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionAwait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
如示例所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才会从await()方法返回,并且在返回前已经获取了锁。
Condition接口定义的方法如下:
方法名称 | 描述 |
---|---|
void await() throws InterruptedException | 当前线程进入等待状态直到被通知或中断,当前线程进入运行状态且从await()方法返回的情况包括: 1.其他线程调用Condition的signal()或signalAll()方法,而当前线程被选中唤醒。 2.其他线程中断当前线程。 如果当前等待线程从await()方法返回,那么表面该线程已经获取了Condition对象所对应的锁 |
void awaitUninterruptibly() | 当前线程进入等待状态直到被通知,从方法名称可以看出该方法对中断不敏感 |
long awaitNanos(long nanosTimeout) throws InterruptedException | 当前线程进行等待状态直到被通知、中断或者超时。返回值表示剩余的时间 |
boolean await(long time, TimeUnit unit) throws InterruptedException | 当前线程进行等待状态直到被通知、中断或者超时,与awaitNanos区别就是可以指定时间单位 |
boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进行等待状态直到被通知、中断或者到某个时间,如果没有到指定时间就被通知,返回true,否则返回false |
void signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法上返回前必须获得与Condition相关联的锁 |
void signalAll() | 唤醒所有等待在Condition上的线程,能从等待方法上返回的线程必须先获得与Condition相关联的锁 |
下面通过一个有界队列的示例来深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作会阻塞插入线程,直接队列出现空位。
package util;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedQueue<T> {
private Object[] items;
private int addIndex;
private int removeIndex;
private int count;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public BoundedQueue(int count) {
items = new Object[count];
}
public void add(T item) throws InterruptedException {
lock.lock();
try {
while (count==items.length) {
notFull.await();
}
items[addIndex] = item;
if(++addIndex == items.length) {
addIndex=0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
@SuppressWarnings("unchecked")
public T remove() throws InterruptedException {
lock.lock();
try {
while (count==0) {
notEmpty.await();
}
Object o = items[removeIndex];
if(++removeIndex==items.length) {
removeIndex=0;
}
--count;
notFull.signal();
return (T) o;
} finally {
lock.unlock();
}
}
}
上述示例中,BoundedQueue通过add(T)
方法添加一个元素,通过remove()
方法移除一个元素。
以增加方法为例,首先需要获得锁,目的是确保数组修改的可见性和排他性。当数量元素数量等于数组长度时,表示数组已满,调用notFull.await()
,当前线程随之释放锁并进入等待状态。如果数组元素数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在notEmpty上的线程,数组中有新元素可以获取。
Condition实现分析
ConditionObject是AQS的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也比较合理。每个Condition对象都包含一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。
等待队列
等待队列是一个FIFO队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义服用了同步器中节点的定义,也就是两个节点类型都是AbstractQueuedSynchronizer.Node
。
一个Condition中包含了一个等待队列,Condition拥有首节点和尾节点。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。
> line: 1862
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
等待队列的基本结构如下:
等待
Condition拥有首尾节点的引用,而新增节点只需要将原来的尾节点nextWaiter指向它,并且更新尾节点即可。上述的更新过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
> line: 2007
public final void awaitUninterruptibly() {
// 增加尾节点
Node node = addConditionWaiter();
// 释放同步状态
int savedState = fullyRelease(node);
boolean interrupted = false;
// 阻塞当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
// 线程被唤醒后再次尝试获取同步状态
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
> line: 1880
private Node addConditionWaiter() {
// 如果不是持有锁的线程调用,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node t = lastWaiter;
// 如果lastWaiter被取消了,则清除它
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个CONDITION节点
Node node = new Node(Node.CONDITION);
// 更新字段
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
> line: 1943
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 解除对此节点的引用
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
> line: 1756
final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
> line: 1666
final boolean isOnSyncQueue(Node node) {
// 在addConditionWaiter()方法上创建的新节点都是CONDITION节点
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切的说是AQS)有一个同步队列和多个等待队列:
等待的响应中断版本如下:
> line: 2068
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 检查等待时线程是否被中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤醒或者中断后再次尝试获取同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 处理中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
> line: 2037
private int checkInterruptWhileWaiting(Node node) {
// 如果线程等待时被中断,尝试将其加入到同步队列中
// 在被通知前被中断了,返回THROW_IE
// 在被通知后被中断了,返回REINTERRUPT
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
> line: 2028
/*
* 对于响应中断的等待,我们需要跟踪是否要抛出InterruptedException
* 异常,如果在等待队列上被阻塞时被中断了,那么抛出异常,
* 如果在同步队列上被阻塞时被中断了,那么就再次中断。
*/
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
> line: 1734
final boolean transferAfterCancelledWait(Node node) {
if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
> line: 629
// 此方法与addWaiter(Node)方法几乎一样
private Node enq(Node node) {
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
> line: 2047
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
等待的超时版本如下:
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 我们不会在这里检查 nanosTimeout <= 0L,使得awaitNanos(0)成为一种释放锁的方式
// 计算截止时间
final long deadline = System.nanoTime() + nanosTimeout;
long initialNanos = nanosTimeout;
// 增加到等待队列中
Node node = addConditionWaiter();
// 释放同步状态
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 超时时间已经过去,移到同步队列中,退出循环
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// 睡眠指定时间
if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 重新计算超时时间
nanosTimeout = deadline - System.nanoTime();
}
// 尝试获取同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
long remaining = deadline - System.nanoTime(); // avoid overflow
return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
}
> line: 2146
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
// 获取截止时间
long abstime = deadline.getTime();
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 增加到等待队列中
Node node = addConditionWaiter();
// 释放同步状态
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 到达截止时间,移到同步队列,退出循环
if (System.currentTimeMillis() >= abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
// 阻塞线程
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 尝试获取同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
通知
调用Condition的signal()方法会唤醒在等待队列中等待时间最长的节点(即首节点),在唤醒线程前,将节点移到同步队列中。
> line: 1973
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
> line: 1906
private void doSignal(Node first) {
// 解除头节点的引用,同时更新头节点
// 然后尝试将头节点移到同步队列,如果失败表示此节点已经被取消,
// 唤醒下一个节点
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
> line: 1707
final boolean transferForSignal(Node node) {
/*
* 如果无法改变waitStatus,表示这个节点已经被取消了
*/
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
// 移到同步队列中,并返回前驱结点
Node p = enq(node);
int ws = p.waitStatus;
// 尝试将前驱结点的waitStatus设置为SIGNAL
// 如果失败了或者前驱节点被取消了,那么就唤醒线程使其继续尝试获取同步状态
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()
方法开头进行了检查。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
被唤醒后的线程,将从await()
方法中的while循环中退出(isOnSyncQueue(Node)
方法返回true),进而调用同步器的acquireQueued()
方法加入到获取同步状态的竞争中。
public final void awaitUninterruptibly() {
// 增加尾节点
Node node = addConditionWaiter();
// 释放同步状态
int savedState = fullyRelease(node);
boolean interrupted = false;
// 阻塞当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
// 线程被唤醒后再次尝试获取同步状态
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
final boolean isOnSyncQueue(Node node) {
// 被移动到同步队列的节点waitStatus被更新为0
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 查看next字段的注释:"The next field of cancelled nodes is set to
// point to the node itself instead of null, to make life
// easier for isOnSyncQueue."。从此处我们可以发现这个if语句
// 存在的意义。
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 想象这样一种场景,有一个线程调用signal方法并执行到transferForSignal
// 方法中的enq方法,尝试将此节点插入到同步队列中,查看enq源码可以发现,
// 不管CAS tail是否成功,此节点的prev节点已经被设置了,所以第一个if的
// 检查将会通过(prev!=null),此时这个线程被意外唤醒,然后在await方法中的
// 循环中继续执行,调用此方法,此时就会符合注释描述的这种情况。
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
// We check for node first, since it's likely to be at or near tail.
// tail is known to be non-null, so we could re-order to "save"
// one null check, but we leave it this way to help the VM.
for (Node p = tail;;) {
if (p == node)
return true;
if (p == null)
return false;
p = p.prev;
}
}
signalAll()
方法则对等待队列中的每个节点都唤醒,将它们全部移到同步队列中。
> line: 1988
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
> line: 1919
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
关于AQS的分析到此结束。除了AQS以外,java.util.concurrent.lock
包中还有一个AbstractQueuedLongSynchronizer
类,此类的实现与AQS几乎完全一致,只有同步状态使用long表示而不是int。
> line: 99
/**
* The synchronization state.
*/
private volatile long state;
网友评论