ReentrnatLock.newCondition()是一个条件变量,这个变量对Object.wait/notify/notifyAll很好的扩展
条件变量为线程提供了一个含义,以便在某个状态条件不满足时挂起这个线程,并在条件满足时让另一个线程唤醒自己。
因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,即,调用await之前,线程要获取锁,并且是一个独占锁。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //将当前线程加入到Condition自己维护的队列中(AQS也维护一个队列)
int savedState = fullyRelease(node); //释放当前线程占有的锁
int interruptMode = 0;
//循环,检查节点是否在AQS的队列中(signal会使节点进入AQS队列中等待锁)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被唤醒后,重新开始竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。这时acquireQueued的语义。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
总的来说,Condition自己维护了一个队列,await()后会释放锁,并且节点从AQS的队列移到Condition的队列,而线程想要获取锁就要重新回到AQS的队列中等待资源。await()的整体流程是按如下操作进行的
- 将线程加入到Condition锁队列中,值得注意的是,这时不同于AQS的独立队列
- 释放锁,如果当前线程带锁等待,别的线程不能获取锁就会造成死锁
- while等待,直到线程被唤醒或者被cancel。LockSupport.park()会阻塞线程
- acquireQueued()获取锁,这时当前节点已经不在Condition队列
接下来看看Condition怎么维护自身的队列
private Node addConditionWaiter() {
Node t = lastWaiter; // If lastWaiter is cancelled, clean out.
//找到最后一个队尾元素,将新节点插入队列
if (t != null && t.waitStatus != Node.CONDITION) { //保证队尾元素的状态
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
可以看到Condition有这样两个属性
private transient Node firstWaiter;
private transient Node lastWaiter;
而Node有Node.nextWaiter属性,这样就可以构成一个FIFO队列。
signal()/signalAll()
前面说await()的时候就有提到signal,因为await的时候会调用park()阻塞线程,singal()里面调用unpark函数才能使await结束循环
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//将节点移出队列,移出一个就能结束循环
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
上面的代码很容易看出来,signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。而进入到AQS队列的任务就交由transferForSignal()完成
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
nt ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
这里也执行了unpark操作后,await的循环就会结束,循环之后就是acquireQueued();
网友评论