CountDownLatch常用于多线程开发中,某些线程需要等待一部分线程执行完毕之后再进行。最常见的场景就是一部调用多个接口,然后将返回值汇聚成一个汇总结果。下面将介绍CountDownLatch的使用及原理解析。
1. 使用演示
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new MThread()).start();
new Thread(new MThread()).start();
Thread.sleep(2000);
countDownLatch.countDown();
countDownLatch.countDown();
}
public static class MThread implements Runnable{
@Override
public void run() {
try {
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "run结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- 输出
out =>
Thread-0run结束
Thread-1run结束
2. 底层逻辑展示
CountDownLatch底层依赖AQS实现。
- 初始化时通过构造方法将
state
赋值。- 当有线程(可以是多个)执行
await
时,进入到等待队列中,并执行LockSupport.park
挂起线程。- 当有线程执行
countDown
时,进行CAS的state-1操作。- 当线程执行
countDown
,执行完state-1操作发现state == 0时,执行LockSupport.unpark
唤醒等待队列中的全部线程。
下面是流程图:
![](https://img.haomeiwen.com/i18215596/c1b4d04968f2ed09.png)
3. 重点源码说明
- 初始化
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count); // 初始化state
}
- 执行await
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // 如果state == 0 则直接放行,如果state != 0则阻塞线程
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 创建共享锁节点
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 执行LockSupport.park阻塞线程
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);// 执行LockSupport.park阻塞线程
return Thread.interrupted();
}
- 执行coutDown
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 尝试释放共享锁
doReleaseShared(); // 释放共享锁
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) // CAS 执行state - 1
return nextc == 0; // 如果state == 0,则表示可以释放共享锁
}
}
private void doReleaseShared() {
for (;;) { // 唤醒等待队列中的全部阻塞线程
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
网友评论