用于协调多个线程同步执行操作的场合。
CyclicBarrier barrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
new MyThread("线程-" + (i + 1), barrier).start();
}
public class MyThread extends Thread {
private final CyclicBarrier barrier;
private final Random random = new Random();
public MyThread(String name, CyclicBarrier barrier) {
super(name);
this.barrier = barrier;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName() + " - 已经到 达公司");
barrier.await();
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName() + " - 已经笔 试结束");
barrier.await();
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName() + " - 已经面 试结束");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}super.run();
} }
在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。
CyclicBarrier实现原理
CyclicBarrier基于ReentrantLock+Condition实现。
public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock(); // 用于线程之间相互唤醒
private final Condition trip = lock.newCondition(); // 线程总数
private final int parties; private int count;
private Generation generation = new Generation();
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException(); // 参与方数量
this.parties = parties; this.count = parties;
// 当所有线程被唤醒时,执行barrierCommand表示Runnable。
this.barrierCommand = barrierAction;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
} }
image.png
- CyclicBarrier是可以被重用的。以上一节的应聘场景为例,来了5个线程,这5个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这5个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
- CyclicBarrier 会响应中断。5个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()方法。然后count被重置为初始值(parties),重新开始。
- 上面的回调方法,barrierAction只会被第5个线程执行1次(在唤醒其他4个线程之前),而不是5个线程每个都执行1次。
网友评论