同步屏障CyclicBarrier

作者: 守住阳光 | 来源:发表于2018-07-26 16:27 被阅读0次

            CyclicBarrier字面意思是可循环(cyclic)使用的屏障(barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,知道最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

    一、CyclicBarrier简介

            CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。如下代码示例:

    public class CyclicBarrierTest {

        static  CyclicBarrier c = new CyclicBarrier(2);

        public static void main(String[] args) {

                new Thread(new Runnable() {

                       @Override

                        public void run() {

                                try {

                                       c.await();

                                 } catch (Exception e) {

                                 }

                                System.out.println(1);

                        }

               }).start();

               try {

                       c.await();

                } catch (Exception e) {

                }

                System.out.println(2);

          }

    }

            因为主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行,所以会产生两种输出,第一种可能输出如下为1 2 ,第二种输出为2  1。

            如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),则主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

            CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier�Action),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。如下代码:

    public class CyclicBarrierTest2 {

        static CyclicBarrier c = new CyclicBarrier(2, new A());

        public static void main(String[] args) {

                new Thread(new Runnable() {

                    @Override

                    public void run() {

                        try {

                            c.await();

                        } catch (Exception e) {

                        }

                        System.out.println(1);

                    }

            }).start();

            try {

                  c.await();

            } catch (Exception e) {

            }

            System.out.println(2);

       }

       static class A implements Runnable {

            @Override

            public void run() {

                System.out.println(3);

            }

        }

    }

            因为CyclicBarrier设置了拦截线程的数量是2,所以必须等代码中的第一个线程和线程A都执行完之后,才会继续执行主线程,然后输出2,所以代码执行后的输出如下:3  1  2。

    二、CyclicBarrier的应用场景

            CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。代码如下:

    publicclass BankWaterService implements Runnable {

            //创建4个屏障,处理完之后执行当前类的run方法

            private CyclicBarrier c = new CyclicBarrier(4, this);

            //假设只有4个sheet,所以只启动4个线程

            private Executor executor = Executors.newFixedThreadPool(4);

            //保存每个sheet计算出的银流结果

            private ConcurrentHashMap  sheetBankWaterCount = new ConcurrentHashMap();

            private  void count() {

                 for (int i = 0; i< 4; i++) {

                        executor.execute(new Runnable() {

                            @Override

                            public void run() {

                                    //计算当前sheet的银流数据,计算代码省略

                                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);

                                    //银流计算完成,插入一个屏障

                                    try {

                                        c.await();

                                    } catch (InterruptedException |BrokenBarrierException e) {

                                        e.printStackTrace();

                                    }

                            }

                    });

                 }

            }

           @Override

           publicvoid run() {

                int result = 0;

                //汇总每个sheet计算出的结果

                for (Entrysheet : sheetBankWaterCount.entrySet()) {

                        result += sheet.getValue();

                }

                //将结果输出

                sheetBankWaterCount.put("result", result);

                System.out.println(result);

           }

            public  static  void main(String[] args) {

                    BankWaterService bankWaterCount = new BankWaterService();

                    bankWaterCount.count();

            }

    }

    三、CyclicBarrier和CountDownLatch的区别

            CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。

            CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。如下代码执行完之后会返回true。

    public class CyclicBarrierTest3 {

        static  CyclicBarrier c = new CyclicBarrier(2);

        public static void main(String[] args) throws InterruptedException,BrokenBarrierException {

                Thread thread = new Thread(new Runnable() {

                        @Override

                        public void run() {

                            try {

                                c.await();

                            } catch (Exception e) {

                             }

                       }

                });

                thread.start();

                thread.interrupt();

                try {

                    c.await();

                } catch (Exception e) {

                    System.out.println(c.isBroken());

                }

          }

    }

    相关文章

      网友评论

        本文标题:同步屏障CyclicBarrier

        本文链接:https://www.haomeiwen.com/subject/nambmftx.html