美文网首页
并发容器和并发工具类的使用

并发容器和并发工具类的使用

作者: 若尘0328 | 来源:发表于2017-12-12 14:27 被阅读5次

CyclicBarrier和CountDownLatch

CyclicBarrier :字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
下面的代码是开启五个线程去生成数据,等生成数据结束再启动一个线程去汇总生成的数据

//五个线程去生成数据,等三个线程生成数据完成再启动一个线程去汇总结果
public class CyclicBarrierDemo {
    private static ConcurrentHashMap<String,Integer> map=new ConcurrentHashMap<String,Integer>();
    static class Producer implements Runnable{
        private CyclicBarrier cyclicBarrier;
        public Producer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            Random random=new Random();
            int num=random.nextInt(1000)+1000;
            System.out.println(Thread.currentThread().getId()+"product "+num);
            map.put(Thread.currentThread().getId( )+"",num);
            try {
                Thread.sleep(1000);
                //生成数据后会在这里等待(阻塞),直到所有的线程都生成数据结束再放行
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
    private static class Consumer implements Runnable{
        @Override
        public void run() {
            int sum=0;
            for (Map.Entry<String,Integer> entry:map.entrySet()){
                sum+=entry.getValue();
            }
            System.out.println("consumer caculate sum ="+sum);
        }
    }

    public static void main(String[] args) {
        //这个构造方法的意思是等五个线程都到达屏障之后,开始执行Consumer线程的run方法
        CyclicBarrier cyclicBarrier=new CyclicBarrier(5,new Consumer());
        Producer producer=new Producer(cyclicBarrier);
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(producer);
            thread.start();
        }
    }
}

执行结果

11product 1890
12product 1849
13product 1314
14product 1346
15product 1658
consumer caculate sum =8057

CountDownLatch 允许一个或多个线程等待其他线程完成操作。CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。
上面的场景同样可以使用CountDownLatch来实现:

public class CountDownLatchDemo {

    static class Producer implements Runnable{
        private CountDownLatch countDownLatch;
        private ConcurrentHashMap<String,Integer> map;
        public Producer(CountDownLatch countDownLatch,ConcurrentHashMap<String,Integer> map) {
            this.countDownLatch = countDownLatch;
            this.map=map;
        }
        @Override
        public void run() {
            Random random=new Random();
            int num=random.nextInt(1000)+1000;
            System.out.println(Thread.currentThread().getId()+"product "+num);
            map.put(Thread.currentThread().getId( )+"",num);
            //生产完了就去减1
            countDownLatch.countDown();
        }
    }
    static class Consumer implements Runnable{
        private ConcurrentHashMap<String,Integer> map;
        private CountDownLatch countDownLatch;

        public Consumer(ConcurrentHashMap<String, Integer> map, CountDownLatch countDownLatch) {
            this.map = map;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                //消费者一直阻塞,直到生产者生产完毕,把countDownLatch的count减到0
                countDownLatch.await();
                int sum=0;
                for (Map.Entry<String,Integer> entry:map.entrySet()){
                    sum+=entry.getValue();
                }
                System.out.println("consumer calculate sum ="+sum );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int producorThreadSize=5;
        CountDownLatch countDownLatch=new CountDownLatch(producorThreadSize);
        ConcurrentHashMap map=new ConcurrentHashMap();
        Producer producer=new Producer(countDownLatch,map);
        for (int i = 0; i < producorThreadSize; i++) {
            new Thread(producer).start();
        }
        new Thread(new Consumer(map,countDownLatch)).start();
    }
}

执行结果:

13product 1978
15product 1752
14product 1449
12product 1373
11product 1318
consumer calculate sum =7870

CyclicBarrier和CountDownLatch的区别:
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,CountDownLatch.await一般阻塞主线程,所有的工作线程执行countDown,而CyclicBarrierton通过工作线程调用await从而阻塞工作线程,直到所有工作线程达到屏障。

相关文章

网友评论

      本文标题:并发容器和并发工具类的使用

      本文链接:https://www.haomeiwen.com/subject/vyvgixtx.html