美文网首页Java 并发程序员
【Java 并发笔记】CyclicBarrier 相关整理

【Java 并发笔记】CyclicBarrier 相关整理

作者: 58bc06151329 | 来源:发表于2019-01-18 17:05 被阅读0次

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. 简介

  • CyclicBarrier(循环屏障/栅栏) 类似于 CountDownLatch(闭锁),它能阻塞一组线程直到某个事件的发生。
    • 与闭锁的关键区别在于,所有的线程必须同时到达屏障位置,才能继续执行。
    • 闭锁用于等待事件,而屏障用于等待其他线程。
    • CyclicBarrier 可以使一定数量的线程反复地在屏障位置处汇集。当线程到达屏障位置时将调用 await() 方法,这个方法将阻塞直到所有线程都到达屏障位置。如果所有线程都到达屏障位置,那么屏障将打开,此时所有的线程都将被释放,而屏障将被重置以便下次使用。
  • CyclicBarrier 是 JDK 1.5 的 java.util.concurrent 并发包中提供的一个并发工具类。
    • 所谓 Cyclic 即循环的意思,所谓 Barrier 即屏障的意思。
    • CyclicBarrier 是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
    • 在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用 CyclicBarrier 很有帮助。
    • 这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以 重新使用 的。

1.1 CyclicBarrier 的应用场景

  • CyclicBarrier 常用于多线程分组计算。
    • 比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择 CyclicBarrier。

1.2 CyclicBarrier 方法说明

CyclicBarrier(parties) 方法

  • 初始化相互等待的线程数量的构造方法。

CyclicBarrier(parties,Runnable barrierAction) 方法

  • 初始化相互等待的线程数量以及屏障线程的构造方法。
  • 屏障线程的运行时机:等待的线程数量 =parties 之后,CyclicBarrier 打开屏障之前。
    • 例如在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。

getParties 方法

  • 获取 CyclicBarrier 打开屏障的线程数量。

getNumberWaiting 方法

  • 获取正在 CyclicBarrier 上等待的线程数量。

await 方法

  • 在 CyclicBarrier 上进行阻塞等待,直到发生以下情形之一。
    • 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
    • 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
    • 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
    • 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
    • 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
  • 线程调用 await() 表示自己已经到达栅栏。
  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程
    await() 时被中断或者超时。

await(timeout,TimeUnit) 方法

  • 在 CyclicBarrier 上进行限时的阻塞等待,直到发生以下情形之一。
    • 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
    • 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
    • 当前线程等待超时,则抛出 TimeoutException 异常,并停止等待,继续执行。
    • 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
    • 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
    • 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。

isBroken 方法

  • 获取是否破损标志位 broken 的值,此值有以下几种情况。
    • CyclicBarrier 初始化时,broken=false,表示屏障未破损。
    • 如果正在等待的线程被中断,则 broken=true,表示屏障破损。
    • 如果正在等待的线程超时,则 broken=true,表示屏障破损。
    • 如果有线程调用 CyclicBarrier.reset() 方法,则 broken=false,表示屏障回到未破损状态。

reset 方法

  • 使 CyclicBarrier 回归初始状态,它做了两件事。
    • 如果有正在等待的线程,则会抛出 BrokenBarrierException 异常,且这些线程停止等待,继续执行。
    • 将是否破损标志位 broken 置为 false。

1.3 CyclicBarrier 和 CountDownLatch 的区别

  • CountDownLatch 是一个线程(或者多个),等待另外 N 个线程完成某个事情之后才能执行;CyclicBarrier 是 N 个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
  • CountDownLatch 的计数器只能使用一次。而 CyclicBarrier 的计数器可以使用 reset() 方法重置;CyclicBarrier 能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CountDownLatch 采用减计数方式;CyclicBarrier 采用加计数方式。

2. CyclicBarrier 原理

  • CyclicBarrier 内部使用了 ReentrantLock 和 Condition 两个类。

属性信息

//用于保护屏障入口的锁
private final ReentrantLock lock = new ReentrantLock();
//线程等待条件
private final Condition trip = lock.newCondition();
//记录参与等待的线程数
private final int parties;
//当所有线程到达屏障点之后,首先执行的命令
private final Runnable barrierCommand;
private Generation generation = new Generation();
//实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties
private int count;

构造函数

  • CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程使用 await() 方法告诉 CyclicBarrier 已经到达了屏障,然后当前线程被阻塞。
  • CyclicBarrier 的另一个构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。
public CyclicBarrier(int parties) {
    this(parties, null);
}
 
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

await 方法

  • 调用 await() 方法的线程告诉 CyclicBarrier 自己已经到达屏障,然后当前线程被阻塞。直到 parties 个参与线程调用了 await() 方法,CyclicBarrier 同样提供带超时时间的 await() 和不带超时时间的 await() 方法。
/**
* 线程持续等待直到此barrier上的所有线程都调用了await()方法.
*
* 如果当前线程并不是到达的最后一个线程,则它被禁用线程调度目的,并且处于休眠状态,直到发生以下事件之一:
* 1.最后一个线程到达;
* 2.其他线程中断了当前线程.
* 3.其它线程中断了其它等待的线程.
* 4.在barrier上面等待的线程发生超时.
* 5.其它线程调用了barrier上面的reset方法.
*
* 如果当前线程:
* 1.在进入这一方法时,中断状态位被标记.
* 2.在等待过程中被中断
* 则会抛出中断异常InterruptedException,且当前线程的中断状态被清除.
*
* 会抛出BrokenBarrierException异常的情况有:
* 1.当其它线程在等待时,如果barrier被reset;
* 2.当调用await()方法时barrier发生了broken
*
* 任意等待线程发生了中断异常时,其它等待线程都会抛出BrokenBarrierException,且barrier的状态会变为broken.
*
* 如果当前线程是最后一个到达barrier的线程,且构造函数中的barrier action非null,则在其它线程可以继续执行前,当前线程会执行
* barrier action.
* 如果在barrier action的执行过程中发生了异常,则该异常会对当前线程产生影响,且barrier的会处于broken状态.
*
* @return 当前线程到达索引,第一个到达的索引值为:getParties() - 1;
*         最后一个到达的索引值为:0
*/
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 不超时等待
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
            BrokenBarrierException,
            TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}
  • 两个方法最终都会调用 dowait(boolean, long) 方法,它是 CyclicBarrier 的核心方法。
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // 获取独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 当前代
        final Generation g = generation;
        // 如果这代损坏了,抛出异常
        if (g.broken)
            throw new BrokenBarrierException();
 
        // 如果线程中断了,抛出异常
        if (Thread.interrupted()) {
            // 将损坏状态设置为true
            // 并通知其他阻塞在此栅栏上的线程
            breakBarrier();
            throw new InterruptedException();
        }
 
        // 获取下标
        int index = --count;
        // 如果是 0,说明最后一个线程调用了该方法
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 执行栅栏任务
                if (command != null)
                    command.run();
                ranAction = true;
                // 更新一代,将count重置,将generation重置
                // 唤醒之前等待的线程
                nextGeneration();
                return 0;
            } finally {
                // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
                if (!ranAction)
                    breakBarrier();
            }
        }
 
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                 // 如果没有时间限制,则直接等待,直到被唤醒
                if (!timed)
                    trip.await();
                // 如果有时间限制,则等待指定时间
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 当前代没有损坏
                if (g == generation && ! g.broken) {
                    // 让栅栏失效
                    breakBarrier();
                    throw ie;
                } else {
                    // 上面条件不满足,说明这个线程不是这代的
                    // 就不会影响当前这代栅栏的执行,所以,就打个中断标记
                    Thread.currentThread().interrupt();
                }
            }
 
            // 当有任何一个线程中断了,就会调用breakBarrier方法
            // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
 
            // g != generation表示正常换代了,返回当前线程所在栅栏的下标
            // 如果 g == generation,说明还没有换代,那为什么会醒了?
            // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
            // 正是因为这个原因,才需要generation来保证正确。
            if (g != generation)
                return index;
            
            // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放独占锁
        lock.unlock();
    }
}
  • 如果该线程不是最后一个调用 await() 方法的线程,则它会一直处于等待状态,除非发生以下情况。
    • 最后一个线程到达,即 index == 0。
    • 某个参与线程等待超时。
    • 某个参与线程被中断。
    • 调用了 CyclicBarrier 的 reset() 方法,将屏障重置为初始状态。

BrokenBarrierException 异常

  • 如果一个线程处于等待状态时,如果其他线程调用 reset(),或者调用的 barrier 原本就是被损坏的,则抛出 BrokenBarrierException 异常。
  • 任何线程在等待时被中断了,则其他所有线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。

Generation 对象

  • Generation 描述着 CyclicBarrier 的更新换代。
    • 在 CyclicBarrier 中,同一批线程属于同一代。
    • 当有 parties 个线程到达 barrier 之后,Generation 就会被更新换代。
    • 其中 broken 标识当前 CyclicBarrier 是否已经处于中断状态。
/**
* barrier每一次使用都代表了一个generation实例.
* 当barrier发生trip或者reset时,对应的generation会发生改变.
* 由于非确定性,锁可能会分配给等待线程,因此可能会存在许多和使用barrier的线程相关的generation.
* 但是每次只能激活这些线程中的一个(使用计数的那个),并且其他的线程要么broken要么trip.
* 如果出现了一个暂停,但并未reset,则不需要一个激活的generation.
*/
private static class Generation {
    boolean broken = false;
}
  • 默认 barrier 是没有损坏的。
    • 当 barrier 损坏了或者有一个线程中断了,则通过 breakBarrier() 来终止所有的线程。
    • breakBarrier() 中除了将 broken 设置为 true,还会调用 signalAll 将在 CyclicBarrier 处于等待状态的线程全部唤醒。
private void breakBarrier() {
    // 设置状态
    generation.broken = true;
    // 恢复正在等待进入屏障的线程数量
    count = parties;
    // 唤醒所有线程
    trip.signalAll();
}
  • 当所有线程都已经到达 barrier 处(index == 0),则会通过 nextGeneration() 进行更新换代操作,在这个步骤中唤醒了所有线程,重置了 count 和 generation。
//当barrier发生trip时,用于更新状态并唤醒每一个线程.
//这一方法只在持有lock时被调用.
private void nextGeneration() {
    // signal completion of last generation
    // 唤醒所有线程
    trip.signalAll();
    // set up next generation
    // 恢复正在等待进入屏障的线程数量
    count = parties;
    // 新生一代
    generation = new Generation();
}

reset 方法

  • 需要先打破当前屏蔽,然后再重建一个新的屏蔽,否则可能会导致信号丢失。
/**
* 将barrier状态重置.如果此时有线程在barrier处等待,它们会抛出BrokenBarrierException并返回.
* 注意:请注意,由于其他原因发生broken后重置可能会很复杂;线程需要通过一些方式来 完成同步,并选择一种方式完成reset.
* 相对为后续的使用重建一个barrier,此重置操作更受欢迎.
* 注意:这是一个需要加锁的操作.
*/
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

isBroken 方法

  • 判断此屏障是否处于中断状态。
    • 如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回 true。否则返回 false。
public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

getNumberWaiting 方法

//返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。
public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

2.1 程序示例

  • 从程序的执行结果中也可以看出,所有的工作线程都运行 await() 方法之后都到达了屏障/栅栏位置,然后,3 个工作线程才开始执行业务处理。
public class CyclicBarrierTest {
    // 自定义工作线程
    private static class Worker extends Thread {
        private CyclicBarrier cyclicBarrier;
        
        public Worker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        
        @Override
        public void run() {
            super.run();
            
            try {
                System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + "开始执行");
                // 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "执行完毕");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            System.out.println("创建工作线程" + i);
            Worker worker = new Worker(cyclicBarrier);
            worker.start();
        }
    }
}
/**
创建工作线程0
创建工作线程1
Thread-0开始等待其他线程
创建工作线程2
Thread-1开始等待其他线程
Thread-2开始等待其他线程
Thread-2开始执行
Thread-0开始执行
Thread-1开始执行
Thread-1执行完毕
Thread-0执行完毕
Thread-2执行完毕
*/

3. 使用 CyclicBarrier 的注意事项

  • CyclicBarrier 使用独占锁来执行 await() 方法,并发性可能不是很高。
  • 如果在等待过程中,线程被中断了,就抛出异常。
    • 但如果中断的线程所对应的 CyclicBarrier 不是这一代,比如在最后一次线程执行 signalAll 后,并且更新了这个 " 代 " 对象。在这个区间,这个线程被中断了,那么, JDK 认为任务已经完成,不必在乎中断,就只打了一个中断 interrupt() 标记。
  • 如果线程被其他的 CyclicBarrier 唤醒,那么 g 肯定等于 generation,这个事件就不能 return 了,而是继续循环阻塞。
    • 反之,如果是当前 CyclicBarrier 唤醒,就返回线程在 CyclicBarrier 的下标,表示完成了一次冲过屏障的过程。
  • CyclicBarrier 的 await() 方法是使用 ReentrantLock 和 Condition 控制实现的。
    • 当调用 CyclicBarrier 的 await() 方法会间接调用 ConditionObject 的 await() 方法,会向 Condition 的等待队列中加入元素,当屏障关闭后首先执行指定的barrierAction(),然后依次执行等待队列中的任务,有先后顺序。
  • CyclicBarrier 类中加锁的方法有 dowait()isBroken()reset()getNumberWaiting()

4. 总结

  • CyclicBarrier 的用途是让一组线程互相等待,直到全部到达某个公共屏障点才开始继续工作。
  • CyclicBarrier 是可以重复利用的。
  • 在等待的只要有一个线程发生中断,则其它线程就会被唤醒继续正常运行。
  • CyclicBarrier 指定的任务是进行 barrier 处最后一个线程来调用的,如果在执行这个任务发生异常时,则会传播到此线程,其它线程不受影响继续正常运行。

参考资料

https://blog.csdn.net/qq_38293564/article/details/80558157

相关文章

网友评论

    本文标题:【Java 并发笔记】CyclicBarrier 相关整理

    本文链接:https://www.haomeiwen.com/subject/fydndqtx.html