先来看下例子
public class ConditionTest implements Runnable {
private Lock lock;
private Condition con;
public ConditionTest(Lock lock, Condition condition) {
this.lock = lock;
this.con = condition;
}
public void run() {
if ("thread1".equals(Thread.currentThread().getName()))
testThread1Waiter();
if ("thread2".equals(Thread.currentThread().getName()))
testThread2Signal();
}
public void testThread1Waiter() {
lock.lock();
try {
try {
System.out.println("thead1被阻塞");
con.await();
System.out.println("thead1被唤醒");
} catch (InterruptedException e) {
}
} finally {
lock.unlock();
}
}
public void testThread2Signal() {
lock.lock();
try {
con.signal();
System.out.println("thead2唤醒等待线程");
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(new ConditionTest(lock, condition), "thread1").start();
new Thread(new ConditionTest(lock, condition), "thread2").start();
}
}
运行结果
thead1被阻塞
thead2唤醒等待线程
thead1被唤醒
Condition的await()实现原理
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
①首先判断当前线程是否被中断了,如果已经被中断了,则直接抛InterruptedException给上层调用者,否则进入②
if (Thread.interrupted())
throw new InterruptedException();
②把当前线程所对应的节点放入condition队列中
Node node = addConditionWaiter();
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着判断,判断最后一个等待者的waitStatus不是CONDITION的话,
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
解绑取消的等待者,因为通过这句代码
Node node = new Node(Thread.currentThread(), Node.CONDITION);
我们看到,new出来的Node的状态都是CONDITION的。
那么unlinkCancelledWaiters做了什么?就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
//把不是condition的都踢掉
//1、如果首节点不是condition
if (t.waitStatus != Node.CONDITION) {
//2、因为上面 Node next = t.nextWaiter;这里已经记录了t.nextWaiter;的值,所以可以把t.nextWaiter设置为null;
t.nextWaiter = null;
//3、一开始trail是NULL的,所以进入4,否则进入5
if (trail == null)
//4、把下个节点当作首节点,进入6
firstWaiter = next;
else
//5、把首节点指向trail.nextWaiter,进入6
trail.nextWaiter = next;
//6、如果next为NULL,说明conition队列就一个节点,trail指向lastWaiter ,然后进入9
if (next == null)
lastWaiter = trail;
}
else
//8、如果首节点是condition状态,首节点指向trail,然后进入9
trail = t;
//9、把下个节点设置为首节点,继续循环判断是否为condition节点,当然如果下个节点为null。也就是等待队列只有一个节点的话,那就退出循环了
t = next;
}
}
等待队列的基本结构如下图所示:
等待队列
插入节点只需要将原有尾节点的nextWaiter指向当前节点,并且更新尾节点。更新节点并没有像AQS更新同步队列使用CAS是因为调用await()方法的线程必定是获取了锁的线程,锁保证了操作的线程安全。
AQS实质上拥有一个同步队列和多个等待队列,具体对应关系如下图所示:
AQS同步队列与等待队列③完全释放Node的状态
int savedState = fullyRelease(node);
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
首先获取state,release的时候将整个state传过去,理由是某线程可能多次调用了lock()方法,比如调用了10次lock,那么此线程就将state加到了10,所以这里要将10传过去,将状态全部释放,这样后面的线程才能重新从state=0开始竞争锁,这也是方法被命名为fullyRelease的原因,因为要完全释放锁,释放锁之后,如果有竞争锁的线程,那么就唤醒第一个,这都是release方法的逻辑了
④判断Node是否在AbstractQueuedSynchronizer构建的队列中而不是Node是否在Condition构建的队列(waitstatus == condition状态)中,如果Node不在AbstractQueuedSynchronizer构建的队列中,那么调用LockSupport的park方法阻塞。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
//循环当前节点是否是AQS尾节点
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
⑤唤醒后重新参与竞争,如果获取不到锁,将再次睡眠等待唤醒
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
⑥如果节点被取消了,清除condition队列中被取消的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
Condition的signal()实现原理
①要能signal(),当前线程必须持有独占锁,否则抛出异常IllegalMonitorStateException。
那么真正操作的时候,获取第一个waiter,如果有waiter,调用doSignal方法:
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
② 1. 重新设置firstWaiter,指向第一个waiter的nextWaiter,也就是把首节点下个节点当作首节点
- 如果第一个waiter的nextWaiter为null,说明当前队列中只有一个waiter,lastWaiter置空
- 因为firstWaiter = first.nextWaiter,已经赋值给firstWaiter了,所以把first.nextWaiter置NULL,方便gc回收
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
③方法本意是将一个节点从Condition队列转换为AbstractQueuedSynchronizer队列,
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
总结一下方法的实现:
- 尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
- 当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列
Node p = enq(node);
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 当前节点通过CAS机制将waitStatus置为SIGNAL
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
最后返回true。再次回到这里的代码
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
while循环里的!transferForSignal(first) 变为false了,直接退出循环,所以signal()是唤醒其中一个等待唤醒的线程
从唤醒的代码我们可以得出一个重要结论:某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它。
不过正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程。
只有到发送signal信号的线程调用reentrantLock.unlock()后因为它已经被加到AQS的等待队列中,所以才会被唤醒。
signalAll方法的作用就是将Condition队列中所有等待的节点逐一队列中从移除,由CONDITION状态变为SIGNAL状态并加入AbstractQueuedSynchronizer队列的尾部。signal()则是唤醒其中一个等待唤醒的线程
signalAll实现,唤醒等待队列中所有的等待节点
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
网友评论