美文网首页
CyclicBarrier的克星—BrokenBarrierEx

CyclicBarrier的克星—BrokenBarrierEx

作者: 消失er | 来源:发表于2020-06-17 15:14 被阅读0次

    上篇 CyclicBarrier多任务协同的利器 我们借助部门TB的例子,一步步分析了 CyclicBarrier 多线程协调的功能。
    CyclicBarrier 功能强大的同时,意味着提供了更多的API,并且在使用过程中,可能有一些注意点。

    今天就来聊聊 BrokenBarrierException,从名字就能看出,是“屏障被破坏异常”,屏障被破坏时,CyclicBarrier 的期望功能就不能完成,甚至导致程序异常;
    BrokenBarrierException 可谓是 CyclicBarrier 的克星。

    上篇的例子,我们仅仅使用了 CyclicBarrier 最基本的API

    public CyclicBarrier(int parties);
    CyclicBarrier(int parties, Runnable barrierAction);
    public int await();
    

    实际还有:

    int getParties():获取CyclicBarrier打开屏障的线程数量,也成为方数
    int getNumberWaiting():获取正在CyclicBarrier上等待的线程数量
    int await(timeout,TimeUnit):带限时的阻塞等待
    boolean isBroken():获取是否破损标志位broken的值
    void reset():使得CyclicBarrier回归初始状态
    

    我们重点介绍一下,能够导致 BrokenBarrierException 的操作,然后给出详细示例:

    首先是 await() 和 await(timeout,TimeUnit)带时限的阻塞等待

        /**
         * Waits until all {@linkplain #getParties parties} have invoked
         * {@code await} on this barrier.
         *
         * <p>If the current thread is not the last to arrive then it is
         * disabled for thread scheduling purposes and lies dormant until
         * one of the following things happens:
         * <ul>
         * <li>The last thread arrives; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * one of the other waiting threads; or
         * <li>Some other thread times out while waiting for barrier; or
         * <li>Some other thread invokes {@link #reset} on this barrier.
         * </ul>
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * <p>If the barrier is {@link #reset} while any thread is waiting,
         * or if the barrier {@linkplain #isBroken is broken} when
         * {@code await} is invoked, or while any thread is waiting, then
         * {@link BrokenBarrierException} is thrown.
         *
         * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
         * then all other waiting threads will throw
         * {@link BrokenBarrierException} and the barrier is placed in the broken
         * state.
         *
         * <p>If the current thread is the last thread to arrive, and a
         * non-null barrier action was supplied in the constructor, then the
         * current thread runs the action before allowing the other threads to
         * continue.
         * If an exception occurs during the barrier action then that exception
         * will be propagated in the current thread and the barrier is placed in
         * the broken state.
         *
         * @return the arrival index of the current thread, where index
         *         {@code getParties() - 1} indicates the first
         *         to arrive and zero indicates the last to arrive
         * @throws InterruptedException if the current thread was interrupted
         *         while waiting
         * @throws BrokenBarrierException if <em>another</em> thread was
         *         interrupted or timed out while the current thread was
         *         waiting, or the barrier was reset, or the barrier was
         *         broken when {@code await} was called, or the barrier
         *         action (if present) failed due to an exception
         */
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    

    await() 源码注释,描述了方法功能:调用该方法的线程进入等待,在CyclicBarrier上进行阻塞等待,直到发生以下情形之一:

    • 在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。—— 正常情形
    • 当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
    • 其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
    • 其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
    • 其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。

    除了第一种属于正常的情形,其他的都会导致 BrokenBarrierException。
    带时限的await() 会抛出 TimeoutException;

    public int await(long timeout, TimeUnit unit) throws InterruptedException,
                                                                 BrokenBarrierException,
                                                                 TimeoutException
    

    当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。
    当前线程抛出 TimeoutException 异常时,其他线程会抛出 BrokenBarrierException 异常。

    await() 和 await(timeout,TimeUnit)带时限的阻塞等待,总共会有4种情形,产生 BrokenBarrierException;
    下面我们一一来看。

    Barrier被破坏的4中情形

    为了加深大家对 CyclicBarrier 使用场景的熟悉,我们在复现产生 BrokenBarrierException 的4种情形时,使用运动员比赛的例子:

    1.如果有线程已经处于等待状态,调用reset方法会导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会导致始终无法等待。

    比如,五个运动员,其中一个在等待发令枪的过程中错误地接收到裁判传过来的指令,导致这个运动员以为今天比赛取消就离开了赛场。但是其他运动员都领会的裁判正确的指令,剩余的运动员在起跑线上无限地等待下去,并且裁判看到运动员没有到齐,也不会打发令枪。

    class MyThread extends Thread {
        private CyclicBarrier cyclicBarrier;
        private String name;
        private int ID;
    
        public MyThread(CyclicBarrier cyclicBarrier, String name,int ID) {
            super();
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
            this.ID=ID;
    
        }
        @Override
        public void run() {
            System.out.println(name + "开始准备");
            try {
                Thread.sleep(ID*1000);  //不同运动员准备时间不一样,方便模拟不同情况
                System.out.println(name + "准备完毕!在起跑线等待发令枪");
                try {
                    cyclicBarrier.await();
                    System.out.println(name + "跑完了路程!");
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                    System.out.println(name+"看不见起跑线了");
                }
                System.out.println(name+"退场!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
    }
    public class Test {
    
        public static void main(String[] args) throws InterruptedException {
            CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
                @Override
                public void run() {
                    System.out.println("发令枪响了,跑!");
    
                }
            });
    
            for (int i = 0; i < 5; i++) {
                new MyThread(barrier, "运动员" + i + "号", i).start();
            }
            Thread.sleep(1000);
            barrier.reset();
        }
    }
    

    当发生 BrokenBarrierException 时,CyclicBarrier的保障被破坏,不能完成原功能;对应比赛场景,相当于运动员退场了。

    运行结果:

    运动员0号开始准备
    运动员2号开始准备
    运动员3号开始准备
    运动员0号准备完毕!在起跑线等待发令枪
    运动员1号开始准备
    运动员4号开始准备
    运动员1号准备完毕!在起跑线等待发令枪
    运动员0号看不见起跑线了
    运动员0号退场!
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at com.ljheee.juc.BrokenBarrierExceptionDemo$MyThread.run(BrokenBarrierExceptionDemo.java:31)
    运动员2号准备完毕!在起跑线等待发令枪
    运动员3号准备完毕!在起跑线等待发令枪
    运动员4号准备完毕!在起跑线等待发令枪
    

    从输出可以看到,运动员0号在等待的过程中,主线程调用了reset方法,导致抛出BrokenBarrierException异常。但是其他线程并没有受到影响,它们会一直等待下去,从而一直被阻塞。
    此时程序一直没停。

    这种场景下,因为有参与者提前离开,导致剩余参与者永久等待。

    2.如果在等待的过程中,线程被中断,也会抛出BrokenBarrierException异常,并且这个异常会传播到其他所有的线程。

    public class Test {
        static   Map<Integer,Thread>   threads=new HashMap<>();
        public static void main(String[] args) throws InterruptedException {
            CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
                @Override
                public void run() {
                    System.out.println("发令枪响了,跑!");
    
                }
            });
    
            for (int i = 0; i < 5; i++) {
            MyThread t = new MyThread(barrier, "运动员" + i + "号", i);
                threads.put(i, t);
                t.start();
            }
            Thread.sleep(3000);
            threads.get(1).interrupt();
        }
    }
    

    运行结果:

    运动员0号开始准备
    运动员2号开始准备
    运动员3号开始准备
    运动员1号开始准备
    运动员0号准备完毕!在起跑线等待发令枪
    运动员4号开始准备
    运动员1号准备完毕!在起跑线等待发令枪
    运动员2号准备完毕!在起跑线等待发令枪
    运动员3号准备完毕!在起跑线等待发令枪
    java.lang.InterruptedException
    运动员3号看不见起跑线了
    运动员3号退场!
    运动员2号看不见起跑线了
    运动员2号退场!
    运动员0号看不见起跑线了
    运动员0号退场!
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:234)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    运动员4号准备完毕!在起跑线等待发令枪
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    运动员4号看不见起跑线了
    运动员4号退场!
    

    从输出可以看到,其中一个线程被中断,那么所有的运动员都退场了。
    在实际使用CyclicBarrier,一定要防止这种情况发生。

    3.如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出BrokenBarrierException,屏障被损坏。
    这个就好比运动员都没有问题,而是裁判出问题了。裁判权力比较大,直接告诉所有的运动员,今天不比赛了,你们都回家吧!

    public class Test {
        static Map<Integer, Thread> threads = new HashMap<>();
    
        public static void main(String[] args) throws InterruptedException {
            CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
                @Override
                public void run() {
                    String str = null;
                    str.substring(0, 1);// 模拟异常
                    System.out.println("发令枪响了,跑!");
    
                }
            });
    
            for (int i = 0; i < 5; i++) {
                MyThread t = new MyThread(barrier, "运动员" + i + "号", i);
                threads.put(i, t);
                t.start();
            }
    
        }
    }
    

    运行结果:

    运动员0号开始准备
    运动员3号开始准备
    运动员2号开始准备
    运动员1号开始准备
    运动员4号开始准备
    运动员0号准备完毕!在起跑线等待发令枪
    运动员1号准备完毕!在起跑线等待发令枪
    运动员2号准备完毕!在起跑线等待发令枪
    运动员3号准备完毕!在起跑线等待发令枪
    运动员4号准备完毕!在起跑线等待发令枪
    Exception in thread "Thread-4" java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    运动员0号看不见起跑线了
    运动员0号退场!
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    运动员3号看不见起跑线了
    运动员3号退场!
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    运动员1号看不见起跑线了
    运动员1号退场!
    java.lang.NullPointerException
        at thread.Test$1.run(Test.java:15)
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:220)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
        at thread.MyThread.run(MyThread.java:27)
    运动员2号看不见起跑线了
    运动员2号退场!
    

    我们在 CyclicBarrier 的构造方法中指定回调函数,并模拟了异常;
    可以看到,如果在执行屏障动作的过程中出现异常,那么所有的线程都会抛出BrokenBarrierException异常。
    这也提醒我们,使用带回调的CyclicBarrier构造方法时,指定的回调任务一定不要抛出异常,或者实现异常处理。

    4.如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出BrokenBarrierException异常。

    public class MyThread extends Thread {
        private CyclicBarrier cyclicBarrier;
        private String name;
        private int ID;
    
        public MyThread(CyclicBarrier cyclicBarrier, String name, int ID) {
            super();
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
            this.ID = ID;
    
        }
    
        @Override
        public void run() {
            System.out.println(name + "开始准备");
            try {
                Thread.sleep(ID * 1000);
                System.out.println(name + "准备完毕!在起跑线等待发令枪");
                try {
                    try {
                        cyclicBarrier.await(ID * 1000, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                    System.out.println(name + "跑完了路程!");
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                    System.out.println(name + "看不见起跑线了");
                }
                System.out.println(name + "退场!");
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
    
        }
    }
    

    运行结果:

    运动员0号开始准备
    运动员3号开始准备
    运动员2号开始准备
    运动员1号开始准备
    运动员0号准备完毕!在起跑线等待发令枪
    运动员4号开始准备
    运动员0号跑完了路程!
    运动员0号退场!
    java.util.concurrent.TimeoutException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
        at com.ljheee.juc.BrokenBarrierExceptionDemo$MyThread.run(BrokenBarrierExceptionDemo.java:34)
    运动员1号准备完毕!在起跑线等待发令枪
    运动员2号准备完毕!在起跑线等待发令枪
    运动员1号跑完了路程!
    运动员1号退场!
    运动员2号看不见起跑线了
    运动员2号退场!
    java.util.concurrent.TimeoutException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
        at com.ljheee.juc.BrokenBarrierExceptionDemo$MyThread.run(BrokenBarrierExceptionDemo.java:34)
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
        at com.ljheee.juc.BrokenBarrierExceptionDemo$MyThread.run(BrokenBarrierExceptionDemo.java:34)
    运动员3号准备完毕!在起跑线等待发令枪
    java.util.concurrent.BrokenBarrierException
    运动员3号看不见起跑线了
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    运动员3号退场!
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
        at com.ljheee.juc.BrokenBarrierExceptionDemo$MyThread.run(BrokenBarrierExceptionDemo.java:34)
    运动员4号准备完毕!在起跑线等待发令枪
    java.util.concurrent.BrokenBarrierException
    运动员4号看不见起跑线了
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    运动员4号退场!
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
        at com.ljheee.juc.BrokenBarrierExceptionDemo$MyThread.run(BrokenBarrierExceptionDemo.java:34)
    

    从输出可以看到,如果其中一个参与者抛出TimeoutException,其他参与者会抛出 BrokenBarrierException。

    如何处理 BrokenBarrierException ?

    可以看到,使用 CyclicBarrier 还需注意许多事项,其中 BrokenBarrierException 被称为是 CyclicBarrier 的克星;
    那又如何 处理/预防 BrokenBarrierException 呢?

    当然,要预防,还需从 CyclicBarrier 的设计开始考虑,设计者已经帮我们考虑了一些问题,如检查是否被破坏,重置 CyclicBarrier 等。

    boolean isBroken():获取是否破损标志位broken的值,此值有以下几种情况:

    • CyclicBarrier初始化时,broken=false,表示屏障未破损。
    • 如果正在等待的线程被中断,则broken=true,表示屏障破损。
    • 如果正在等待的线程超时,则broken=true,表示屏障破损。
    • 如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。

    void reset():使得 CyclicBarrier 回归初始状态,直观来看它做了两件事:

    • 如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
    • 将是否破损标志位broken置为false。

    在任务协同阶段,我们可以借助这两个API来做辅助;
    当然源码设计者肯定不能从源头将所有问题都解决,剩下的是需要我们根据业务情况,看是需要终止协作:抛异常、还是直接退出。
    并且根据触发 BrokenBarrierException 的场景,我们在相关代码实现时,尽量规避。


    推荐阅读

    本文首发于 公众号 架构道与术(ToBeArchitecturer),欢迎关注、学习更多干货~

    推荐阅读
    

    相关文章

      网友评论

          本文标题:CyclicBarrier的克星—BrokenBarrierEx

          本文链接:https://www.haomeiwen.com/subject/zrdsxktx.html