美文网首页后端开发进阶之路技术干货Java学习笔记
Java 中的 CyclicBarrier 多线程同步机制使用方

Java 中的 CyclicBarrier 多线程同步机制使用方

作者: 程序之心 | 来源:发表于2017-08-21 09:39 被阅读56次

    CyclicBarrier 是一种同步机制,它能够对处理一些算法的线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。

    回顾

    在下图的流程中,线程1和线程2都到达第一个栅栏后才能够继续运行。如果线程1先到线程2后到,则线程1需要等待线程2到达栅栏处,然后两个线程才能继续运行。

    使用方法

    CyclicBarrier 提供了两个构造函数,可以传入线程个数和所有线程都到达栅栏后执行的操作。使用时,先创建 CyclicBarrier 对象,然后在需要等待的地方调用 await 方法即可。await 方法会等待所有线程到达才返回:当所有线程都到达时返回当前线程到达的次序;如果等待过程中出现超时,第一个到达的线程会收到超时异常 TimeoutException,同时其他线程被 broken 并抛出 BrokenBarrierException 异常。相关方法定义如下。

    //构造函数,提供线程数和操作
    public CyclicBarrier(int parties, Runnable barrierAction)
    //构造函数,仅提供线程数
    public CyclicBarrier(int parties)
    //返回需要等待的线程数
    public int getParties()
    //一直等待其他线程到达
    public int await() throws InterruptedException, BrokenBarrierException 
    //等待其他线程到达,并指定超时时间
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException 
    //查看是否broken
    public boolean isBroken() 
    //复位barrier
    public void reset() 
    //返回正在barrier处等待的线程数
    public int getNumberWaiting()
    

    使用示例

    如下的示例,演示了如何使用 CyclicBarrier 实现线程同步。CyclicBarrierTest 测试类定义了线程数 threadNums 和 CyclicBarrier 实例 barrier,在构造函数中创建了 CyclicBarrier 实例,当所有线程都到达栅栏处时打印到达提示, 在 test 方法中创建线程,线程启动后调用 await 方法等待,并打印当前线程是第几个到达栅栏。

    public class CyclicBarrierTest {
        private int threadNums;
        private CyclicBarrier barrier;
        
        public CyclicBarrierTest(int threadNums){
            this.threadNums = threadNums;
            barrier = new CyclicBarrier(threadNums, () -> println("All " 
                + threadNums + " threads reached barrier"));
        }
        
        private void println(String msg){
            SimpleDateFormat sdf = new SimpleDateFormat("[YYYY-MM-dd HH:mm:ss:SSS] ");
            System.out.println(sdf.format(new Date()) + msg);
        }
        
        public void test(){
            for(int i = 0; i < threadNums; i ++){
                new Thread(() -> {
                    println(Thread.currentThread().getName() + " start!");
                    try {
                        int index = threadNums - barrier.await();
                        println(Thread.currentThread().getName() 
                            + " arrive "+ index);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
    

    以启动 6 个线程为例,执行结果如下。6 个线程几乎同时启动,1 毫秒后同时到达栅栏并打印出 “All 6 threads reached barrier”,6 个线程到达的次序是 Thread-5、Thread-4、Thread-0、Thread-3、Thread-1、Thread-2。

    [2017-08-20 12:15:54:053] Thread-5 start!
    [2017-08-20 12:15:54:053] Thread-4 start!
    [2017-08-20 12:15:54:053] Thread-0 start!
    [2017-08-20 12:15:54:053] Thread-3 start!
    [2017-08-20 12:15:54:053] Thread-1 start!
    [2017-08-20 12:15:54:053] Thread-2 start!
    [2017-08-20 12:15:54:054] All 6 threads reached barrier
    [2017-08-20 12:15:54:055] Thread-2 arrive 6
    [2017-08-20 12:15:54:055] Thread-5 arrive 1
    [2017-08-20 12:15:54:055] Thread-4 arrive 2
    [2017-08-20 12:15:54:055] Thread-3 arrive 4
    [2017-08-20 12:15:54:055] Thread-0 arrive 3
    [2017-08-20 12:15:54:055] Thread-1 arrive 5
    

    如下的示例增加了 test2 方法,来测试 CyclicBarrier 的其他方法。各个线程的创建时间差了 300 ms,创建线程时打印出此时栅栏处正在等待的线程个数,await 等待的时间为 1s 。当 CyclicBarrier 等待超时时,超时的线程打印超时提示和总的线程数,被 broken 的线程打印 broken 提示。

    public class CyclicBarrierTest {
        private int threadNums;
        private CyclicBarrier barrier;
        
        public CyclicBarrierTest(int threadNums){
            this.threadNums = threadNums;
            barrier = new CyclicBarrier(threadNums, () -> 
                println("All " + threadNums + " threads reached barrier"));
        }
        
        private void println(String msg){
            SimpleDateFormat sdf = new SimpleDateFormat("[YYYY-MM-dd HH:mm:ss:SSS] ");
            System.out.println(sdf.format(new Date()) + msg);
        }
        
        public void test2() throws Exception{
            for(int i = 0; i < threadNums; i ++){
                new Thread(() -> {
                    println(Thread.currentThread().getName() + " start with " 
                        + barrier.getNumberWaiting() + " threads waiting");
                        try {
                            barrier.await(1, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (BrokenBarrierException e) {
                            println(Thread.currentThread().getName() + " broken "
                                + barrier.isBroken());
                        } catch (TimeoutException e) {
                            println(Thread.currentThread().getName() 
                                + " time out to wait " + barrier.getParties()
                                + " threds");
                        }
                }).start();
                Thread.sleep(300);
            }
        }
    }
    

    从理论上分析,按照 300 ms 间隔创建线程,超时时间为 1s ,则在第 4 个线程创建并等待时发生超时并 broken 其他线程,创建第 5 个线程时已经没有线程等待。

    以 6 个线程为例,执行结果如下。Thread-0、Thread-1、Thread-2 依次启动并等待,此时并没有超时一切正常。Thread-3 启动后,时间已经达到 1s,此时 Thread-0 等待超时打印超时提示,Thread-1、Thread-2、Thread-3 同时收到 break 消息进入 broken 状态并打印 broken 提示。在此之后,Thread-4、Thread-5 创建并启动,已经没有线程在栅栏处等待,直接进入 broken 状态并打印 broken 提示。

    [2017-08-20 12:41:26:610] Thread-0 start with 0 threads waiting
    [2017-08-20 12:41:26:887] Thread-1 start with 1 threads waiting
    [2017-08-20 12:41:27:189] Thread-2 start with 2 threads waiting
    [2017-08-20 12:41:27:492] Thread-3 start with 3 threads waiting
    [2017-08-20 12:41:27:613] Thread-0 time out to wait 6 threds
    [2017-08-20 12:41:27:613] Thread-2 broken true
    [2017-08-20 12:41:27:613] Thread-3 broken true
    [2017-08-20 12:41:27:613] Thread-1 broken true
    [2017-08-20 12:41:27:794] Thread-4 start with 0 threads waiting
    [2017-08-20 12:41:27:794] Thread-4 broken true
    [2017-08-20 12:41:28:099] Thread-5 start with 0 threads waiting
    [2017-08-20 12:41:28:100] Thread-5 broken true
    

    分享学习笔记和技术总结,内容涉及 Java 进阶、虚拟机、MySQL、NoSQL、分布式计算、开源框架等多个领域,欢迎关注作者。

    image.png

    相关文章

      网友评论

        本文标题:Java 中的 CyclicBarrier 多线程同步机制使用方

        本文链接:https://www.haomeiwen.com/subject/knsfdxtx.html