CyclicBarrier是java并发包中的常用工具之一,通常被用来与CountDownlatch对比。CyclicBarrier能够实现CountDownLatch的效果,此外还能重复使用,而CountDownLatch则只能做一次计数。但是对于实现的源码而言,CyclicBarrier对于CountDowLatch有着本质的不同。CountDownLatch基于AQS的等待队列实现。而CyclicBarrier则依赖于ReentrantLock的Condition。我们前面在介绍AQS的时候知道,Condition与AQS本身的等待队列是采用的不同的队列。
1.类注释
CyclicBarrier是一种同步工具,它允许一组线程全部互相等待以到达一个公共的障碍点。CyclicBarrier在固定线程数量的程序中很有用。该线程有时会必须互相等待,该屏障称为cyclic,因为其可以在释放等待线程之后重新使用。
CyclicBarrier支持可选的Runnable命令,该命令在屏障的最后一个线程到达之后,在释放任何线程之前,每个屏障点操作一次,屏障操作对于在任何一方继续之前更新共享状态很有用。
示例如下:
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...); }};
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
for (Thread thread : threads)
thread.join();
}
}}
在这个代码中,每个工作线程处理将会处理矩阵的一行,然后在内存的屏障处等待,直到所有的行都处理完毕,处理所有的行之后,将执行Runnable屏障操作并将其合并,如果合并确定已找到解决方案,则执行done(),并将返回true。并且每个worker将终止。
如果屏障操作不依赖于执行时暂停的各方,则该方中的任何线程都可以在释放该操作时执行该操作。为方便起见,每次调用await都会返回该线程在屏障处的到达索引。然后,您可以选择哪个线程应执行屏障操作,例如:
if (barrier.await() == 0) {
// log the completion of this iteration
}}
CyclicBarrier对于失败的同步尝试使用全无损模型,如果线程由于中断,失败,或者超时而过早离开屏障点,则在该屏障点等待的其他所有线程也将通过异常离开,BrokenBarrierException或者InterruptedException。(如果它们几乎同时被中断)
2.类结构及构造函数
CyclicBarrier的类结构如下:
image.png
在CyclicBarrier内部,存在一个Generation内部类。
private static class Generation {
boolean broken = false;
}
这个类的结构非常简单。但是在CyclicBarrier中作用非常关键,在Barrier的每次使用的时候,都会生成一个实例,每单CyclicBarrier被触发或者重置的时候,生成都会更改,这使得Barrier可以与线程关联产生很多Generation。由于这种不确定性,可以将锁分配给等待的线程,但是一次只能激活其中之一,其余的需要全部等待。如果有中断但是没有后续的重置,则不需要可用的Generation。
提供的构造函数如下:
2.1 CyclicBarrier(int parties, Runnable barrierAction)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
2.2 CyclicBarrier(int parties)
此方法实际上是调用的前面的方法。无command处理。
public CyclicBarrier(int parties) {
this(parties, null);
}
3.成员变量及常量
变量及常量 | 类型 | 说明 |
---|---|---|
lock | private final ReentrantLock | 用于保护屏障的锁 |
trip | private final Condition | 等待条件,直到条件满足。 |
parties | private final int | 参与方数量。 |
barrierCommand | private final Runnable | 条件达到之后需要允许的线程。 |
generation | private Generation generation | 当前的Generation |
count | private int | 仍在等待的参与方数量。按照参与方数量不断倒数,直到为0,则产生一个新的Generation。 |
实际上通过变量表可以看出,Generation是一代的意思,那么也就是说,每次产生一个Generation,当这个Generation对应的count为0之后,再产生一个新的Genrtation。
4.重要方法
4.1 nextGeneration
此方法将更新barrier的状态,并将等待的线程都唤醒。但是调用这个方法需要持有锁。
private void nextGeneration() {
// signal completion of last generation
//调用trip的signalAll方法
trip.signalAll();
// set up next generation
//重置count,count随着到达的parties而减少
count = parties;
//产生新的gereration
generation = new Generation();
}
4.2 breakBarrier
设置当前的barrier的broken状态为true,并唤醒所有人。仅在锁定时调用。
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
4.3 dowait
主要的屏障代码,涵盖了各种策略。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//持有的锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//g为当前的generation
final Generation g = generation;
//如果g的briken为true,则屏障失效,并抛出异常
if (g.broken)
throw new BrokenBarrierException();
//如果线程被重点,则break屏障,且抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//index为count减去1,index从0开始
int index = --count;
//如果index为0 说明已经截止,需要重新产生
if (index == 0) { // tripped
//如果ranAction为false
boolean ranAction = false;
try {
//定义command
final Runnable command = barrierCommand;
//如果command不为空,则执行run
if (command != null)
command.run();
//之后将runActrion修改为true
ranAction = true;
//产生下一个Generation
nextGeneration();
//返回0
return 0;
} finally {
//如果ranAtion不为true,则强行执行berak
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//循环,直到条件被触发
//死循环
for (;;) {
try {
//如果传入没有等待时间,则await无限期等待
if (!timed)
trip.await();
//反之根据传入的nanos判断
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
//异常处理
} catch (InterruptedException ie) {
//如果g还是原有的generation,且没有被broken,则抛出异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
//反之
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
//执行中断 Thread.currentThread().interrupt();
}
}
//中断抛出异常
if (g.broken)
throw new BrokenBarrierException();
//g没有更新 则返回index
if (g != generation)
return index;
//如果timed且nanos小于0
if (timed && nanos <= 0L) {
//将barrier berak
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//解锁
lock.unlock();
}
}
4.4 await
调用dowait方法,不带时间,无限期等待。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
4.5 await(long timeout, TimeUnit unit)
携带等待时间,调用dowait方法。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
4.6 isBroken
判断当前barrier是否被broken
public boolean isBroken() {
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//返回状态
return generation.broken;
} finally {
lock.unlock();
}
}
4.7 reset
重置。
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
//加锁,break 当前barrier
try {
breakBarrier(); // break the current generation
//产生下一个Generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
4.8 getNumberWaiting
public int getNumberWaiting() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//用parties减去count得到当前的waiting
return parties - count;
} finally {
lock.unlock();
}
}
5.总结
本文对CyclicBarrier的源码进行了分析,可以看到,CyclicBarrier是在AQS之上,基于ReentrantLock和Condition的综合应用。其核心是,每次计数,都产生一个Generate。之后根据参与者个数,计算阻塞的数量,当阻塞的线程达到参与者数量之后,就唤醒全部等待线程,然后产生一个新的Generate。而Generate内部只有一个boolean的变量,可以通过修改这个boolean的状态将Generate break。
网友评论