美文网首页Java干货分享
Java中同步器CyclicBarrier

Java中同步器CyclicBarrier

作者: 淡定_蜗牛 | 来源:发表于2019-03-01 13:34 被阅读0次

    1.简介

    CyclicBarriers是Java 5作为java.util.concurrent包的一部分引入的同步构造。

    在本文中,我们将在并发方案中探索此实现。

    2. Java并发 - 同步器

    该的java.util.concurrent包中包含几类,以帮助管理的一组互相协作的线程。其中一些包括:

    • CyclicBarrier
    • Phaser
    • CountDownLatch
    • Exchanger
    • Semaphore
    • SynchronousQueue

    这些类为线程之间的常见交互模式提供了开箱即用的功能。如果我们有一组线程相互通信并遵循一个或多个更常见的线程

    如果我们有一组相互通信并且类似于常见模式之一的线程,我们可以简单地重用相应的库类(也称为Synchronizers),而不是尝试使用一组锁和条件对象和synchronized关键字来提出自定义方案。。

    3. CyclicBarrier

    CyclicBarrier是一个同步器,它允许一组线程等待彼此达成共同执行点,也被称为barrier。

    CyclicBarriers用于程序中,在这些程序中我们有固定数量的线程,在继续执行之前必须等待彼此到达公共点。

    barrier称为循环,因为它可以在等待线程释放后重新使用。

    4.用法

    CyclicBarrier的构造函数很简单。它需要一个整数,表示需要在barrier实例上调用await()方法以表示到达公共执行点的线程数:

    public CyclicBarrier(int parties)
    

    需要同步其执行的线程也称为parties ,并且调用await()方法是我们如何注册某个线程已达到障碍点。

    此调用是同步的,调用此方法的线程会暂停执行,直到指定数量的线程在屏障上调用相同的方法。所需线程数已调用await()的情况称为跳越障碍。

    或者,我们可以将第二个参数传递给构造函数,该构造函数是一个Runnable实例。这个逻辑将由跳过障碍的最后一个线程运行:

    public CyclicBarrier(int parties, Runnable barrierAction)
    

    5.实例化

    要查看CyclicBarrier的运行情况,让我们考虑以下情况:

    有一个操作是固定数量的线程执行并将相应的结果存储在列表中。当所有线程完成执行其操作时,其中一个(通常是跳过屏障的最后一个)开始处理每个线程获取的数据。

    让我们实现所有操作发生的主类:

    public class CyclicBarrierDemo {
    
        private CyclicBarrier cyclicBarrier;
        private List<List<Integer>> partialResults
         = Collections.synchronizedList(new ArrayList<>());
        private Random random = new Random();
        private int NUM_PARTIAL_RESULTS;
        private int NUM_WORKERS;
    
        // ...
    }
    

    这个类非常简单 - NUM_WORKERS是要执行的线程数,NUM_PARTIAL_RESULTS是每个工作线程将要生成的结果数。

    最后,我们有partialResults,它是一个列表,用于存储每个工作线程的结果。请注意,此列表是SynchronizedList,因为多个线程将同时写入它,并且add()方法在普通ArrayList上不是线程安全的。

    现在让我们实现每个工作线程的逻辑:

    public class CyclicBarrierDemo {
    
        // ...
    
        class NumberCruncherThread implements Runnable {
    
            @Override
            public void run() {
                String thisThreadName = Thread.currentThread().getName();
                List<Integer> partialResult = new ArrayList<>();
    
                // Crunch some numbers and store the partial result
                for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {    
                    Integer num = random.nextInt(10);
                    System.out.println(thisThreadName
                      + ": Crunching some numbers! Final result - " + num);
                    partialResult.add(num);
                }
    
                partialResults.add(partialResult);
                try {
                    System.out.println(thisThreadName 
                      + " waiting for others to reach barrier.");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    // ...
                } catch (BrokenBarrierException e) {
                    // ...
                }
            }
        }
    
    }
    

    我们现在将实现屏障跳闸时运行的逻辑。

    为了简单起见,我们只需添加部分结果列表中的所有数字:

    public class CyclicBarrierDemo {
    
        // ...
        
        class AggregatorThread implements Runnable {
    
            @Override
            public void run() {
    
                String thisThreadName = Thread.currentThread().getName();
    
                System.out.println(
                  thisThreadName + ": Computing sum of " + NUM_WORKERS 
                  + " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
                int sum = 0;
    
                for (List<Integer> threadResult : partialResults) {
                    System.out.print("Adding ");
                    for (Integer partialResult : threadResult) {
                        System.out.print(partialResult+" ");
                        sum += partialResult;
                    }
                    System.out.println();
                }
                System.out.println(thisThreadName + ": Final result = " + sum);
            }
        }
    }
    

    最后一步是构造CyclicBarrier并使用main()方法启动:

    public class CyclicBarrierDemo {
    
        // Previous code
     
        public void runSimulation(int numWorkers, int numberOfPartialResults) {
            NUM_PARTIAL_RESULTS = numberOfPartialResults;
            NUM_WORKERS = numWorkers;
    
            cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());
    
            System.out.println("Spawning " + NUM_WORKERS
              + " worker threads to compute "
              + NUM_PARTIAL_RESULTS + " partial results each");
     
            for (int i = 0; i < NUM_WORKERS; i++) {
                Thread worker = new Thread(new NumberCruncherThread());
                worker.setName("Thread " + i);
                worker.start();
            }
        }
    
        public static void main(String[] args) {
            CyclicBarrierDemo demo = new CyclicBarrierDemo();
            demo.runSimulation(5, 3);
        }
    }
    

    在上面的代码中,我们使用5个线程初始化循环屏障,每个线程产生3个整数作为其计算的一部分,并将其存储在结果列表中。

    一旦屏障被触发,跳过屏障的最后一个线程执行AggregatorThread中指定的逻辑,即 - 添加线程产生的所有数字。

    6.结果

    以下是上述程序执行的输出 - 每次执行都可能会产生不同的结果,因为线程可以以不同的顺序生成:

    Spawning 5 worker threads to compute 3 partial results each
    Thread 0: Crunching some numbers! Final result - 6
    Thread 0: Crunching some numbers! Final result - 2
    Thread 0: Crunching some numbers! Final result - 2
    Thread 0 waiting for others to reach barrier.
    Thread 1: Crunching some numbers! Final result - 2
    Thread 1: Crunching some numbers! Final result - 0
    Thread 1: Crunching some numbers! Final result - 5
    Thread 1 waiting for others to reach barrier.
    Thread 3: Crunching some numbers! Final result - 6
    Thread 3: Crunching some numbers! Final result - 4
    Thread 3: Crunching some numbers! Final result - 0
    Thread 3 waiting for others to reach barrier.
    Thread 2: Crunching some numbers! Final result - 1
    Thread 2: Crunching some numbers! Final result - 1
    Thread 2: Crunching some numbers! Final result - 0
    Thread 2 waiting for others to reach barrier.
    Thread 4: Crunching some numbers! Final result - 9
    Thread 4: Crunching some numbers! Final result - 3
    Thread 4: Crunching some numbers! Final result - 5
    Thread 4 waiting for others to reach barrier.
    Thread 4: Computing final sum of 5 workers, having 3 results each.
    Adding 6 2 2 
    Adding 2 0 5 
    Adding 6 4 0 
    Adding 1 1 0 
    Adding 9 3 5 
    Thread 4: Final result = 46
    

    如上面的输出所示,线程4是跳过屏障并且还执行最终聚合逻辑的线程。如上例所示,线程实际上也不必按照它们启动的顺序运行。

    7.结论

    在本文中,我们看到了CyclicBarrier是什么,以及它有什么样的情况。

    我们还实现了一个场景,在继续使用其他程序逻辑之前,我们需要固定数量的线程才能到达固定的执行点。

    image

    欢迎大家关注公众号:「Java知己」,关注公众号,回复「1024」你懂得,免费领取 30 本经典编程书籍。关注我,与 10 万程序员一起进步。 每天更新Java知识哦,期待你的到来!

    image

    相关文章

      网友评论

        本文标题:Java中同步器CyclicBarrier

        本文链接:https://www.haomeiwen.com/subject/zxtkuqtx.html