不多BB开门见山
CyclicBarrier作用主要是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障踩会开门,所有被屏障拦截的线程才会继续运行
public class CyclicTest {
private static ExecutorService executorService = Executors.newCachedThreadPool();
private static AtomicInteger count= new AtomicInteger();
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
if (count.get()==0) {
System.out.println("4个人到齐了,可以打麻将了");
}else {
System.out.println("大家还想打一局");
}
}
});
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 8; i++) {
Thread.sleep(100);
executorService.submit(() -> {
try {
if (count.get()==0) {
System.out.println(String.format("%s:%s", Thread.currentThread().getName(), "来打麻将了"));
cyclicBarrier.await();
System.out.println(String.format("%s:%s", Thread.currentThread().getName(), "开始摸牌"));
count.incrementAndGet();
}else {
System.out.println(String.format("%s:%s", Thread.currentThread().getName(), "想在玩一局"));
cyclicBarrier.await();
System.out.println(String.format("%s:%s", Thread.currentThread().getName(), "第二局开始摸牌"));
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
while (!executorService.isTerminated()) {
}
System.out.println("这局结束后都累了,大家回去睡觉了");
}
}

我传入cyclicBarrier的屏障点的数量是4所以必须得四个线程调用cyclicBarrier.await()才可以往下执行,每一个线程调用cyclicBarrier.await()后达到屏障点的数量就会减1,直到屏障点为0后先执行传入的Runnable对象然后才继续执行下面的代码,也就是四个线程达到了屏障才能执行下面的代码,当第一批的4个线程达到屏障后,第二批也需要4个线程达到,证明了cyclicBarrier的屏障是可以重复起效果的,线程数量每达到一次屏障点就会初始化为最初值
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
private static class Generation {
boolean broken = false;
}
/*可重入锁*/
private final ReentrantLock lock = new ReentrantLock();
/*等待队列*/
private final Condition trip = lock.newCondition();
/*参与等待的线程数量*/
private final int parties;
/*当所有线程达到屏障点之后,首先执行的任务*/
private final Runnable barrierCommand;
/*实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减少一;
当一次新的运算开始后,count会被重置为parties*/
private int count;
/*表示当前屏障的状态,true被破坏,false没有被破坏*/
private Generation generation = new Generation();
核心方法await
该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态
直到所有线程都到达屏障点,当前线程才会被唤醒
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait()方法
timed:true表示当前线程进入timed_waiting状态,false表示当前线程进入waiting状态。
nanos:线程超时等待的时长
方法的主要步骤
1、对当前操作进行加锁
2、判断当前屏障是否被打破或者当前线程被中断,抛出异常。
3、若步骤二均未发生,判断当前线程是否是到达屏障点的最后一个线程,如果是,执行BarrierCommand(如果有),唤醒全部在Condition队列上等待的线程,并重置CyclicBarrier的状态,更新generation。
4、若当前线程不是最后一个到达屏障点的线程,那么当前线程被挂起,直至被唤醒/中断/超时。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final CyclicBarrier.Generation g = generation;
/*如果当前屏障被破坏,抛出异常*/
if (g.broken)
throw new BrokenBarrierException();
/*如果当前调用await函数的线程被中断,调用breakBarrier函数并抛出异常*/
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
/*由于当前线程已经达到屏障点,那么等待到达屏障点的线程数量减少一*/
int index = --count;
/*index为0说明所有的线程都到达了屏障点*/
if (index == 0) { // tripped
/*判断是否有优先执行的任务*/
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
/*有优先执行的任务,那么由当前调用await()的线程执行该任务*/
if (command != null)
command.run();
ranAction = true;
/*执行唤醒全部线程的操作*/
nextGeneration();
return 0;
} finally {
/*说明在执行barrierCommand的时候发生了异常*/
if (!ranAction)
breakBarrier();
}
}
/*若所有的线程还未全部到达屏障点*/
for (;;) {
try {
/*判断阻塞方式,并将当前线程加入Condition队列中*/
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//若遭遇中断
//这个中断异常是从Condition中抛出来的,对应THROW_IN
if (g == generation && ! g.broken) {
breakBarrier();
/*将当前异常直接抛出*/
throw ie;
} else {
/*
* 这里两种情况:
* 1. g!=generation,说明被reset过
* 2. g==generation&&g.broken, 说明屏障被破坏了
* 对于第二种情况,当前屏障已经被破坏,说明这次中断是属于这一轮等待到达屏障点的,接下来的操作会直接抛出异常(对应if(g.broken))
* 对于第一种情况,由于g!=generation, 说明reset了或者这一轮到达屏障点的任务已经完成,那么接下来的操作会直接退出循环(对应if(g!=generation)),所以这次中断是属于后续执行的操作,我们把中断外抛。
*/
Thread.currentThread().interrupt();
}
}
/*判断屏障是否被破坏*/
if (g.broken)
throw new BrokenBarrierException();
/*
* 若当前线程被唤醒发现generation发生改变
* 这里有两种情况:
* 1. CyclicBarrier被reset
* 2. 说明最后一个线程到达屏障点
*/
if (g != generation)
return index;
/*当前线程等待超时*/
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
nextGeneration()主要做的就是唤醒等待的线程然后对屏障点进行初始化
private void nextGeneration() {
/*唤醒Condition队列中的所有线程*/
trip.signalAll();
/*更新count为初始值*/
count = parties;
/*更新generator*/
generation = new CyclicBarrier.Generation();
}
breakBarrier(): 唤醒所有Condition队列中的线程,并重置CyclicBarrier的成员变量count,并且把generation的broken变量设置为true,表示当前屏障被破坏。
private void breakBarrier() {
/*表示当前屏障被破坏*/
generation.broken = true;
/*更新count为初始值*/
count = parties;
/*唤醒Condition队列中的所有线程*/
trip.signalAll();
}
/*重置CyclicBarrier*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
/*判断屏障是否被破坏*/
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
网友评论