美文网首页
线程并发工具之CyclicBarrier

线程并发工具之CyclicBarrier

作者: 传达室马大爷 | 来源:发表于2020-04-06 17:33 被阅读0次

    CyclicBarrier

    字面上的意思是循环障碍物,可以循环使用的一个障碍物。
    作用是控制阻塞多个线程等待,同时完成之后才能继续进行下一步的操作

    比如三个运动员A、B、C,分别要比试100米、1000米、5000米跑步,A、B、C要全部完成100米短跑之后,才能一起开始1000米跑步比赛,如果A先跑完了100米,必须要等待B、C完成后才能够继续,即1000米的发令枪裁判员必须等到A、B、C都跑完100米后在发1000米跑步的指令
    构造方法
    public CyclicBarrier(int parties);
    
    public CyclicBarrier(int parties, Runnable barrierAction);
    

    parties为线程的个数,这里即为运动员个数,每一个运动员即为一个线程
    Runnable barrierAction为在最后一个线程完成后解除阻塞后异步处理的事情,比如在每次跑步都达到终点后裁判需要统计一下每个人的分数和排名

    使用方法
    public int await() throws InterruptedException, BrokenBarrierException
    
    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
    

    await()方法被调用时表示当前线程已经到达栅栏
    await(long timeout, TimeUnit unit):线程在timeout时间内进入waiting状态,如果超过这个时间则自动唤醒,继续执行

    实例:模拟上面A(张三)、B(李四)、C(王五)三个运动员比赛
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.stream.Collectors;
    
    import com.shawntime.enjoy.architect.concurrency.SleepUtils;
    
    public class CyclicBarrierTest {
    
        public static void main(String[] args) throws InterruptedException {
            System.out.println("比赛开始");
            final List<Score> scoreList = new ArrayList<>();
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3,  new ScoreRunnable(scoreList));
            Sportsman sportsman1 = new Sportsman("张三", cyclicBarrier, scoreList);
            sportsman1.start();
            Sportsman sportsman2 = new Sportsman("李四", cyclicBarrier, scoreList);
            sportsman2.start();
            Sportsman sportsman3 = new Sportsman("王五", cyclicBarrier, scoreList);
            sportsman3.start();
    
            sportsman1.join();
            sportsman2.join();
            sportsman3.join();
            System.out.println("比赛结束");
        }
    
        private static class ScoreRunnable implements Runnable {
    
            private List<Score> scoreList;
    
            public ScoreRunnable(List<Score> scoreList) {
                this.scoreList = scoreList;
            }
    
            @Override
            public void run() {
                Map<Integer, List<Score>> distanceMap = scoreList.stream()
                        .collect(Collectors.groupingBy(Score::getDistance));
                distanceMap.forEach((distance, scores) -> {
                    System.out.println(distance + "米跑步结果");
                    scores.stream()
                            .sorted(Comparator.comparingInt(Score::getTime))
                            .forEach(score -> System.out.println(score.getSportsmanName() + " : " + score.getTime() + "秒"));
                });
            }
        }
    
        private static class Sportsman extends Thread {
    
            private String sportsmanName;
    
            private CyclicBarrier cyclicBarrier;
    
            private List<Score> scoreList;
    
            public Sportsman(String sportsmanName,
                             CyclicBarrier cyclicBarrier,
                             List<Score> scoreList) {
                this.sportsmanName = sportsmanName;
                this.cyclicBarrier = cyclicBarrier;
                this.scoreList = scoreList;
            }
    
            @Override
            public void run() {
                try {
                    System.out.println(sportsmanName + "100米准备就绪...");
                    int time = ThreadLocalRandom.current().nextInt(5);
                    SleepUtils.sleepBySeconds(time);
                    Score score = getScore(sportsmanName, 100, time);
                    scoreList.add(score);
                    System.out.println(sportsmanName + "100米完成,1000米准备就绪...");
                    cyclicBarrier.await();
    
                    time = ThreadLocalRandom.current().nextInt(10);
                    SleepUtils.sleepBySeconds(time);
                    Score score2 = getScore(sportsmanName, 1000, time);
                    scoreList.add(score2);
                    System.out.println(sportsmanName + "1000米完成,5000米准备就绪...");
                    cyclicBarrier.await();
    
                    time = ThreadLocalRandom.current().nextInt(20);
                    SleepUtils.sleepBySeconds(time);
                    Score score3 = getScore(sportsmanName, 5000, time);
                    scoreList.add(score3);
                    System.out.println(sportsmanName + "5000米完成");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
    
            private Score getScore(String name, int distance, int time) {
                Score score = new Score();
                score.setDistance(distance);
                score.setSportsmanName(name);
                score.setTime(time);
                return score;
            }
        }
    }
    

    输出结果
    比赛开始
    张三100米准备就绪...
    王五100米准备就绪...
    李四100米准备就绪...
    李四100米完成,1000米准备就绪...
    王五100米完成,1000米准备就绪...
    张三100米完成,1000米准备就绪...
    100米跑步结果
    李四 : 1秒
    王五 : 1秒
    张三 : 3秒
    王五1000米完成,5000米准备就绪...
    张三1000米完成,5000米准备就绪...
    李四1000米完成,5000米准备就绪...
    100米跑步结果
    李四 : 1秒
    王五 : 1秒
    张三 : 3秒
    1000米跑步结果
    王五 : 4秒
    张三 : 5秒
    李四 : 9秒
    李四5000米完成
    王五5000米完成
    张三5000米完成
    100米跑步结果
    李四 : 1秒
    王五 : 1秒
    张三 : 3秒
    5000米跑步结果
    李四 : 11秒
    王五 : 11秒
    张三 : 18秒
    1000米跑步结果
    王五 : 4秒
    张三 : 5秒
    李四 : 9秒
    比赛结束

    结论
    • CyclicBarrier可以循环使用
    • 三个线程全部达到达栅栏(即调用wait())方法后才会继续执行,否则一直阻塞等待其他线程
    • 每次全部达到栅栏后都会执行ScoreRunnable线程

    相关文章

      网友评论

          本文标题:线程并发工具之CyclicBarrier

          本文链接:https://www.haomeiwen.com/subject/wcwbphtx.html