美文网首页
深入理解 CyclicBarrier

深入理解 CyclicBarrier

作者: 天还下着毛毛雨 | 来源:发表于2022-03-21 16:49 被阅读0次
    image

    1. 作用

    CyclicBarrier 可以 让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会释放。

    注意,被阻塞的线程 ,不是同时 被释放的。但时间间隔几乎 可以说没有。

    2. 使用方式

    2.1 代码示例

    public static void main(String[] args) {
        // 到达屏障的线程数量 : 3
        CyclicBarrier cb = new CyclicBarrier(3);
        for (int i = 0; i < 3; i++) {
            int finalI = i;
            new Thread(()->{
                try {
                    // 最后一个线程 休眠5秒, 看看所有到达屏障的线程 也是休眠5秒之后再执行。
                    if(finalI % 3 == 2){
                        Thread.sleep(5000);
                    }
                    // 阻塞在这里(也可以理解为到达屏障),直到 到达屏障的线程数量 = 3才会被释放。
                    cb.await();
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        System.out.println("主线程执行完毕........");
    }
    

    最终执行结果, 肯定是最后 到达屏障的 最先执行,其次才是之前到达屏障阻塞的所有线程。其他线程被释放的顺序 也不一定是 按照 await()方法的调用顺序来定的。稍后源码具体分析一下。

    image

    2.2 api

    2.2.1. 构造方法

    一个有参构造,一个无参构造,有参构造相对于无参构造来说 多传入了一个Runnable实例, 当一个周期的屏障被 释放后, 可以执行 Runnable实例的 run方法逻辑。

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    

    2.2.2. await()

    使线程阻塞。

    还有一个带超时时间的 await(long timeout, TimeUnit unit), 阻塞超过一定时间 就会抛出TimeoutException异常。

    3. 源码分析

    3.1 大致原理

    CyclicBarrier 是利用 ReentrantLock锁 和 ReentrantLock里的条件队列 来实现 所有线程 在屏障处 阻塞的效果的。

    dowait 大体逻辑(省略 一些判断的代码)

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock(); // q1 加锁
        try {
            // 判断count(需要到达屏障 的线程数) q2
            int index = --count;
            if (index == 0) {
                // 这个就是 有参构造传进来的第二个 参数 Runable 实例
                final Runnable command = barrierCommand;
                if (command != null)
                // 由最后一个到达屏障的线程 同步调用
                   command.run();
                // 释放所有 q3
                nextGeneration();
                return 0;           
            }
            for (;;) {
                try {
                    // 释放,进入条件队列阻塞 q4
                    trip.await();
                    // 被唤醒后 执行的代码 q5
                    if (g != generation)
                       return index; 
                } 
            }
        } finally {
            // 释放锁 q6
            lock.unlock();
        }
    }
    

    源码执行流程

    假设 需要到达屏障 的线程数 count = 3.

    当第一个线程进入dowait方法,来抢锁, 抢锁成功(q1),count - 1 =2(q2),执行q4代码 :会调用 ReentrantLock里的Condition实例的await方法 释放锁(并不是ReentrantLock.unlock方法),在 初始化条件队列里,并休眠 。

    当第二个线程进入dowait方法,来抢锁, 抢锁成功(q1),count - 1 =1(q2) ,执行q4代码 :会调用 ReentrantLock里的Condition实例的await方法 释放锁(并不是ReentrantLock.unlock方法) ,会接在条件队列的队尾,并休眠。

    当第count个(最后一个)线程进入dowait方法, 来抢锁, 抢锁成功 (q1),count - 1 =0(q2),当count =0时 , 执行q3代码:会把条件队列里的所有线程节点, 移动到 等待ReentrantLock的 阻塞队列中去。(此时其他阻塞的线程还没有唤醒 )。并开启下一个周期。q3 return出去后,执行finally代码块 ,q6, unlock()会唤醒阻塞队列的第一个线程, 然后执行自己的业务代码。 第一个阻塞线程在 q5醒来后 在,最终return,然后执行 finally q6 去 唤醒下一个 线程, 然后执行自己的业务代码。下一个线程 也在 q5处被唤醒,执行的也是一样的逻辑,需要在解锁的时候 唤醒自己的下一个节点。 直到所有的线程 都被唤醒。

    问题

    现在解释下之前的问题

    1. 为什么 是到达屏障的最后一个线程 最先执行 ?

    从上面 的执行逻辑 可以得知, 前面的线程 都 到 条件队列里 阻塞住了,最后一个到达屏障的 线程 才 会先发起 唤醒阻塞线程的动作。

    2. 为什么 线程被释放的顺序 不是 按照 线程调用await()的时间(比较极限)?

    原因就在于 dowait 在一开始 是需要抢锁的, 这把锁作为CyclicBarrier的成员变量 在 实例化 CyclicBarrier的时候被初始化。

    image

    默认是把非公平锁。

    如下图,这是某个线程持有锁 ,还没 进入 条件队列休眠,释放锁的 区间

    image

    假设并发很高, 在这段代码里 就有线程b,c,d 因为 抢不到锁被阻塞,而进入 阻塞队列中等待。假设 当前持有锁的线程 a 这个时候刚刚 进条件队列释放锁,还没来得及去 唤醒 阻塞队列里的线程来抢锁, 或者唤醒了 ,还在抢的路上。

    这个时候又来一个线程 e 因为是非公平锁, 走到这直接抢锁,不看阻塞队列是否有线程在排队, 万一抢到锁了, 就跑去 条件队列里 休眠了。那么 最终 进入条件的队列 顺序 是 a,e,b,c,d 被唤醒的顺序 也是 a,e,b,c,d。

    所以说 线程 恢复工作的顺序 不是 按照调用 await的顺序,而是 按照抢到锁的时间,由于是把非公平锁,所以在 并发很高的 情况下, 会 有这种 情况发生。

    3. 被阻塞的线程 ,为什么不是同时 被释放的?

    因为 唤醒 是 上一个线程调用unlock方法 去唤醒下一个线程的,是一个接着一个来的,所以不是同时 被释放。

    相关文章

      网友评论

          本文标题:深入理解 CyclicBarrier

          本文链接:https://www.haomeiwen.com/subject/pkjtjrtx.html