CyclicBarrier.png
Constructor
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// 参与方数目,如果都到达,那么放开限制
this.parties = parties;
this.count = parties;
// 当限制放开,会执行barrierCommand
this.barrierCommand = barrierAction;
}
await
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 每一个参与人要进入wait流程的第一要素是,首先要拿锁
lock.lock();
try {
final Generation g = generation;
// 如果当前generation不可用,报错
if (g.broken)
throw new BrokenBarrierException();
// 中断处理
if (Thread.interrupted()) {
// 唤醒所有人并设置generation为broken
breakBarrier();
throw new InterruptedException();
}
// 到这里说明该参与人已经就位,计数减一
int index = --count;
// 如果计数归零,说明全部参与人都已经就位
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 如果有设定barrierCommand,那么开始执行该线程
command.run();
ranAction = true;
// 前面是上个阶段的barrier已经放开,这里重新再复用barrier
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 到这里,说明参与人没有全部就位,那么进入wait模式,等待人员到齐
for (;;) {
try {
// 都是等待condition,无非一个有时间限制,一个无限等待直到被唤醒或中断
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果在等待的过程中,人员都没到齐,却发生了中断,且并没有broken
// 那么没办法提前放开barrier
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
// 保存中断状态,或换代,或broken
Thread.currentThread().interrupt();
}
}
// 唤醒或中断后的处理。
// broken直接报错
if (g.broken)
throw new BrokenBarrierException();
// 如果已经换代,那么说明一切正常,返回需要的参与人数量
if (g != generation)
return index;
// 等待超时处理
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
Other
nextGeneration
// 这是复用CyclicBarrier的关键,重新生成generation
private void nextGeneration() {
// signal completion of last generation
// 唤醒所有wait中的参与人
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
breakBarrier
private void breakBarrier() {
// 设置generation的状态为broken,代表当前的barrier不可用
generation.broken = true;
count = parties;
// 既然这个barrier不可用,那么参与人wait也就没有意义,全部都唤醒
trip.signalAll();
}
网友评论