1. 介绍
在这篇文章中,将给出一个对CountDownLatch的指导性教程,以实际的例子说明其基本使用方法。
本质上,通过CountDownLatch
可以使得一个线程一直阻塞直到其它线程已经完成指定的任务。
2. 并发编程中的使用
简单地来说,CountDownLatch
有一个counter
域,你可以根据需要对它进行递减。我们可以使用它来阻塞一个正在被调用的线程直到它被递减为0。
如果我们正在做一些并行处理工作,可以以目标工作线程数量作为计数器的初值实例化CountDownLatch
对象。然后我们就可以在每个线程结束的时候调用countdown()
来保证每一个调用了await()
的独立线程被阻塞直到所有的工作进程执行结束。
3. 等待线程结束
让我们来尝试下这种模式,通过创建一个Worker,使用CountDownLatch
字段来通知我们其何时结束:
public class Worker implements Runnable {
private List<String> outputScraper;
private CountDownLatch countDownLatch;
public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
doSomeWork();
outputScraper.add("Counted down");
countDownLatch.countDown();
}
}
接下来,让我们创建测试用例验证我们可以让CountDownLatch
一直等待Worker
实例结束:
@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
throws InterruptedException {
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());
workers.forEach(Thread::start);
countDownLatch.await();
outputScraper.add("Latch released");
assertThat(outputScraper)
.containsExactly(
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Latch released"
);
}
自然而然,"Latch released"
将会一直是最后的输出 - 因为它取决于CountDownLatch
何时释放。
请注意如果没有调用await()
,我们将不能保证线程的执行次序,导致测试将会发生随机性的失败。
4. 多个线程等待同时开始运行
之前的例子使用了5个线程,而这次将会启动数千个线程,这样就很有可能出现一种情形:在后面启动的线程中调用start()
前,很多早先一些启动的线程已经执行完毕。这有可能使得一个并发问题难以被重现,因为我们不能够让这些线程并行运行。
为了解决这个问题,我们在之前的例子的基础上稍微改变一下CountdownLatch
的工作方式。并不是阻塞父线程直到一些子线程执行结束,我们可以阻塞子线程直到其它的子线程都已经开始运行。
让我们修改一下run()
方法让它在继续执行前先进行阻塞:
public class WaitingWorker implements Runnable {
private List<String> outputScraper;
private CountDownLatch readyThreadCounter;
private CountDownLatch callingThreadBlocker;
private CountDownLatch completedThreadCounter;
public WaitingWorker(
List<String> outputScraper,
CountDownLatch readyThreadCounter,
CountDownLatch callingThreadBlocker,
CountDownLatch completedThreadCounter) {
this.outputScraper = outputScraper;
this.readyThreadCounter = readyThreadCounter;
this.callingThreadBlocker = callingThreadBlocker;
this.completedThreadCounter = completedThreadCounter;
}
@Override
public void run() {
readyThreadCounter.countDown();
try {
callingThreadBlocker.await();
doSomeWork();
outputScraper.add("Counted down");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completedThreadCounter.countDown();
}
}
}
现在,让我们修改一下测试用例,这会使得主线程一直阻塞,直到所有Worker线程都开始运行。接着主线程会解禁所有的Worker线程,然后再阻塞直到所有的Worker线程都执行结束:
@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
throws InterruptedException {
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch readyThreadCounter = new CountDownLatch(5);
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
CountDownLatch completedThreadCounter = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new WaitingWorker(
outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
.limit(5)
.collect(toList());
workers.forEach(Thread::start);
readyThreadCounter.await();
outputScraper.add("Workers ready");
callingThreadBlocker.countDown();
completedThreadCounter.await();
outputScraper.add("Workers complete");
assertThat(outputScraper)
.containsExactly(
"Workers ready",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Workers complete"
);
}
这个模式对于重现并发bug很有用,因为能够被用来强制数千个线程并行执行相同的逻辑。
5. 提前终止CountdownLatch
有时候,我们可能遇到这样的情形,Worker在递减CountDownLatch
之前由于某些错误发生意外的终止。这会导致计数器永远不会递减为0,然后await()
永远不会终止:
@Override
public void run() {
if (true) {
throw new RuntimeException("Oh dear, I'm a BrokenWorker");
}
countDownLatch.countDown();
outputScraper.add("Counted down");
}
让我们修改之前的测试代码,使用BrokenWorker
,这显示await()
将会一直阻塞:
@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
throws InterruptedException {
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());
workers.forEach(Thread::start);
countDownLatch.await();
}
显而易见,这并不是我们所期望的 - 如果能让程序继续运行会比永远地陷入阻塞好得多。
为了解决这个问题,我们添加一个超时参数去调用await()
。
boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();
如我们所见,测试代码最终将会超时,await()
将会返回false
。
6. 总结
在这篇快速指导文章中,我们说明了如何使用CountDownLatch
去阻塞一个线程直到其它线程已经完成其任务。
我们也展示了它如何被用来帮助调试并发问题(通过保证线程并行运行)。
这些例子的实现代码可以在over on GitHub上找到; 这是一个基于Maven的项目,因此应该很容易按原样运行。
翻译自https://www.baeldung.com/java-countdown-latch
,如有错误欢迎指正!
网友评论