美文网首页
java CyclicBarrier 循环屏障的使用及意外处理

java CyclicBarrier 循环屏障的使用及意外处理

作者: 饱饱想要灵感 | 来源:发表于2022-08-25 18:38 被阅读0次

    功能强大不是多线程的核心, 异常处理才是

    CyclicBarrier 循环屏障用于在线程中设立屏障, 当指定数目的线程到达屏障前, 才会放开屏障, 让线程继续后面的动作.

    例如, 宿舍6人聚餐, 约定6人集齐才能开饭, 那这个约定就类似于设立了循环屏障, 先到的人必须等着, 6人都到才能开饭.

    请看如下代码示例:

        @Test
        public void testCyclicBarrier() {
            // 循环屏障, 第一个参数指阻塞线程数, 第二个指达到阻塞线程数时先触发函数,然后才放行阻塞中的线程
            CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
                // 由最后一个到达的线程来执行
                System.out.printf("由%s号玩家汇报, 所有玩家已准备就绪,开始游戏!\n", Thread.currentThread().getName());
            });
    
            ExecutorService executorService = Executors.newCachedThreadPool();
    
            for (int i = 1; i <= 7; i++) {
                executorService.submit(() -> {
                    System.out.println(Thread.currentThread().getName() + "号玩家准备就绪...");
                    try {
                        cyclicBarrier.await();  // 有7个线程阻塞于此, 然后会执行下一行代码
    //                    cyclicBarrier.await(3, TimeUnit.SECONDS);  // 若超过3秒, 则屏障破损, 直接进入catch语句块, 即便后续达成条件也无法正常执行
                        System.out.println(Thread.currentThread().getName() + "号玩家通关了~");
                    } catch (InterruptedException | BrokenBarrierException  e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    

    那么问题来了
    以上出现的是最理想的情况, 假如设定屏障解除条件为7个线程, 但是只有6个到达, 或者8个到达, 又该怎么办呢?

    思考之后, 往下滚动查看解决方案~




















    分析:

    1. 如果只有6个线程到达, 假如使用cyclicBarrier.await(3, TimeUnit.SECONDS), 这6个线程就会全部超时报错, 屏障破损; 如果使用cyclicBarrier.await(), 就会一直处于阻塞状态;
    2. 如果有8个线程到达, 前面7个可以正常执行, 但是第8个会陷入和第1点一样的情况

    基于以上分析, 当然可以让它们都超时报错, 直接结束程序; 但也可以进行补救, 使它们都正常执行. 代码如下:

        public static void main(String[] args) {
            // 循环屏障, 第一个参数指阻塞线程数, 第二个指达到阻塞线程数时先触发函数,然后才放行阻塞中的线程
            CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
                // 由最后一个到达的线程来执行
                System.out.printf("由%s号玩家汇报, 所有玩家已准备就绪,开始游戏!\n", Thread.currentThread().getName());
            });
    
            ExecutorService executorService = Executors.newCachedThreadPool();
    
            for (int i = 1; i <= 8; i++) {
                executorService.submit(() -> {
                    System.out.println(Thread.currentThread().getName() + "号玩家准备就绪...");
                    try {
                        cyclicBarrier.await();  // 有7个线程阻塞于此, 然后会执行下一行代码
    //                    cyclicBarrier.await(3, TimeUnit.SECONDS);  // 若超过3秒, 则屏障破损, 直接进入catch语句块, 即便后续达成条件也无法正常执行
                        System.out.println(Thread.currentThread().getName() + "号玩家通关了~");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                });
            }
    
           
            try {
                Thread.sleep(3000);
                System.out.println("3秒之后,观察循环屏障状态~");
                if (cyclicBarrier.getNumberWaiting() > 0 && cyclicBarrier.getNumberWaiting() < cyclicBarrier.getParties()) {
                    System.out.printf("还有%d人在苦苦等待, 说明程序已经不正常了!!\n", cyclicBarrier.getNumberWaiting());
                    ScheduledExecutorService warningPool = Executors.newScheduledThreadPool(1);
                    warningPool.scheduleAtFixedRate(() -> {
                        if (cyclicBarrier.getNumberWaiting() > 0 && cyclicBarrier.getNumberWaiting() < cyclicBarrier.getParties()) {
                            System.out.printf("紧急告警!!!循环屏障中还有%d人在苦苦等待!!!\n", cyclicBarrier.getNumberWaiting());
                        } else {
                            warningPool.shutdown();
                            System.out.println("我是告警任务, 我还没被关闭! 我任务已经完成! 请过来关我~");
                        }
                    }, 0, 1000, TimeUnit.MILLISECONDS);
                }
    
                Thread.sleep(5000);
                if (cyclicBarrier.getNumberWaiting() > 0 && cyclicBarrier.getNumberWaiting() < cyclicBarrier.getParties()) {
                    System.out.println("5秒之后安排新玩家救场~");
    
                    // 缺口数量 = 条件数量 - 等待数量
                    int lackCount = cyclicBarrier.getParties() - cyclicBarrier.getNumberWaiting();
    
                    for (int count = lackCount; count > 0; count--) {
                        executorService.submit(() -> {
                            System.out.println(Thread.currentThread().getName() + "号玩家准备救场...");
                            try {
                                cyclicBarrier.await(3, TimeUnit.SECONDS);
                                System.out.println(Thread.currentThread().getName() + "号玩家救场完成~");
                            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                                e.printStackTrace();
                            }
                        });
                    }
                }
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    总结:

    1. CyclicBarrier 虽然功能强大, 但是需要考虑完整的异常处理情况;
    2. 对CyclicBarrier补救时, 会耗费一定的资源;
    3. 功能强大不是多线程的核心, 异常处理才是

    相关文章

      网友评论

          本文标题:java CyclicBarrier 循环屏障的使用及意外处理

          本文链接:https://www.haomeiwen.com/subject/vcztnrtx.html