在并发编程中,往往会遇到这样的需求:一个或几个线程必须等到一系列执行前置任务的线程执行完成后才能继续执行,而CountDownLatch就是实现了这种机制的同步器。我们先看一下CountDownLatch的基本用法,然后再通过源码来分析一下CountDownLatch的具体实现。
CountDownLatch示例
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemos {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
System.out.println("all thread is start");
System.out.println("count->" + latch.getCount());
new Thread(()->{
try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("count->" + latch.getCount());
System.out.println("all pre-thread is finished");
}).start();
new Thread(()->{
System.out.println("Thread 1 process...");
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
latch.countDown();
System.out.println("Thread 1 count->" + latch.getCount());
}).start();
new Thread(()->{
System.out.println("Thread 2 process...");
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
latch.countDown();
System.out.println("Thread 2 count->" + latch.getCount());
}).start();
new Thread(()->{
System.out.println("Thread 3 process...");
try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
latch.countDown();
System.out.println("Thread 3 count->" + latch.getCount());
}).start();
}
}
运行结果如下:
all thread is start
count->3
Thread 1 process...
Thread 2 process...
Thread 3 process...
Thread 2 count->2
Thread 1 count->1
Thread 3 count->0
count->0
all pre-thread is finished
CountDownLatch是一个具有倒数计数功能同步辅助器,在初始化的时候传入count
参数来初始化实例,没调用一次countDown()
方法,count的值都会减1,而线程调用await()
方法会一直被阻塞直到count为零。在await()方法不再阻塞以后,所有等待的线程都会被释放,并且任何await()的子调用都会立刻返回。CountDownLatch最典型的应用就是使某些线程在等待若干前置线程执行完成后才能一起执行。
CountDownLatch源码简析
CountDownLatch的同步机制实现主要依赖于内部类Sync
,而Sync继承了AbstractQueuedSynchronizer
。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch在初始化对象的时候,会传入一个整型参数,该值会将sync
中的state设置为指定值,该state会作为一个共享锁存在。而await()
和countDown
操作都是针对AQS中的state进行同步相关的原子操作。AQS这里不做详细介绍。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void countDown() {
sync.releaseShared(1);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//====AbstractQueuedSynchronizer中的实现方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//====AbstractQueuedSynchronizer中的实现方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
网友评论