美文网首页
【并发】CountDownLatch

【并发】CountDownLatch

作者: 程序员驿站 | 来源:发表于2017-12-29 12:20 被阅读62次

作用

它允许一个或多个线程一直等待其他线程的执行完后再执行。

如何使用(demo)

1、先创建线程池


 private static final int processors = Runtime.getRuntime().availableProcessors();

   private static final ExecutorService executorService = Executors.newFixedThreadPool(processors, new ThreadFactory() {
           @Override
           public Thread newThread(@NotNull Runnable runnable) {
               String name = "name";
               Thread thread = new Thread(runnable, name);
               thread.setDaemon(true);
               return thread;
           }
       });


2、建立任务


 private class Task implements Callable<String> {

       private final String data;
       private final CountDownLatch latch;

       public Task(String data, CountDownLatch latch) {
           this.data = data;
           this.latch = latch;
       }


       @Override
       public String call() throws Exception {
           try {
               return transform(data);
           } catch (Exception e) {
              
               return null;
           } finally {
               // 这个非常重要,不管任务执行成功,失败,都要释放
               latch.countDown();
           }

       }
   }

3、执行任务

// Lists.newArrayList(); 用到guava ,初始化数据自己组织
List<String> strings = Lists.newArrayList();

List<Future<String>> futureList = Lists.newArrayList();

       final CountDownLatch latch = new CountDownLatch(strings.size());

       for (String string : strings) {

           final Future<String> future = executorService.submit(new Task(string, latch) {
           });

           futureList.add(future);
       }

       try {
           latch.await();
       } catch (InterruptedException e) {
            // log&monitor

       }

       List<String> resultList = Lists.newArrayList();
       for (Future<String> future : futureList) {

           try {
               final String str = future.get(300, TimeUnit.MILLISECONDS);

               if (str != null) {
                   resultList(str);
               }

           } catch (Exception e) {
               // log&monitor
           }
       }

实现原理

实现的代码真的是很简单,其底层是由AbstractQueuedSynchronizer(java并发的核心类) 支持的
类存在一个内部类Sync,继承自AbstractQueuedSynchronizer,其源代码如下


/**
    * Synchronization control For CountDownLatch.
    * Uses AQS state to represent count.
    */
   private static final class Sync extends AbstractQueuedSynchronizer {
       private static final long serialVersionUID = 4982264981922014374L;

       Sync(int count) {
           setState(count);
       }

       int getCount() {
           return getState();
       }

       protected int tryAcquireShared(int acquires) {
           return (getState() == 0) ? 1 : -1;
       }

       protected boolean tryReleaseShared(int releases) {
           // Decrement count; signal when transition to zero
           for (;;) {
               int c = getState();
               if (c == 0)
                   return false;
               int nextc = c-1;
               if (compareAndSetState(c, nextc))
                   return nextc == 0;
           }
       }
   }


相关文章

网友评论

      本文标题:【并发】CountDownLatch

      本文链接:https://www.haomeiwen.com/subject/pasigxtx.html