by shihang.mai
使用
CountDownLatch cd = new CountDownLatch(2);
//在需要等待的地方调用,那么该线程阻塞
cd.await();
//其他线程完成任务后调用此就会-1,直到为0,await的地方就会唤醒
cd.countDown();
源码分析
先来看一下构造
public class CountDownLatch {
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
}
}
调用AQS
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
protected final void setState(int newState) {
state = newState;
}
}
其实就设置了一下state=传入的值
await
public class CountDownLatch {
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
继续调用AQS
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
}
tryAcquireShared调用CountDownLatch内部类Sync
private static final class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
}
当调用await,getState当然!=0,返回-1,那么继续调用AQS的doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 封装为share Node放入CLH队列
- 获取前驱节点,当然是head,而tryAcquireShared,state!=0当然返回-1
- 那么挂起当前线程
- 当其他线程countDown,知道state==0,那么这个线程唤醒
countDown
public class CountDownLatch {
public void countDown() {
sync.releaseShared(1);
}
}
继续调用AQS
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
}
调用CountDownLatch的内部类Sync
private static final class Sync extends AbstractQueuedSynchronizer {
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
释放的话调用AQS的doReleaseShared
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
每次countDown,会将state-1,知道为0调用AQS的doReleaseShared释放阻塞的线程
总结
- 初始化时设置state的值
- 调用await()方法,会加入到等待队列中,线程会被挂起,等待被唤醒。它会等待直到state值为0才继续执行
- 每次调用countDown()方法时,state减1,当state等于0时,会唤醒AQS等待队列中的线程。
网友评论