回环栅栏,通过它可以实现让一组线程在栅栏前等待,直到栅栏打开,再按AQS锁队列中的顺序依此执行,当然这里有插队情况。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。想想CountDownLatch,它是让一些线程等待另一些线程执行完再执行。
先来看看用法:
public class CyclicBarrierTest {
static class Writer extends Thread {
private CyclicBarrier barrier;
public Writer(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println("Thread name " + Thread.currentThread().getName() + " is writing");
try {
Thread.sleep(5000);
System.out.println("Thread name " + Thread.currentThread().getName() + " write over");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("All thread is done," +Thread.currentThread().getName() + " can start forward");
}
}
public static void main(String[] args) {
final int n = 4;
final CyclicBarrier barrier = new CyclicBarrier(n,
() -> System.out.println("Choose " + Thread.currentThread().getName() +" thread to do this mission"));
for (int i = 0; i < n; i++) {
new Writer(barrier).start();
}
}
}
运行结果:
Thread name Thread-0 is writing
Thread name Thread-2 is writing
Thread name Thread-1 is writing
Thread name Thread-3 is writing
Thread name Thread-3 write over
Thread name Thread-0 write over
Thread name Thread-1 write over
Thread name Thread-2 write over
Choose Thread-2 thread to do this mission
All thread is done,Thread-2 can start forward
All thread is done,Thread-3 can start forward
All thread is done,Thread-1 can start forward
All thread is done,Thread-0 can start forward
所有线程都被栅栏挡住直到全部执行完(上面例子会挑一个线程执行run方法),再被一起释放;
源码
先来看看构造函数以及成员变量
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
分三条线来讲源码:
- 调用await()
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
- 调用await(long timeout, TimeUnit unit)
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
- 中断处理
dowait
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
dowait方法是核心,我们先来按照第一条线来讲,timed = false,nanos = 0L。
初始设置有四个线程A,B,C,D,A线程执行完自己的业务逻辑,最后await——>dowait,执行到lock(这里的lock指的是ReentrentLock),BCD执行完业务由于lock阻塞就得等待被放入到锁队列(指由AQS维护的队列,有别于Condition的条件队列)中等待,接着A会进入for循环执行trip.await(),这里调用了Condition的await方法,之前文章介绍过它,此时A就会被放入到条件队列中等待,唤醒在锁队列中等待的线程。(假设锁队列中顺序是B,C,D)B就会被唤醒,执行A一样的步骤,最后被放入条件队列等待;C也一样;最后轮到D,index == 0 为true,执行nextGeneration
private void nextGeneration() {
// 调用了Condition的signalAll,之前分析过,将所有条件队列中的节点放回到锁队列,
//并不会唤醒它们
trip.signalAll();
// 我们说CyclicBarrier是可重复的,实现就在这,重置count与generation
count = parties;
generation = new Generation();
}
之后D会执行lock.unlock(),锁队列中的线程将会依此被唤醒;
(以上假设的执行顺序只是一种情况,实际中可能BCD并不会等待,可能A之后唤醒的不是B,因为ReentrentLock默认是非公平的....等等)
第二条线:tined为true,nanos也有了值;
还按照上面的情景,A会执行trip.awaitNanos
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//创建当前线程节点,加入到条件队列
Node node = addConditionWaiter();
//唤醒锁队列中的下个节点
int savedState = fullyRelease(node);
//计算等待截止时间
final long deadline = System.nanoTime() + nanosTimeout;
//标识你的中断处理,是抛出还是重新恢复中断状态
int interruptMode = 0;
while (!isOnSyncQueue(node)) { //检测是否在锁队列
if (nanosTimeout <= 0L) {
//节点状态设为0,放回到锁队列
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold) //1秒
LockSupport.parkNanos(this, nanosTimeout); //挂起
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
//尝试更改同步状态,否则继续等待
//(执行到这说明节点已经在锁队列中。情况一:被前一个节点唤醒后,跳出循环执行到这。
//情况二:由于超时,线程不在被挂起,抢到执行资格后将自己放回到锁队列后,
//尝试更改同步状态,即确认是否轮到自己执行了,存在**插队**可能)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters(); //解除可达性,方便GC回收
if (interruptMode != 0)
//确定中断策略,立即抛出或恢复中断标记//
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime(); //返回差值,用于之后判断
}
(while循环条件是isOnSyncQueue,检测节点是否被放回锁队列,为什么要有这个检测?超时节点会执行transferAfterCancelledWait放回操作;未超时signal同样会将节点放回锁队列,待之后节点在锁队列中被唤醒,会跳出循环执行acquireQueued操作。)
接着我们第二条线的逻辑,A被放入条件队列中阻塞,在锁队列中的BCD依此被唤醒,BC加入条件队列,最后D会唤醒它们;这是一种可能,现在我们假设,A超时了,A线程恢复就绪状态,若它抢到执行资格,代码执行transferAfterCancelledWait,它会将自己放回到锁队列中并尝试更改同步状态(即代码到了acquireQueued逻辑,失败会被挂起等待唤醒),也就是尝试插队,也就是说不在遵守CyclicBarrier规则,即它不用再等其他线程执行完了。在这里我们假设它更改同步状态成功,意味着独占,接下来返回时间差值,由于超时一定是小于0 的,回到dowait执行breakBarrier,
(这里可能会有疑惑,方法不是有锁保护吗A怎么还能执行,注意不要和Synchronized内置锁搞混,JUC是利用同步状态的CAS更改操作来进行执行资格的收放从而确保线程安全,得不到资格的会被放入到队列中挂起,利用LockSupport)
(超时的线程会恢复线程的就绪状态所以可能插队先一步执行)
private void breakBarrier() {
generation.broken = true; //标记broken为true,之后线程抛异常
count = parties; //恢复初值
trip.signalAll(); //将所有条件队列中的节点返回锁队列(注意并不唤醒线程)
}
总结一下就是,如果有一个线程超时了,他将会把自己放回到锁队列,无论他是否插队成功,总之轮到它执行(acquireQueued成功),他会将所有条件队列中的节点放回到锁队列,然后抛TimeoutException异常,那么之后新线程(指刚调动dowait的线程会先进入锁队列,它们不会进入条件队列,轮到它执行时会直接抛BrokenBarrierException异常),那么原本在条件队列中等待的那些线程,它们被放回到了锁队列等到轮到它们执行会根据nanos来判断是否抛TimeoutException异常。
所以一旦有线程再条件队列中等待时超时,新线程抛出BrokenBarrierException异常代表CyclicBarrier已经被破坏,实际功能相当于一个ReentrentLock。
那么问题来了?当你看到TimeoutException时代表什么呢?它仅仅只能代表任务执行超时了,我的意思是说条件队列中线程没有超时,最后一个线程也执行完了await逻辑,此时节点全在锁队列中,它们被依次唤醒,这个过程会消耗时间,造成TimeoutException。每种异常都向你传达了相应的的信息。---
当你看到BrokenBarrierException,代表CyclicBarrier已被破坏,就应该依据你想实现的功能来更改你的等待时间。
中断处理
在dowait中:
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
如果当前运行的线程在进入条件队列前,检查出自己被中断,那么breakBarrier被调用,条件队列中节点被放回到锁队列,自己抛InterruptedException异常,新线程(定义在上面)会抛BrokenBarrierException。
breakBarrier方法的调用表明CyclicBarrier规则被破坏,实际变为一个ReentrentLock。
ConditionObject里的awaitNanos方法,一开始处会检测中断;之后若等待期间被中断,会执行checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
意思是将节点加入锁队列,成功则将interruptMode 赋值为THROW_IE 代表要立即抛出,否则为REINTERRUPT代表恢复中断标记,不同的中断策略在reportInterruptAfterWait具体实现。
回到dowait
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) { //等待中被中断
//g.broken为false,则调用breakBarrier,抛InterruptedException异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
//g.broken为true,只是恢复中断标记,
//接下来会抛出BrokenBarrierException
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
前面分析过BrokenBarrierException代表CyclicBarrier被破坏,所以我想它的重要性要过与InterruptedException,所以上面当线程两个都有时就抛BrokenBarrierException异常。
网友评论