简书 占小狼
转载请注明原创出处,谢谢!
前言
JDK的并发包中提供了几个非常有用的工具类,这些工具类给我们在业务开发过程中提供了一种并发流程控制的手段,本文会基于实际应用场景介绍如何使用CountDownLatch,以及内部实现机制。
CountDownLatch是什么
CountDownLatch也叫闭锁,在JDK1.5被引入,允许一个或多个线程等待其他线程完成操作后再执行。
CountDownLatch内部会维护一个初始值为线程数量的计数器,主线程执行await方法,如果计数器大于0,则阻塞等待。当一个线程完成任务后,计数器值减1。当计数器为0时,表示所有的线程已经完成任务,等待的主线程被唤醒继续执行。
过程图.png
应用场景
应用程序的主线程希望在负责启动框架服务的线程已经完成之后再执行。在例子中,模拟了一个应用的启动类,具体实现如下。
Service.java:所有服务的基类,具体实现在execute方法实现。
class Service implements Runnable {
private CountDownLatch latch;
public Service(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
execute();
} finally {
if (latch != null)
latch.countDown();
}
}
public void execute() {}
}
HealthCheckService.java:服务具体实现类,类似的还有DatabaseCheckerService,这里我们使用TimeUnit.SECONDS.sleep模拟长时间的操作。
class HealthCheckService extends Service {
public HealthCheckService(CountDownLatch latch) {
super(latch);
}
@Override
public void execute() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Application.java:应用启动类,使用线程池执行每个服务的任务。负责初始化闭锁,然后等待,直到所有服务都被检测完。
class Application {
private CountDownLatch latch;
public void startUp() throws Exception {
latch = new CountDownLatch(2);
List<Service> services = new ArrayList<>();
services.add(new DatabaseCheckerService(latch));
services.add(new HealthCheckService(latch));
Executor executor = Executors.newFixedThreadPool(services.size());
for (Service service : services) {
executor.execute(service);
}
latch.await();
System.out.println("all service is start up");
}
}
实现原理
CountDownLatch实现主要基于java同步器AQS,不熟悉的可以移步这里 深入浅出java同步器。
其内部维护一个AQS子类,并重写了相关方法。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
await实现
主线程执行await方法,tryAcquireShared方法中如果state不等于0,返回-1,则加入到等待队列中,主线程通过LockSupport.park(this)被挂起。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
countDown实现
countDown方法委托sync实现state的减1操作,即通过unsafe.compareAndSwapInt方法设置state值。
public void countDown() {sync.releaseShared(1);}
如果state为0,通过LockSupport.unpark唤醒await方法中挂起的主线程。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
和CyclicBarrier的区别
- CyclicBarrier 允许一系列线程相互等待对方到达一个点,正如 barrier 表示的意思,该点就像一个栅栏,先到达的线程被阻塞在栅栏前,必须等到所有线程都到达了才能够通过栅栏;
- CyclicBarrier 持有一个变量 parties,表示需要全部到达的线程数量;先到达的线程调用 barrier.await 方法进行等待,一旦到达的线程数达到 parties 变量所指定的数,栅栏打开,所有线程都可以通过;
- CyclicBarrier 构造方法接受另一个 Runnable 类型参数 barrierAction,该参数表明再栅栏被打开的时候需要采取的动作,null 表示不采取任何动作,注意该动作将会在栅栏被打开而所有线程接着运行前被执行;
- CyclicBarrier 是可重用的,当最后一个线程到达的时候,栅栏被打开,所有线程通过之后栅栏重新关闭,进入下一代;
- CyclicBarrier.reset 方法能够手动重置栅栏,此时正在等待的线程会收到 BrokenBarrierException
异常。
总结
通过本文的介绍,希望大家能够了解CountDownLatch的应用场景和工作机制。
END。
我是占小狼。
在魔都艰苦奋斗,白天是上班族,晚上是知识服务工作者。
读完我的文章有收获,记得关注和点赞哦,如果非要打赏,我也是不会拒绝的啦!
网友评论