CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会【开门】,所有被屏障拦截的线程才会继续干活。
CycliBarrier 是基于 ReentrantLock实现的,看看它的构造器和属性。
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition(); //等待开门的线程队列
/** The number of parties */
private final int parties;//在触发屏障之前必须调用await的线程数
/* The command to run when tripped */
private final Runnable barrierCommand;//当屏障被触发时执行的命令
/** The current generation */
private Generation generation = new Generation(); //当前代
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;//仍在等待的线程数量。每个generation从parties倒计时到0。它会在创建新的generation或generation损坏时重置为parties。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
可以看到一个Generation内部类,因为barrier是可以循环使用的,generation代表barrier的一次使用,暂且称为代。每当barrier开门或重置时,generation就会发生变化。
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();//上一代完成,唤醒所有上一代
// set up next generation
count = parties;//重置count
generation = new Generation();
}
内部的broken代表是否被破坏,ture代表barrier已损坏。
//将当前barrier的generation设置为已损坏并唤醒所有当代线程。仅在持有锁时调用。
private void breakBarrier() {
generation.broken = true;
count = parties;//重置count
trip.signalAll();//唤醒所有trip中的线程
}
barrier的await方法可中断,当线程被中断且没有达到开门条件,会破坏并重置Barrier;其他线程调用reset方法重置barrier时也会破坏并重置Barrier;当线程发现barrier已经被破坏就会抛BrokenBarrierException异常。
当最后一个线程到达时,barrier会开门(唤醒所有线程)并进入下一代,await返回线程完成的顺序,值越大代表越先完成。
核心方法主要实现在dowait方法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)//如果当前barrier已损坏,抛BrokenBarrierException
throw new BrokenBarrierException();
if (Thread.interrupted()) {//调用该方法的线程有中断标记,破坏barrier后中断
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // 等于0时代表所有线程执行完毕,开门
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {//阻塞直到开门、损坏、中断或超时
try {
if (!timed)
trip.await(); //直接阻塞
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {//barrier开门或者reset几乎同时线程中断,if条件不满足,就认为barrier开门或者reset行为先完成
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)//调用了reset
throw new BrokenBarrierException();
if (g != generation)
return index;//线程到达的索引顺序
if (timed && nanos <= 0L) {//超时
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
重置barrier的方法:
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // 破坏barrier
nextGeneration(); // 重置
} finally {
lock.unlock();
}
}
网友评论