1. 作用
CyclicBarrier 可以 让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会释放。
注意,被阻塞的线程 ,不是同时 被释放的。但时间间隔几乎 可以说没有。
2. 使用方式
2.1 代码示例
public static void main(String[] args) {
// 到达屏障的线程数量 : 3
CyclicBarrier cb = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
int finalI = i;
new Thread(()->{
try {
// 最后一个线程 休眠5秒, 看看所有到达屏障的线程 也是休眠5秒之后再执行。
if(finalI % 3 == 2){
Thread.sleep(5000);
}
// 阻塞在这里(也可以理解为到达屏障),直到 到达屏障的线程数量 = 3才会被释放。
cb.await();
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
System.out.println("主线程执行完毕........");
}
最终执行结果, 肯定是最后 到达屏障的 最先执行,其次才是之前到达屏障阻塞的所有线程。其他线程被释放的顺序 也不一定是 按照 await()方法的调用顺序来定的。稍后源码具体分析一下。
image2.2 api
2.2.1. 构造方法
一个有参构造,一个无参构造,有参构造相对于无参构造来说 多传入了一个Runnable实例, 当一个周期的屏障被 释放后, 可以执行 Runnable实例的 run方法逻辑。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
2.2.2. await()
使线程阻塞。
还有一个带超时时间的 await(long timeout, TimeUnit unit), 阻塞超过一定时间 就会抛出TimeoutException异常。
3. 源码分析
3.1 大致原理
CyclicBarrier 是利用 ReentrantLock锁 和 ReentrantLock里的条件队列 来实现 所有线程 在屏障处 阻塞的效果的。
dowait 大体逻辑(省略 一些判断的代码)
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); // q1 加锁
try {
// 判断count(需要到达屏障 的线程数) q2
int index = --count;
if (index == 0) {
// 这个就是 有参构造传进来的第二个 参数 Runable 实例
final Runnable command = barrierCommand;
if (command != null)
// 由最后一个到达屏障的线程 同步调用
command.run();
// 释放所有 q3
nextGeneration();
return 0;
}
for (;;) {
try {
// 释放,进入条件队列阻塞 q4
trip.await();
// 被唤醒后 执行的代码 q5
if (g != generation)
return index;
}
}
} finally {
// 释放锁 q6
lock.unlock();
}
}
源码执行流程
假设 需要到达屏障 的线程数 count = 3.
当第一个线程进入dowait方法,来抢锁, 抢锁成功(q1),count - 1 =2(q2),执行q4代码 :会调用 ReentrantLock里的Condition实例的await方法 释放锁(并不是ReentrantLock.unlock方法),在 初始化条件队列里,并休眠 。
当第二个线程进入dowait方法,来抢锁, 抢锁成功(q1),count - 1 =1(q2) ,执行q4代码 :会调用 ReentrantLock里的Condition实例的await方法 释放锁(并不是ReentrantLock.unlock方法) ,会接在条件队列的队尾,并休眠。
当第count个(最后一个)线程进入dowait方法, 来抢锁, 抢锁成功 (q1),count - 1 =0(q2),当count =0时 , 执行q3代码:会把条件队列里的所有线程节点, 移动到 等待ReentrantLock的 阻塞队列中去。(此时其他阻塞的线程还没有唤醒 )。并开启下一个周期。q3 return出去后,执行finally代码块 ,q6, unlock()会唤醒阻塞队列的第一个线程, 然后执行自己的业务代码。 第一个阻塞线程在 q5醒来后 在,最终return,然后执行 finally q6 去 唤醒下一个 线程, 然后执行自己的业务代码。下一个线程 也在 q5处被唤醒,执行的也是一样的逻辑,需要在解锁的时候 唤醒自己的下一个节点。 直到所有的线程 都被唤醒。
问题
现在解释下之前的问题
1. 为什么 是到达屏障的最后一个线程 最先执行 ?
从上面 的执行逻辑 可以得知, 前面的线程 都 到 条件队列里 阻塞住了,最后一个到达屏障的 线程 才 会先发起 唤醒阻塞线程的动作。
2. 为什么 线程被释放的顺序 不是 按照 线程调用await()的时间(比较极限)?
原因就在于 dowait 在一开始 是需要抢锁的, 这把锁作为CyclicBarrier的成员变量 在 实例化 CyclicBarrier的时候被初始化。
image默认是把非公平锁。
如下图,这是某个线程持有锁 ,还没 进入 条件队列休眠,释放锁的 区间
image假设并发很高, 在这段代码里 就有线程b,c,d 因为 抢不到锁被阻塞,而进入 阻塞队列中等待。假设 当前持有锁的线程 a 这个时候刚刚 进条件队列释放锁,还没来得及去 唤醒 阻塞队列里的线程来抢锁, 或者唤醒了 ,还在抢的路上。
这个时候又来一个线程 e 因为是非公平锁, 走到这直接抢锁,不看阻塞队列是否有线程在排队, 万一抢到锁了, 就跑去 条件队列里 休眠了。那么 最终 进入条件的队列 顺序 是 a,e,b,c,d 被唤醒的顺序 也是 a,e,b,c,d。
所以说 线程 恢复工作的顺序 不是 按照调用 await的顺序,而是 按照抢到锁的时间,由于是把非公平锁,所以在 并发很高的 情况下, 会 有这种 情况发生。
3. 被阻塞的线程 ,为什么不是同时 被释放的?
因为 唤醒 是 上一个线程调用unlock方法 去唤醒下一个线程的,是一个接着一个来的,所以不是同时 被释放。
网友评论