什么是CyclicBarrier
CyclicBarrier
是一个多线程协调工具。每个工作线程处理完逻辑后阻塞等待,当所有线程(parties个数)都处于阻塞(await)状态的时候,执行特定的动作(barrierAction)。CyclicBarrier
特别适合用于在工作线程继续执行之前更新共享资源。
使用方法
CyclicBarrier
使用起来比较简单。它的构造函数如下:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
其中parties
的含义为越过barrier是需要有多少个线程调用await
方法(进入阻塞状态)。
barrierAction
的含义为越过barrier的时候需要执行的动作(比如更新共享资源)。
工作线程执行完业务逻辑需要等待的时候,调用await
方法即可。
下面介绍下CyclicBarrier
的源代码。
await方法
await
方法作用是让工作线程进入等待状态。代码如下:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
注意:
- 如果线程被中断,抛出
InterruptedException
。 - 如果本线程在await阻塞的时候其他线程中断或等待超时,或者是
CyclicBarrier
被reset,或者是await状态的时候barrier被突破,或者是barrierAction
执行时候抛出异常,这时候抛出BrokenBarrierException
。
还有另一个版本的await
方法如下所示,它规定了线程处于等待状态的最长时间。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
注意:如果等待超时,抛出TimeoutException
。
CyclicBarrier
内部有一个ReentrantLock
,用于在操作CyclicBarrier
内部变量的时候加锁,防止造成并发问题。除此之外还有一个Condition
对象,用于协调所有线程的等待和唤醒操作。代码如下所示:
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
CyclicBarrier
内部还有一个很重要的类Generation
。一个Generation
可以认为是CyclicBarrier
的一个轮回,即从各个线程调用await
方法到触发barrierAction
整个过程。
遇到如下的情况之一,CyclicBarrier
会进入新的Generation
:
- 调用了重置(
reset
)方法。 - 共有
parties
个线程进入了await
状态,barrierAction成功执行完成。 - barrier被突破的时候(等待线程被中断,barrierAction执行过程抛出异常,有线程等待超时,
CyclicBarrier
被reset
)。
接下来我们看看dowait
方法。该方法有两个参数,分别为是否有等待时间限制和等待时间。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取持有的可重入锁
final ReentrantLock lock = this.lock;
// 获得锁
lock.lock();
try {
// 获取generation
final Generation g = generation;
// 如果barrier已经被突破
if (g.broken)
throw new BrokenBarrierException();
// 如果线程被中断,突破屏障
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 需要wait的线程数量减1
int index = --count;
// 如果wait状态的线程数量足够,可以触发barrierCommand执行
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 运行barrierCommand
final Runnable command = barrierCommand;
if (command != null)
// 这里可以得出一个结论,barrierAction由最后一个调用await方法的线程执行
command.run();
ranAction = true;
// 进入下一代
nextGeneration();
return 0;
} finally {
// 如果执行barrierCommand出现错误,所有等待的线程直接突破屏障继续执行
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 如果--count不为0
for (;;) {
try {
// 根据是否设定await时间调用condition对应的等待方法
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果线程被中断,没有进入下一代也没有breakBarrier
if (g == generation && ! g.broken) {
// 所有等待线程突破屏障
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
// 中断当前线程
Thread.currentThread().interrupt();
}
}
// 如果已调用过breakBarrier
if (g.broken)
throw new BrokenBarrierException();
// 如果进入了新一代(调用reset方法或者是触发的barrierCommand执行完毕)
if (g != generation)
return index;
// 如果是定时任务并且定时时间小于等于0,报超时异常
// 或者说线程调用的是限时await方法,等待超时后也会抛出超时异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 解锁
lock.unlock();
}
}
CyclicBarrier
内部有一个变量叫做count
,用来表示还有多少个线程到达await
状态。在初始化的时候设置为parties
。每一个线程调用了await
方法,count
会减1。一旦count
为0的时候,说明有parties
个线程都进入了await
状态,开始执行barrierAction
。执行完毕之后,唤醒所有await
的线程。
breakBarrier
突破屏障的方法代码如下:
注意:只有在持有锁的时候才可以调用。
private void breakBarrier() {
// 设置generation已突破状态为true
generation.broken = true;
// 重置count数量为parties数
count = parties;
// 唤醒所有等待的线程
trip.signalAll();
}
Generation
每次CyclicBarrier
等待中线程数量到达parties数,触发barrierCommand,或者是调用了reset
方法重置了CyclicBarrier
,会进入一个新的generation。
Generation
中只有一个状态量broken
,用来表示是否调用了breakBarrier
方法。
private static class Generation {
boolean broken = false;
}
nextGeneration
方法,用于控制CyclicBarrier
进入新的generation。
在下面2种情形会进入到新的generation:
- 足够数量的线程(parties个)进入await状态后触发
barrierCommand
。barrierCommand
执行完毕,进入新的generation - 调用了
reset
方法
private void nextGeneration() {
// signal completion of last generation
// 唤醒所有等待的线程
trip.signalAll();
// set up next generation
// 重置count为parties
count = parties;
// 生成新的generation并替换
generation = new Generation();
}
reset方法
用于重置CyclicBarrier
到初始状态。这时候如果有线程处于等待状态,会抛出BrokenBarrierException
。
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
和CountDownLatch的不同之处
- 等待逻辑不同。
CountDownLatch
为工作线程不阻塞,需要操作共享资源的线程阻塞(await),工作线程调用countDown
n次之后操作共享资源的线程脱离阻塞恢复执行。CyclicBarrier
则相反,工作线程阻塞等待(调用await),没有专门的线程执行共享资源操作(barrierAction
)。当进入await状态的线程达到预定数量(parties)的时候,谁最后一个进入await状态谁执行barrierAction
,然后所有线程恢复执行。 -
CountDownLatch
的内部状态(count)不可以重置,使用后必须重新创建。但是CyclicBarrier
的状态是可以重置的。
本文为原创内容,欢迎大家讨论、批评指正与转载。转载时请注明出处。
网友评论