文前说明
作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。
本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。
1. 简介
- CyclicBarrier(循环屏障/栅栏) 类似于 CountDownLatch(闭锁),它能阻塞一组线程直到某个事件的发生。
- 与闭锁的关键区别在于,所有的线程必须同时到达屏障位置,才能继续执行。
- 闭锁用于等待事件,而屏障用于等待其他线程。
- CyclicBarrier 可以使一定数量的线程反复地在屏障位置处汇集。当线程到达屏障位置时将调用
await()
方法,这个方法将阻塞直到所有线程都到达屏障位置。如果所有线程都到达屏障位置,那么屏障将打开,此时所有的线程都将被释放,而屏障将被重置以便下次使用。
- CyclicBarrier 是 JDK 1.5 的 java.util.concurrent 并发包中提供的一个并发工具类。
- 所谓 Cyclic 即循环的意思,所谓 Barrier 即屏障的意思。
- CyclicBarrier 是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
- 在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用 CyclicBarrier 很有帮助。
- 这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以 重新使用 的。
1.1 CyclicBarrier 的应用场景
- CyclicBarrier 常用于多线程分组计算。
- 比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择 CyclicBarrier。
1.2 CyclicBarrier 方法说明
CyclicBarrier(parties) 方法
- 初始化相互等待的线程数量的构造方法。
CyclicBarrier(parties,Runnable barrierAction) 方法
- 初始化相互等待的线程数量以及屏障线程的构造方法。
- 屏障线程的运行时机:等待的线程数量
=parties
之后,CyclicBarrier 打开屏障之前。- 例如在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。
getParties 方法
- 获取 CyclicBarrier 打开屏障的线程数量。
getNumberWaiting 方法
- 获取正在 CyclicBarrier 上等待的线程数量。
await 方法
- 在 CyclicBarrier 上进行阻塞等待,直到发生以下情形之一。
- 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
- 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
- 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他线程调用
CyclicBarrier.reset()
方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 线程调用
await()
表示自己已经到达栅栏。 - BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程
await()
时被中断或者超时。
await(timeout,TimeUnit) 方法
- 在 CyclicBarrier 上进行限时的阻塞等待,直到发生以下情形之一。
- 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
- 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
- 当前线程等待超时,则抛出 TimeoutException 异常,并停止等待,继续执行。
- 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他线程调用
CyclicBarrier.reset()
方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
isBroken 方法
- 获取是否破损标志位 broken 的值,此值有以下几种情况。
- CyclicBarrier 初始化时,
broken=false
,表示屏障未破损。 - 如果正在等待的线程被中断,则
broken=true
,表示屏障破损。 - 如果正在等待的线程超时,则
broken=true
,表示屏障破损。 - 如果有线程调用
CyclicBarrier.reset()
方法,则broken=false
,表示屏障回到未破损状态。
- CyclicBarrier 初始化时,
reset 方法
- 使 CyclicBarrier 回归初始状态,它做了两件事。
- 如果有正在等待的线程,则会抛出 BrokenBarrierException 异常,且这些线程停止等待,继续执行。
- 将是否破损标志位 broken 置为 false。
1.3 CyclicBarrier 和 CountDownLatch 的区别
- CountDownLatch 是一个线程(或者多个),等待另外 N 个线程完成某个事情之后才能执行;CyclicBarrier 是 N 个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
- CountDownLatch 的计数器只能使用一次。而 CyclicBarrier 的计数器可以使用
reset()
方法重置;CyclicBarrier 能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。 - CountDownLatch 采用减计数方式;CyclicBarrier 采用加计数方式。
2. CyclicBarrier 原理
- CyclicBarrier 内部使用了 ReentrantLock 和 Condition 两个类。
属性信息
//用于保护屏障入口的锁
private final ReentrantLock lock = new ReentrantLock();
//线程等待条件
private final Condition trip = lock.newCondition();
//记录参与等待的线程数
private final int parties;
//当所有线程到达屏障点之后,首先执行的命令
private final Runnable barrierCommand;
private Generation generation = new Generation();
//实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties
private int count;
构造函数
- CyclicBarrier 默认的构造方法是
CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每个线程使用await()
方法告诉 CyclicBarrier 已经到达了屏障,然后当前线程被阻塞。 - CyclicBarrier 的另一个构造函数
CyclicBarrier(int parties, Runnable barrierAction)
,用于线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
await 方法
- 调用
await()
方法的线程告诉 CyclicBarrier 自己已经到达屏障,然后当前线程被阻塞。直到 parties 个参与线程调用了await()
方法,CyclicBarrier 同样提供带超时时间的await()
和不带超时时间的await()
方法。
/**
* 线程持续等待直到此barrier上的所有线程都调用了await()方法.
*
* 如果当前线程并不是到达的最后一个线程,则它被禁用线程调度目的,并且处于休眠状态,直到发生以下事件之一:
* 1.最后一个线程到达;
* 2.其他线程中断了当前线程.
* 3.其它线程中断了其它等待的线程.
* 4.在barrier上面等待的线程发生超时.
* 5.其它线程调用了barrier上面的reset方法.
*
* 如果当前线程:
* 1.在进入这一方法时,中断状态位被标记.
* 2.在等待过程中被中断
* 则会抛出中断异常InterruptedException,且当前线程的中断状态被清除.
*
* 会抛出BrokenBarrierException异常的情况有:
* 1.当其它线程在等待时,如果barrier被reset;
* 2.当调用await()方法时barrier发生了broken
*
* 任意等待线程发生了中断异常时,其它等待线程都会抛出BrokenBarrierException,且barrier的状态会变为broken.
*
* 如果当前线程是最后一个到达barrier的线程,且构造函数中的barrier action非null,则在其它线程可以继续执行前,当前线程会执行
* barrier action.
* 如果在barrier action的执行过程中发生了异常,则该异常会对当前线程产生影响,且barrier的会处于broken状态.
*
* @return 当前线程到达索引,第一个到达的索引值为:getParties() - 1;
* 最后一个到达的索引值为:0
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 不超时等待
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
- 两个方法最终都会调用
dowait(boolean, long)
方法,它是 CyclicBarrier 的核心方法。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 当前代
final Generation g = generation;
// 如果这代损坏了,抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 如果线程中断了,抛出异常
if (Thread.interrupted()) {
// 将损坏状态设置为true
// 并通知其他阻塞在此栅栏上的线程
breakBarrier();
throw new InterruptedException();
}
// 获取下标
int index = --count;
// 如果是 0,说明最后一个线程调用了该方法
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 执行栅栏任务
if (command != null)
command.run();
ranAction = true;
// 更新一代,将count重置,将generation重置
// 唤醒之前等待的线程
nextGeneration();
return 0;
} finally {
// 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 如果没有时间限制,则直接等待,直到被唤醒
if (!timed)
trip.await();
// 如果有时间限制,则等待指定时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 当前代没有损坏
if (g == generation && ! g.broken) {
// 让栅栏失效
breakBarrier();
throw ie;
} else {
// 上面条件不满足,说明这个线程不是这代的
// 就不会影响当前这代栅栏的执行,所以,就打个中断标记
Thread.currentThread().interrupt();
}
}
// 当有任何一个线程中断了,就会调用breakBarrier方法
// 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
if (g.broken)
throw new BrokenBarrierException();
// g != generation表示正常换代了,返回当前线程所在栅栏的下标
// 如果 g == generation,说明还没有换代,那为什么会醒了?
// 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
// 正是因为这个原因,才需要generation来保证正确。
if (g != generation)
return index;
// 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放独占锁
lock.unlock();
}
}
- 如果该线程不是最后一个调用
await()
方法的线程,则它会一直处于等待状态,除非发生以下情况。- 最后一个线程到达,即 index == 0。
- 某个参与线程等待超时。
- 某个参与线程被中断。
- 调用了 CyclicBarrier 的
reset()
方法,将屏障重置为初始状态。
BrokenBarrierException 异常
- 如果一个线程处于等待状态时,如果其他线程调用
reset()
,或者调用的 barrier 原本就是被损坏的,则抛出 BrokenBarrierException 异常。 - 任何线程在等待时被中断了,则其他所有线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。
Generation 对象
- Generation 描述着 CyclicBarrier 的更新换代。
- 在 CyclicBarrier 中,同一批线程属于同一代。
- 当有 parties 个线程到达 barrier 之后,Generation 就会被更新换代。
- 其中 broken 标识当前 CyclicBarrier 是否已经处于中断状态。
/**
* barrier每一次使用都代表了一个generation实例.
* 当barrier发生trip或者reset时,对应的generation会发生改变.
* 由于非确定性,锁可能会分配给等待线程,因此可能会存在许多和使用barrier的线程相关的generation.
* 但是每次只能激活这些线程中的一个(使用计数的那个),并且其他的线程要么broken要么trip.
* 如果出现了一个暂停,但并未reset,则不需要一个激活的generation.
*/
private static class Generation {
boolean broken = false;
}
- 默认 barrier 是没有损坏的。
- 当 barrier 损坏了或者有一个线程中断了,则通过
breakBarrier()
来终止所有的线程。 - 在
breakBarrier()
中除了将 broken 设置为 true,还会调用 signalAll 将在 CyclicBarrier 处于等待状态的线程全部唤醒。
- 当 barrier 损坏了或者有一个线程中断了,则通过
private void breakBarrier() {
// 设置状态
generation.broken = true;
// 恢复正在等待进入屏障的线程数量
count = parties;
// 唤醒所有线程
trip.signalAll();
}
- 当所有线程都已经到达 barrier 处(index == 0),则会通过
nextGeneration()
进行更新换代操作,在这个步骤中唤醒了所有线程,重置了 count 和 generation。
//当barrier发生trip时,用于更新状态并唤醒每一个线程.
//这一方法只在持有lock时被调用.
private void nextGeneration() {
// signal completion of last generation
// 唤醒所有线程
trip.signalAll();
// set up next generation
// 恢复正在等待进入屏障的线程数量
count = parties;
// 新生一代
generation = new Generation();
}
reset 方法
- 需要先打破当前屏蔽,然后再重建一个新的屏蔽,否则可能会导致信号丢失。
/**
* 将barrier状态重置.如果此时有线程在barrier处等待,它们会抛出BrokenBarrierException并返回.
* 注意:请注意,由于其他原因发生broken后重置可能会很复杂;线程需要通过一些方式来 完成同步,并选择一种方式完成reset.
* 相对为后续的使用重建一个barrier,此重置操作更受欢迎.
* 注意:这是一个需要加锁的操作.
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
isBroken 方法
- 判断此屏障是否处于中断状态。
- 如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回 true。否则返回 false。
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
getNumberWaiting 方法
//返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
2.1 程序示例
- 从程序的执行结果中也可以看出,所有的工作线程都运行
await()
方法之后都到达了屏障/栅栏位置,然后,3 个工作线程才开始执行业务处理。
public class CyclicBarrierTest {
// 自定义工作线程
private static class Worker extends Thread {
private CyclicBarrier cyclicBarrier;
public Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
super.run();
try {
System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "开始执行");
// 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "执行完毕");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
for (int i = 0; i < threadCount; i++) {
System.out.println("创建工作线程" + i);
Worker worker = new Worker(cyclicBarrier);
worker.start();
}
}
}
/**
创建工作线程0
创建工作线程1
Thread-0开始等待其他线程
创建工作线程2
Thread-1开始等待其他线程
Thread-2开始等待其他线程
Thread-2开始执行
Thread-0开始执行
Thread-1开始执行
Thread-1执行完毕
Thread-0执行完毕
Thread-2执行完毕
*/
3. 使用 CyclicBarrier 的注意事项
- CyclicBarrier 使用独占锁来执行
await()
方法,并发性可能不是很高。 - 如果在等待过程中,线程被中断了,就抛出异常。
- 但如果中断的线程所对应的 CyclicBarrier 不是这一代,比如在最后一次线程执行 signalAll 后,并且更新了这个 " 代 " 对象。在这个区间,这个线程被中断了,那么, JDK 认为任务已经完成,不必在乎中断,就只打了一个中断
interrupt()
标记。
- 但如果中断的线程所对应的 CyclicBarrier 不是这一代,比如在最后一次线程执行 signalAll 后,并且更新了这个 " 代 " 对象。在这个区间,这个线程被中断了,那么, JDK 认为任务已经完成,不必在乎中断,就只打了一个中断
- 如果线程被其他的 CyclicBarrier 唤醒,那么 g 肯定等于 generation,这个事件就不能 return 了,而是继续循环阻塞。
- 反之,如果是当前 CyclicBarrier 唤醒,就返回线程在 CyclicBarrier 的下标,表示完成了一次冲过屏障的过程。
- CyclicBarrier 的
await()
方法是使用 ReentrantLock 和 Condition 控制实现的。- 当调用 CyclicBarrier 的
await()
方法会间接调用 ConditionObject 的await()
方法,会向 Condition 的等待队列中加入元素,当屏障关闭后首先执行指定的barrierAction()
,然后依次执行等待队列中的任务,有先后顺序。
- 当调用 CyclicBarrier 的
- CyclicBarrier 类中加锁的方法有
dowait()
,isBroken()
,reset()
,getNumberWaiting()
。
4. 总结
- CyclicBarrier 的用途是让一组线程互相等待,直到全部到达某个公共屏障点才开始继续工作。
- CyclicBarrier 是可以重复利用的。
- 在等待的只要有一个线程发生中断,则其它线程就会被唤醒继续正常运行。
- CyclicBarrier 指定的任务是进行 barrier 处最后一个线程来调用的,如果在执行这个任务发生异常时,则会传播到此线程,其它线程不受影响继续正常运行。
网友评论