CyclicBarrier允许一组线程在到达某个栅栏点(common barrier point)互相等待,直到最后一个线程到达栅栏点,栅栏才会打开,处于阻塞状态的线程恢复继续执行。
举例
举个例子来说明CyclicBarrier的使用:
比如吃鸡游戏4排,需要等4个队友均点击准备才可以开启比赛。
public class CyclicBarrierTest {
static class Player implements Runnable{
private String id;
private CyclicBarrier cyclicBarrier;
public Player() {
}
public Player(String id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try{
System.out.println(System.currentTimeMillis() + ":##" + id + "##开始赛前准备");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(System.currentTimeMillis() + ":##" + id + "##准备完毕");
cyclicBarrier.await();
System.out.println(System.currentTimeMillis() + ":##" + id + "##进入刺激战场");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(System.currentTimeMillis() + ":##" + id + "##成盒");
}catch (Exception e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
String[] ids = new String[]{"吃鸡帅萌新", "草丛伏地魔", "P城钢枪王", "AWM无敌狙神"};
CyclicBarrier barrier = new CyclicBarrier(4);
for(int i=0; i<4; i++){
service.execute(new Player(ids[i], barrier));
}
service.shutdown();
}
}
点击运行:
1595389608469:##吃鸡帅萌新##开始赛前准备
1595389608469:##草丛伏地魔##开始赛前准备
1595389608470:##P城钢枪王##开始赛前准备
1595389608470:##AWM无敌狙神##开始赛前准备
1595389609325:##草丛伏地魔##准备完毕
1595389609438:##P城钢枪王##准备完毕
1595389615813:##AWM无敌狙神##准备完毕
1595389618075:##吃鸡帅萌新##准备完毕
1595389618075:##吃鸡帅萌新##进入刺激战场
1595389618075:##草丛伏地魔##进入刺激战场
1595389618075:##AWM无敌狙神##进入刺激战场
1595389618075:##P城钢枪王##进入刺激战场
1595389619555:##吃鸡帅萌新##成盒
1595389624833:##草丛伏地魔##成盒
1595389625086:##P城钢枪王##成盒
1595389626059:##AWM无敌狙神##成盒
可以看到,当最后1个人准备好之后,4个人同时进入到刺激战场,相当于同时"冲破栅栏"。
源码剖析
首先看一下构造函数:
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
parties表示需要拦截的线程数,barrierAction主要是为了处理更加复杂的场景,当线程到达栅栏的时候,优先执行barrierAction。
接着看一下await方法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
继续跟dowait方法:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 当前线程获取独占锁
lock.lock();
try {
final Generation g = generation;
// 若栅栏已被打破,抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
// 只要有1个线程被中断,则打破栅栏
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 对count执行减1操作
// 最后一个到达栅栏的线程,才会执行下述代码
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 若barrierAction不为null,则优先执行barrierAction
if (command != null)
command.run();
ranAction = true;
// 创建下一代栅栏
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 只要不是最后一个线程,就执行自旋,直到栅栏被触发、线程被中断、等待超时
for (;;) {
try {
// 无超时设置
if (!timed)
// 当前线程被添加到Condition的条件队列中,阻塞挂起
trip.await();
// 有超时设置
else if (nanos > 0L)
// 当前线程被添加到Condition的条件队列中,阻塞挂起nanos纳秒
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
上述代码还是比较容易理解的,线程依次获取到独占锁,并对count执行减1操作,只要count未变为0,执行trip.await()后,则当前线程会被添加到Condition的条件队列中,阻塞挂起,等待唤醒、中断或超时等待等动作的发生。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程被添加到Condition的条件等待队列中
Node node = addConditionWaiter();
// 释放锁
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 当前线程被阻塞挂起
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
当最后一个线程获取锁到达栅栏时,count执行减1操作后正好为0,紧接着会执行nextGeneration操作:
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
nextGeneration会执行trip.signalAll()将阻塞在trip上的线程依次唤醒,在trip(Condition)的await方法的阻塞处继续往下执行。很明显,会接着执行acquireQueued(node, savedState)方法,各个阻塞线程依次被添加到AQS的同步队列中去,参与获取独占锁的操作。
最后一个线程将其他阻塞线程唤醒后,紧接着会重置count和generation字段,从而实现栅栏的循环利用。
signalAll是Condition的接口方法,但其实现是在AQS中定义的,不清楚的可以去看一下我之前写的AQS源码详解系列,此处不再赘述。
总结
显然,CyclicBarrier是基于ReentrantLock和Condition来实现的,基本原理就是创建1个Condition,然后各个线程依次获取lock,执行Condition的await方法阻塞挂起。当最后一个线程(第parties个)到达栅栏时,会调用nextGeneration方法,唤醒Condition等待队列上的各个阻塞线程,并重置栅栏。唤醒后的线程将依次尝试获取锁执行后续代码。
网友评论