美文网首页
Java并发编程之CountDownLatch的使用

Java并发编程之CountDownLatch的使用

作者: polyau | 来源:发表于2020-06-16 01:24 被阅读0次

    1. 介绍

    在这篇文章中,将给出一个对CountDownLatch的指导性教程,以实际的例子说明其基本使用方法。

    本质上,通过CountDownLatch可以使得一个线程一直阻塞直到其它线程已经完成指定的任务。

    2. 并发编程中的使用

    简单地来说,CountDownLatch有一个counter域,你可以根据需要对它进行递减。我们可以使用它来阻塞一个正在被调用的线程直到它被递减为0。

    如果我们正在做一些并行处理工作,可以以目标工作线程数量作为计数器的初值实例化CountDownLatch对象。然后我们就可以在每个线程结束的时候调用countdown()来保证每一个调用了await()的独立线程被阻塞直到所有的工作进程执行结束。

    3. 等待线程结束

    让我们来尝试下这种模式,通过创建一个Worker,使用CountDownLatch字段来通知我们其何时结束:

    public class Worker implements Runnable {
        private List<String> outputScraper;
        private CountDownLatch countDownLatch;
     
        public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
            this.outputScraper = outputScraper;
            this.countDownLatch = countDownLatch;
        }
     
        @Override
        public void run() {
            doSomeWork();
            outputScraper.add("Counted down");
            countDownLatch.countDown();
        }
    }
    

    接下来,让我们创建测试用例验证我们可以让CountDownLatch一直等待Worker实例结束:

    @Test
    public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
      throws InterruptedException {
     
        List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch countDownLatch = new CountDownLatch(5);
        List<Thread> workers = Stream
          .generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
          .limit(5)
          .collect(toList());
     
          workers.forEach(Thread::start);
          countDownLatch.await(); 
          outputScraper.add("Latch released");
     
          assertThat(outputScraper)
            .containsExactly(
              "Counted down",
              "Counted down",
              "Counted down",
              "Counted down",
              "Counted down",
              "Latch released"
            );
        }
    

    自然而然,"Latch released"将会一直是最后的输出 - 因为它取决于CountDownLatch何时释放。

    请注意如果没有调用await(),我们将不能保证线程的执行次序,导致测试将会发生随机性的失败。

    4. 多个线程等待同时开始运行

    之前的例子使用了5个线程,而这次将会启动数千个线程,这样就很有可能出现一种情形:在后面启动的线程中调用start()前,很多早先一些启动的线程已经执行完毕。这有可能使得一个并发问题难以被重现,因为我们不能够让这些线程并行运行。

    为了解决这个问题,我们在之前的例子的基础上稍微改变一下CountdownLatch的工作方式。并不是阻塞父线程直到一些子线程执行结束,我们可以阻塞子线程直到其它的子线程都已经开始运行。

    让我们修改一下run()方法让它在继续执行前先进行阻塞:

    public class WaitingWorker implements Runnable {
     
        private List<String> outputScraper;
        private CountDownLatch readyThreadCounter;
        private CountDownLatch callingThreadBlocker;
        private CountDownLatch completedThreadCounter;
     
        public WaitingWorker(
          List<String> outputScraper,
          CountDownLatch readyThreadCounter,
          CountDownLatch callingThreadBlocker,
          CountDownLatch completedThreadCounter) {
     
            this.outputScraper = outputScraper;
            this.readyThreadCounter = readyThreadCounter;
            this.callingThreadBlocker = callingThreadBlocker;
            this.completedThreadCounter = completedThreadCounter;
        }
     
        @Override
        public void run() {
            readyThreadCounter.countDown();
            try {
                callingThreadBlocker.await();
                doSomeWork();
                outputScraper.add("Counted down");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                completedThreadCounter.countDown();
            }
        }
    }
    

    现在,让我们修改一下测试用例,这会使得主线程一直阻塞,直到所有Worker线程都开始运行。接着主线程会解禁所有的Worker线程,然后再阻塞直到所有的Worker线程都执行结束:

    @Test
    public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
     throws InterruptedException {
      
        List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch readyThreadCounter = new CountDownLatch(5);
        CountDownLatch callingThreadBlocker = new CountDownLatch(1);
        CountDownLatch completedThreadCounter = new CountDownLatch(5);
        List<Thread> workers = Stream
          .generate(() -> new Thread(new WaitingWorker(
            outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
          .limit(5)
          .collect(toList());
     
        workers.forEach(Thread::start);
        readyThreadCounter.await(); 
        outputScraper.add("Workers ready");
        callingThreadBlocker.countDown(); 
        completedThreadCounter.await(); 
        outputScraper.add("Workers complete");
     
        assertThat(outputScraper)
          .containsExactly(
            "Workers ready",
            "Counted down",
            "Counted down",
            "Counted down",
            "Counted down",
            "Counted down",
            "Workers complete"
          );
    }
    

    这个模式对于重现并发bug很有用,因为能够被用来强制数千个线程并行执行相同的逻辑。

    5. 提前终止CountdownLatch

    有时候,我们可能遇到这样的情形,Worker在递减CountDownLatch之前由于某些错误发生意外的终止。这会导致计数器永远不会递减为0,然后await()永远不会终止:

    @Override
    public void run() {
        if (true) {
            throw new RuntimeException("Oh dear, I'm a BrokenWorker");
        }
        countDownLatch.countDown();
        outputScraper.add("Counted down");
    }
    

    让我们修改之前的测试代码,使用BrokenWorker,这显示await()将会一直阻塞:

    @Test
    public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
      throws InterruptedException {
      
        List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch countDownLatch = new CountDownLatch(5);
        List<Thread> workers = Stream
          .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
          .limit(5)
          .collect(toList());
     
        workers.forEach(Thread::start);
        countDownLatch.await();
    }
    

    显而易见,这并不是我们所期望的 - 如果能让程序继续运行会比永远地陷入阻塞好得多。

    为了解决这个问题,我们添加一个超时参数去调用await()

    boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
    assertThat(completed).isFalse();
    

    如我们所见,测试代码最终将会超时,await()将会返回false

    6. 总结

    在这篇快速指导文章中,我们说明了如何使用CountDownLatch去阻塞一个线程直到其它线程已经完成其任务。

    我们也展示了它如何被用来帮助调试并发问题(通过保证线程并行运行)。

    这些例子的实现代码可以在over on GitHub上找到; 这是一个基于Maven的项目,因此应该很容易按原样运行。

    翻译自https://www.baeldung.com/java-countdown-latch
    ,如有错误欢迎指正!

    相关文章

      网友评论

          本文标题:Java并发编程之CountDownLatch的使用

          本文链接:https://www.haomeiwen.com/subject/wlikxktx.html