CyclicBarrier

作者: 得力小泡泡 | 来源:发表于2021-01-14 18:55 被阅读0次

    1、CyclicBarrier使用场景:

    先来描述一下它的使用场景:有若干个线程,比如说有五个线程,需要它们都到达了某一个点之后才能开始一起执行,也就是说假如其中只有四个线程到达了这个点,还差一个线程没到达,此时这四个线程都会进入等待状态,直到第五个线程也到达了这个点之后,这五个线程才开始一起进行执行状态,是不是这个场景的描述跟CountDownLatch很类似的,下面用一个简单的示例图来感受一下它们两者的区别:

    CountDownLatch使用场景图
    image.png
    CyclicBarrier使用场景图
    image.png

    所有子线程都已经到达屏障之后,此时屏障就会消失,所有子线程继续执行,若存子线程尚未到达屏障,其他到达了屏障的线程都会进行等待

    2、官方文档说明

    它是一个同步的工具,能够允许一组线程去互相等待直到都到达了屏障,CyclicBarrier对于涉及到固定大小的线程是非常有用的,线程们必须相互等待。该屏障称之为循环屏障,是因为当等待屏障的线程被释放之后,该屏障能循环使用

    3、关于CyclicBarrier的底层执行流程总结:

    • 1、初始化CyclicBarrier中的各种成员变量,包括parties、count以及Runnable(可选);

    • 2、当调用await()方法时,底层会先检查计数器是否已经归零,如果是的话,那么就首先执行可选的Runnable,接下来开始下一个generation;(注意:这里只是调用Runnable的run()方法,并不是调用start()方法开启另一个线程)

    • 3、在下一个分代中,将会重置count值为parties,并且创建新的Generation实例;

    • 4、同时会调用Condition的singalAll方法,唤醒所有在屏障前面等待的线程,让其开始继续执行;(注意:当有可选的Runnable时,是执行完run()方法中的汇总操作,其他线程才会继续执行)

    • 5、如果计数器没有归零,那么当前的调用线程将会通过Condition的await方法,在屏障前进行等待;

    • 6、以上所有执行流程均在lock锁的控制范围内,不会出现并发情况。

    • 7、在下一个分代时,该屏障又可以继续使用,例如计数器是3,线程1,线程2和线程3冲破了当前屏障后,下一个分代的屏障可以去给线程4,线程5和线程6使用,也可以又给线程1,线程2和线程3使用(自己总结的)

    4、典型事例

    1、当没有可选的Runnable时

    当所有线程到达屏障时,不需要进行汇总,最后一个线程到达时,屏障消除,所有线程继续执行

    image.png
    package com.concurrency2;
    
    import java.util.Random;
    import java.util.concurrent.CyclicBarrier;
    
    public class MyTest1 {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
            for(int i = 0;i < 3;i ++) {
                new Thread(() -> {
                    try {
                        Thread.sleep((long)(Math.random() * 2000));
    
                        int randomInt = new Random().nextInt(500);
                        System.out.println("hello " + randomInt);
    
                        cyclicBarrier.await();
    
                        System.out.println("world " + randomInt);
    
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
    

    输出

    hello 30
    hello 471
    hello 343
    world 343
    world 471
    world 30
    
    2、当有可选的Runnable时

    当所有线程到达屏障时,需要进行汇总操作,等汇总操作进行完,屏障消除,所有线程继续执行

    image.png
    package com.concurrency2;
    
    import java.util.Random;
    import java.util.concurrent.CyclicBarrier;
    
    public class MyTest1 {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
                System.out.println("汇总1 ...");
    
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                System.out.println("汇总2 ...");
            });
           //for(int u = 0, u < 2;u ++)//开两次屏障使用
            for(int i = 0;i < 3;i ++) {
                new Thread(() -> {
                    try {
                        Thread.sleep((long)(Math.random() * 2000));
    
                        int randomInt = new Random().nextInt(500);
                        System.out.println("hello " + randomInt);
    
                        cyclicBarrier.await();
    
                        System.out.println("world " + randomInt);
    
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
    

    输出

    hello 4
    hello 229
    hello 73
    汇总1 ...
    汇总2 ...
    world 73
    world 229
    world 4
    

    5、CyclicBarrier源代码分析

    前面讲过CountDownLatch是基于AQS实现的;而CyclicBarrier是基于ReentrantLock重入锁实现的,当然ReentrantLock也是基于AQS实现的,非要说CyclicBarrier也是基于AQS实现的也不为过。

    1、重要成员变量
        / /可以理解为初始化时 需要阻塞的任务个数
        private final int parties;
        / /剩余需要等待的任务个数,初始值为parties,直到为0时依次唤醒所有被阻塞的任务线程。
        private int count;
     
        / /每次对“栅栏”的主要成员变量进行变更操作,都应该加锁
        private final ReentrantLock lock = new ReentrantLock();
        / /用于阻塞和唤醒任务线程
       private final Condition trip = lock.newCondition();
     
        / /在所有线程被唤醒前,需要执行的一个Runable对应的run方法
        private final Runnable barrierCommand;
        / /用于表示“栅栏”当前的状态
        private Generation generation = new Generation();
    
    2、构造方法

    CyclicBarrier有两个重载的构造方法,一个是不带Runnable参数,另一个带有Runnable参数。本质上都会调用带Runnable参数的构造方法进行实例化,这里只贴出带Runnable参数的构造方法实现:

    public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;    / /为了实现复用,进行备份
            this.count = parties;   / /初始化,待阻塞的任务总数
            this.barrierCommand = barrierAction;   / /初始化
        }
    
    3、核心方法
    await()方法有两层含义:

    1、先检查前面是否已经有count个线程了,如果没有线程则会进入等待状态
    2、当检测到屏障已经有count个线程了,则所有线程会冲出屏障继续执行(如果有Runnable参数的构造方法先执行汇总方法)

    int index = --count;操作很明显不是原子性的,如果在多线程中不加lock肯定会出问题

    private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    
    public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
     
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock(); //加锁
            try {
                final Generation g = generation;
     
                if (g.broken)
                    throw new BrokenBarrierException();
                //有一个线程线程被中断,整个CyclicBarrier将不可用
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
     
                int index = --count; //待等待的任务数减1
                if (index == 0) {  // 如果待等待的任务数减至0,依次唤醒所有线程
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();//唤醒前先执行Runnable对象的run方法
                        ranAction = true;
                        nextGeneration();//重置整个CyclicBarrier,方便下次重用
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
     
                //如果待等待的任务数大于0,进行线程阻塞,直到count为0时被唤醒
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();//阻塞当前线程
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);//延时阻塞当前线程
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
     
                    if (g.broken)//异常唤醒
                        throw new BrokenBarrierException();
     
                    if (g != generation)//正常被唤醒,generation会被新建
                        return index;
     
                    if (timed && nanos <= 0L) {//延迟阻塞时间到后唤醒
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
    }
    

    相关文章

      网友评论

        本文标题:CyclicBarrier

        本文链接:https://www.haomeiwen.com/subject/amdvaktx.html