美文网首页Java 并发程序员
【Java 并发笔记】CyclicBarrier 相关整理

【Java 并发笔记】CyclicBarrier 相关整理

作者: 58bc06151329 | 来源:发表于2019-01-18 17:05 被阅读0次

    文前说明

    作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

    本文仅供学习交流使用,侵权必删。
    不用于商业目的,转载请注明出处。

    1. 简介

    • CyclicBarrier(循环屏障/栅栏) 类似于 CountDownLatch(闭锁),它能阻塞一组线程直到某个事件的发生。
      • 与闭锁的关键区别在于,所有的线程必须同时到达屏障位置,才能继续执行。
      • 闭锁用于等待事件,而屏障用于等待其他线程。
      • CyclicBarrier 可以使一定数量的线程反复地在屏障位置处汇集。当线程到达屏障位置时将调用 await() 方法,这个方法将阻塞直到所有线程都到达屏障位置。如果所有线程都到达屏障位置,那么屏障将打开,此时所有的线程都将被释放,而屏障将被重置以便下次使用。
    • CyclicBarrier 是 JDK 1.5 的 java.util.concurrent 并发包中提供的一个并发工具类。
      • 所谓 Cyclic 即循环的意思,所谓 Barrier 即屏障的意思。
      • CyclicBarrier 是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
      • 在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用 CyclicBarrier 很有帮助。
      • 这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以 重新使用 的。

    1.1 CyclicBarrier 的应用场景

    • CyclicBarrier 常用于多线程分组计算。
      • 比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择 CyclicBarrier。

    1.2 CyclicBarrier 方法说明

    CyclicBarrier(parties) 方法

    • 初始化相互等待的线程数量的构造方法。

    CyclicBarrier(parties,Runnable barrierAction) 方法

    • 初始化相互等待的线程数量以及屏障线程的构造方法。
    • 屏障线程的运行时机:等待的线程数量 =parties 之后,CyclicBarrier 打开屏障之前。
      • 例如在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。

    getParties 方法

    • 获取 CyclicBarrier 打开屏障的线程数量。

    getNumberWaiting 方法

    • 获取正在 CyclicBarrier 上等待的线程数量。

    await 方法

    • 在 CyclicBarrier 上进行阻塞等待,直到发生以下情形之一。
      • 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
      • 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
      • 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
      • 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
      • 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
    • 线程调用 await() 表示自己已经到达栅栏。
    • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程
      await() 时被中断或者超时。

    await(timeout,TimeUnit) 方法

    • 在 CyclicBarrier 上进行限时的阻塞等待,直到发生以下情形之一。
      • 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
      • 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
      • 当前线程等待超时,则抛出 TimeoutException 异常,并停止等待,继续执行。
      • 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
      • 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
      • 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。

    isBroken 方法

    • 获取是否破损标志位 broken 的值,此值有以下几种情况。
      • CyclicBarrier 初始化时,broken=false,表示屏障未破损。
      • 如果正在等待的线程被中断,则 broken=true,表示屏障破损。
      • 如果正在等待的线程超时,则 broken=true,表示屏障破损。
      • 如果有线程调用 CyclicBarrier.reset() 方法,则 broken=false,表示屏障回到未破损状态。

    reset 方法

    • 使 CyclicBarrier 回归初始状态,它做了两件事。
      • 如果有正在等待的线程,则会抛出 BrokenBarrierException 异常,且这些线程停止等待,继续执行。
      • 将是否破损标志位 broken 置为 false。

    1.3 CyclicBarrier 和 CountDownLatch 的区别

    • CountDownLatch 是一个线程(或者多个),等待另外 N 个线程完成某个事情之后才能执行;CyclicBarrier 是 N 个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
    • CountDownLatch 的计数器只能使用一次。而 CyclicBarrier 的计数器可以使用 reset() 方法重置;CyclicBarrier 能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
    • CountDownLatch 采用减计数方式;CyclicBarrier 采用加计数方式。

    2. CyclicBarrier 原理

    • CyclicBarrier 内部使用了 ReentrantLock 和 Condition 两个类。

    属性信息

    //用于保护屏障入口的锁
    private final ReentrantLock lock = new ReentrantLock();
    //线程等待条件
    private final Condition trip = lock.newCondition();
    //记录参与等待的线程数
    private final int parties;
    //当所有线程到达屏障点之后,首先执行的命令
    private final Runnable barrierCommand;
    private Generation generation = new Generation();
    //实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties
    private int count;
    

    构造函数

    • CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程使用 await() 方法告诉 CyclicBarrier 已经到达了屏障,然后当前线程被阻塞。
    • CyclicBarrier 的另一个构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
     
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    

    await 方法

    • 调用 await() 方法的线程告诉 CyclicBarrier 自己已经到达屏障,然后当前线程被阻塞。直到 parties 个参与线程调用了 await() 方法,CyclicBarrier 同样提供带超时时间的 await() 和不带超时时间的 await() 方法。
    /**
    * 线程持续等待直到此barrier上的所有线程都调用了await()方法.
    *
    * 如果当前线程并不是到达的最后一个线程,则它被禁用线程调度目的,并且处于休眠状态,直到发生以下事件之一:
    * 1.最后一个线程到达;
    * 2.其他线程中断了当前线程.
    * 3.其它线程中断了其它等待的线程.
    * 4.在barrier上面等待的线程发生超时.
    * 5.其它线程调用了barrier上面的reset方法.
    *
    * 如果当前线程:
    * 1.在进入这一方法时,中断状态位被标记.
    * 2.在等待过程中被中断
    * 则会抛出中断异常InterruptedException,且当前线程的中断状态被清除.
    *
    * 会抛出BrokenBarrierException异常的情况有:
    * 1.当其它线程在等待时,如果barrier被reset;
    * 2.当调用await()方法时barrier发生了broken
    *
    * 任意等待线程发生了中断异常时,其它等待线程都会抛出BrokenBarrierException,且barrier的状态会变为broken.
    *
    * 如果当前线程是最后一个到达barrier的线程,且构造函数中的barrier action非null,则在其它线程可以继续执行前,当前线程会执行
    * barrier action.
    * 如果在barrier action的执行过程中发生了异常,则该异常会对当前线程产生影响,且barrier的会处于broken状态.
    *
    * @return 当前线程到达索引,第一个到达的索引值为:getParties() - 1;
    *         最后一个到达的索引值为:0
    */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            // 不超时等待
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
                BrokenBarrierException,
                TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    
    • 两个方法最终都会调用 dowait(boolean, long) 方法,它是 CyclicBarrier 的核心方法。
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
                TimeoutException {
        // 获取独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 当前代
            final Generation g = generation;
            // 如果这代损坏了,抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
     
            // 如果线程中断了,抛出异常
            if (Thread.interrupted()) {
                // 将损坏状态设置为true
                // 并通知其他阻塞在此栅栏上的线程
                breakBarrier();
                throw new InterruptedException();
            }
     
            // 获取下标
            int index = --count;
            // 如果是 0,说明最后一个线程调用了该方法
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 执行栅栏任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 更新一代,将count重置,将generation重置
                    // 唤醒之前等待的线程
                    nextGeneration();
                    return 0;
                } finally {
                    // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
                    if (!ranAction)
                        breakBarrier();
                }
            }
     
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                     // 如果没有时间限制,则直接等待,直到被唤醒
                    if (!timed)
                        trip.await();
                    // 如果有时间限制,则等待指定时间
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 当前代没有损坏
                    if (g == generation && ! g.broken) {
                        // 让栅栏失效
                        breakBarrier();
                        throw ie;
                    } else {
                        // 上面条件不满足,说明这个线程不是这代的
                        // 就不会影响当前这代栅栏的执行,所以,就打个中断标记
                        Thread.currentThread().interrupt();
                    }
                }
     
                // 当有任何一个线程中断了,就会调用breakBarrier方法
                // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
     
                // g != generation表示正常换代了,返回当前线程所在栅栏的下标
                // 如果 g == generation,说明还没有换代,那为什么会醒了?
                // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
                // 正是因为这个原因,才需要generation来保证正确。
                if (g != generation)
                    return index;
                
                // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放独占锁
            lock.unlock();
        }
    }
    
    • 如果该线程不是最后一个调用 await() 方法的线程,则它会一直处于等待状态,除非发生以下情况。
      • 最后一个线程到达,即 index == 0。
      • 某个参与线程等待超时。
      • 某个参与线程被中断。
      • 调用了 CyclicBarrier 的 reset() 方法,将屏障重置为初始状态。

    BrokenBarrierException 异常

    • 如果一个线程处于等待状态时,如果其他线程调用 reset(),或者调用的 barrier 原本就是被损坏的,则抛出 BrokenBarrierException 异常。
    • 任何线程在等待时被中断了,则其他所有线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。

    Generation 对象

    • Generation 描述着 CyclicBarrier 的更新换代。
      • 在 CyclicBarrier 中,同一批线程属于同一代。
      • 当有 parties 个线程到达 barrier 之后,Generation 就会被更新换代。
      • 其中 broken 标识当前 CyclicBarrier 是否已经处于中断状态。
    /**
    * barrier每一次使用都代表了一个generation实例.
    * 当barrier发生trip或者reset时,对应的generation会发生改变.
    * 由于非确定性,锁可能会分配给等待线程,因此可能会存在许多和使用barrier的线程相关的generation.
    * 但是每次只能激活这些线程中的一个(使用计数的那个),并且其他的线程要么broken要么trip.
    * 如果出现了一个暂停,但并未reset,则不需要一个激活的generation.
    */
    private static class Generation {
        boolean broken = false;
    }
    
    • 默认 barrier 是没有损坏的。
      • 当 barrier 损坏了或者有一个线程中断了,则通过 breakBarrier() 来终止所有的线程。
      • breakBarrier() 中除了将 broken 设置为 true,还会调用 signalAll 将在 CyclicBarrier 处于等待状态的线程全部唤醒。
    private void breakBarrier() {
        // 设置状态
        generation.broken = true;
        // 恢复正在等待进入屏障的线程数量
        count = parties;
        // 唤醒所有线程
        trip.signalAll();
    }
    
    • 当所有线程都已经到达 barrier 处(index == 0),则会通过 nextGeneration() 进行更新换代操作,在这个步骤中唤醒了所有线程,重置了 count 和 generation。
    //当barrier发生trip时,用于更新状态并唤醒每一个线程.
    //这一方法只在持有lock时被调用.
    private void nextGeneration() {
        // signal completion of last generation
        // 唤醒所有线程
        trip.signalAll();
        // set up next generation
        // 恢复正在等待进入屏障的线程数量
        count = parties;
        // 新生一代
        generation = new Generation();
    }
    

    reset 方法

    • 需要先打破当前屏蔽,然后再重建一个新的屏蔽,否则可能会导致信号丢失。
    /**
    * 将barrier状态重置.如果此时有线程在barrier处等待,它们会抛出BrokenBarrierException并返回.
    * 注意:请注意,由于其他原因发生broken后重置可能会很复杂;线程需要通过一些方式来 完成同步,并选择一种方式完成reset.
    * 相对为后续的使用重建一个barrier,此重置操作更受欢迎.
    * 注意:这是一个需要加锁的操作.
    */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
    

    isBroken 方法

    • 判断此屏障是否处于中断状态。
      • 如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回 true。否则返回 false。
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
    

    getNumberWaiting 方法

    //返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
    

    2.1 程序示例

    • 从程序的执行结果中也可以看出,所有的工作线程都运行 await() 方法之后都到达了屏障/栅栏位置,然后,3 个工作线程才开始执行业务处理。
    public class CyclicBarrierTest {
        // 自定义工作线程
        private static class Worker extends Thread {
            private CyclicBarrier cyclicBarrier;
            
            public Worker(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
            
            @Override
            public void run() {
                super.run();
                
                try {
                    System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "开始执行");
                    // 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + "执行完毕");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
     
        public static void main(String[] args) {
            int threadCount = 3;
            CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
            
            for (int i = 0; i < threadCount; i++) {
                System.out.println("创建工作线程" + i);
                Worker worker = new Worker(cyclicBarrier);
                worker.start();
            }
        }
    }
    /**
    创建工作线程0
    创建工作线程1
    Thread-0开始等待其他线程
    创建工作线程2
    Thread-1开始等待其他线程
    Thread-2开始等待其他线程
    Thread-2开始执行
    Thread-0开始执行
    Thread-1开始执行
    Thread-1执行完毕
    Thread-0执行完毕
    Thread-2执行完毕
    */
    

    3. 使用 CyclicBarrier 的注意事项

    • CyclicBarrier 使用独占锁来执行 await() 方法,并发性可能不是很高。
    • 如果在等待过程中,线程被中断了,就抛出异常。
      • 但如果中断的线程所对应的 CyclicBarrier 不是这一代,比如在最后一次线程执行 signalAll 后,并且更新了这个 " 代 " 对象。在这个区间,这个线程被中断了,那么, JDK 认为任务已经完成,不必在乎中断,就只打了一个中断 interrupt() 标记。
    • 如果线程被其他的 CyclicBarrier 唤醒,那么 g 肯定等于 generation,这个事件就不能 return 了,而是继续循环阻塞。
      • 反之,如果是当前 CyclicBarrier 唤醒,就返回线程在 CyclicBarrier 的下标,表示完成了一次冲过屏障的过程。
    • CyclicBarrier 的 await() 方法是使用 ReentrantLock 和 Condition 控制实现的。
      • 当调用 CyclicBarrier 的 await() 方法会间接调用 ConditionObject 的 await() 方法,会向 Condition 的等待队列中加入元素,当屏障关闭后首先执行指定的barrierAction(),然后依次执行等待队列中的任务,有先后顺序。
    • CyclicBarrier 类中加锁的方法有 dowait()isBroken()reset()getNumberWaiting()

    4. 总结

    • CyclicBarrier 的用途是让一组线程互相等待,直到全部到达某个公共屏障点才开始继续工作。
    • CyclicBarrier 是可以重复利用的。
    • 在等待的只要有一个线程发生中断,则其它线程就会被唤醒继续正常运行。
    • CyclicBarrier 指定的任务是进行 barrier 处最后一个线程来调用的,如果在执行这个任务发生异常时,则会传播到此线程,其它线程不受影响继续正常运行。

    参考资料

    https://blog.csdn.net/qq_38293564/article/details/80558157

    相关文章

      网友评论

        本文标题:【Java 并发笔记】CyclicBarrier 相关整理

        本文链接:https://www.haomeiwen.com/subject/fydndqtx.html