美文网首页
并发工具类(2):CyclicBarrier

并发工具类(2):CyclicBarrier

作者: 放开那个BUG | 来源:发表于2018-11-25 11:45 被阅读0次

    概述

    CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

    实例代码如下:

    package com.cyclicbarrier;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class Test {
    
        
        static class Writer implements Runnable{
    
            private CyclicBarrier cyclicBarrier;
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
            
            @Override
            public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
                 try {
                        Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                        System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }catch(BrokenBarrierException e){
                        e.printStackTrace();
                    }
                    System.out.println("所有线程写入完毕,继续处理其他任务...");
            }
            
        }
        
        
        public static void main(String[] args) {
            CyclicBarrier barrier = new CyclicBarrier(4);
            for(int i = 0; i < 4; i++)
                new Thread(new Writer(barrier)).start();
        }
    }
    

    打印结果:


    从上面输出结果可以看出,每个写入线程执行完写数据操作之后,就在等待其他线程写入操作完毕。
    当所有线程线程写入操作完毕之后,所有线程就继续进行后续的操作了。

    CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。代码如下:

    public class Test {
        public static void main(String[] args) {
            int N = 4;
            CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程"+Thread.currentThread().getName());   
                }
            });
     
            for(int i=0;i<N;i++)
                new Writer(barrier).start();
        }
        static class Writer extends Thread{
            private CyclicBarrier cyclicBarrier;
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
     
            @Override
            public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
                try {
                    Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                    System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch(BrokenBarrierException e){
                    e.printStackTrace();
                }
                System.out.println("所有线程写入完毕,继续处理其他任务...");
            }
        }
    }
    

    打印结果:


    CyclicBarrier的应用场景

    CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

    CyclicBarrier和CountDownLatch的区别

    • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
    • CountDownLatch基于AQS;CyclicBarrier基于锁和Condition。本质上都是依赖于volatile和CAS实现的。

    源码解析

    这里有一篇关于CyclicBarrier的源码解析,https://www.cnblogs.com/go2sea/p/5615531.html

    相关文章

      网友评论

          本文标题:并发工具类(2):CyclicBarrier

          本文链接:https://www.haomeiwen.com/subject/pcqpqqtx.html