简介
JDK中提供了一些用于线程之间协同等待的工具类,CountDownLatch和CyclicBarrier就是最典型的两个线程同步辅助类。
<b>CountDownLatch :</b>一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
<b>CyclicBarrier :</b>一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。
二者功能实现的区别:
1.CountDownLatch是一次性的,CyclicBarrier可以重设置。
2.CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成。
3.CyclicBarrier有getNumberWaiting接口返回被阻塞的线程数
二者功能介绍:
<h4>使用CountDownLatch实现:</h4>
1、5个运动员相继都准备就绪
2、教练员响起发令枪
3、运动员起跑
流程图:
<pre>
</pre>
demo code:
<pre>
public class TestCountDownLatch {
private static final int RUNNER_NUMBER = 5; // 运动员个数
private static final Random RANDOM = new Random();
public static void main(String[] args) {
// 用于判断发令之前运动员是否已经完全进入准备状态,需要等待5个运动员,所以参数为5
CountDownLatch readyLatch = new CountDownLatch(RUNNER_NUMBER);
// 用于判断裁判是否已经发令,只需要等待一个裁判,所以参数为1
CountDownLatch startLatch = new CountDownLatch(1);
for (int i = 0; i < RUNNER_NUMBER; i++) {
Thread t = new Thread(new Runner((i + 1) + "号运动员", readyLatch, startLatch));
t.start();
}
try {
readyLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("裁判:所有运动员准备完毕,开始...");
startLatch.countDown();
}
static class Runner implements Runnable {
private CountDownLatch readyLatch;
private CountDownLatch startLatch;
private String name;
public Runner(String name, CountDownLatch readyLatch, CountDownLatch startLatch) {
this.name = name;
this.readyLatch = readyLatch;
this.startLatch = startLatch;
}
public void run() {
int readyTime = RANDOM.nextInt(1000);
try {
Thread.sleep(readyTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":我已经准备完毕.");
readyLatch.countDown();
try {
startLatch.await(); // 等待裁判发开始命令
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":开跑...");
}
}
}
---------------------------输出----------------------------------------------------------
3号运动员:我已经准备完毕.
1号运动员:我已经准备完毕.
4号运动员:我已经准备完毕.
5号运动员:我已经准备完毕.
2号运动员:我已经准备完毕.
裁判:所有运动员准备完毕,开始...
3号运动员:开跑...
4号运动员:开跑...
1号运动员:开跑...
2号运动员:开跑...
5号运动员:开跑...
</pre>
<h4>使用CyclicBarrier模拟实现:</h4>
1、前端调用restful api,已知商品id以后,调用后端商品起价接口、商品图片信息接口
2、调用后端商品起价接口、商品图片信息接口
3、在汇总模块中将多个接口的值拼接组合返回给前端
流程图:
<pre>
Paste_Image.png
</pre>
demo code:
<pre>
public class TestCyclicBarrier {
private static final int THREAD_NUMBER = 2;
private static final Random RANDOM = new Random();
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER, new Runnable() {
public void run() {
System.out.println("2个接口都调用完成进行后续处理。。。");
}
});
for (int i = 0; i < THREAD_NUMBER; i++) {
Thread t = new Thread(new Worker(barrier,i));
t.start();
}
}
static class Worker implements Runnable {
private CyclicBarrier barrier;
private int apiIndex;
public Worker(CyclicBarrier barrier,int apiIndex) {
this.barrier = barrier;
this.apiIndex = apiIndex;
}
public void run() {
int time = RANDOM.nextInt(1000);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接口" + apiIndex + "调用完成");
try {
barrier.await(); // 等待所有线程都调用过此函数才能进行后续动作
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
---------------------------输出----------------------------------------------------------
接口0调用完成
接口1调用完成
2个接口都调用完成进行后续处理。。。
</pre>
<h4>CountDownLatch源码:</h4>
CountDownLatch.countDown:
<pre>
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //尝试将初始化的state减运算
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed,只执行这里,中间一大段都会被跳过
break;
}
}
</pre>
CountDownLatch.await:
<pre>
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //state状态不为0.既
doAcquireSharedInterruptibly(arg);
}
Node节点的waitStatus取值:
static final int CANCELLED = 1; //节点因为超时或者中断被取消。该状态不会再发生变,而且被取消节点对应的线程不会再发生阻塞。
static final int SIGNAL = -1; //后继节点将被或者已经被阻塞,所以当前节点在释放或者取消时,需要unpark它的后继节点。
static final int CONDITION = -2; //该状态仅供在条件队列中的节点使用。当该节点转移到同步队列中时,该状态将被设置为0。
static final int PROPAGATE = -3; //仅在共享模式下使用。在doReleaseShared()方法中,仅仅会设置头节点的状态为PROPAGATE。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //共享的形式
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); //前一个节点
if (p == head) {
int r = tryAcquireShared(arg); //判断state值
if (r >= 0) {
setHeadAndPropagate(node, r); //设置头结点为node,且最终设置头结点的waitStatus为Node.PROPAGATE且唤醒node后面的状态<0的某个结点(如果有的话)
p.next = null; // help GC
failed = false;
return; //结束阻塞
}
}
if (shouldParkAfterFailedAcquire(p, node) && //shouldParkAfterFailedAcquire 判断前一个结点的状态来确定结点是否应该被阻塞
parkAndCheckInterrupt()) //阻塞且判断是否被中断,如果被中断则抛异常中断
throw new InterruptedException();
}
} finally {
if (failed) //异常的话acquire失败,在阻塞队列中取消node,如果node为头结点的话,且唤醒node后面的状态<0的某个结点(如果有的话)
cancelAcquire(node);
}
}
</pre>
<h4>CyclicBarrier源码:</h4>
CyclicBarrier:因为这个类代码量比较少全局分析一下:
<pre>
public class CyclicBarrier {
private static class Generation {
boolean broken = false; //当前代被中止
}
private final ReentrantLock lock = new ReentrantLock(); //lock
private final Condition trip = lock.newCondition();
private final int parties; //屏障需要拦截的任务数
private final Runnable barrierCommand; //线程都执行结束到这个屏障以后执行执行的任务
private Generation generation = new Generation(); //当前代,主要是为了reset的时候预留
private int count; //用于统计剩余任务数,初始化的时候=parties。为0的时候结束释放锁,lock.unlock();
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); //通知所有被阻塞的线程
// set up next generation
count = parties;
generation = new Generation();
}
private void breakBarrier() {
generation.broken = true; //中断
count = parties; //count被重置
trip.signalAll();
}
private int dowait(boolean timed, long nanos) //可以定义是否带超时的等待
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken) //屏障被中断
throw new BrokenBarrierException();
if (Thread.interrupted()) { //当前线程被中断
breakBarrier(); //唤醒所有
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //如果barrierCommand不为空的话执行任务,不是start重启一个线程
ranAction = true;
nextGeneration(); //notice all 且更新换代
return 0;
} finally {
if (!ranAction)
breakBarrier(); //如果执行barrierCommand失败的话,唤醒所有wait的线程
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;)
try {
if (!timed) //如果没有设置wait时间话
trip.await(); //阻塞
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos); //设置等待时间
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) { //未中断
breakBarrier();//则重新中断
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
public CyclicBarrier(int parties, Runnable barrierAction) { //初始化
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) { //只有线程数,没有task,表示到达barrier后什么也不做
this(parties, null);
}
public int getParties() {
return parties;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
public int await(long timeout, TimeUnit unit) //设置阻塞的最大时间
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public boolean isBroken() { //返回屏障的阻塞状态,感觉用ReentrantReadWriteLock 的readLock效率更高
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
public void reset() { //重置
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;//返回被阻塞的线程数
} finally {
lock.unlock();
}
}
}
</pre>
<b>思考:</b>
1、CountDownLatch和CyclicBarrier都是通过设置的阻塞任务数减操作,而不是网上有些人说的CyclicBarrier就是通过count加知道任务数(JDK 1.7)
2、CountDownLatch更多是直接结合AQS来做阻塞使用的是共享锁,而CyclicBarrier是直接用ReentrantLock来实现使用的是排它锁,虽然ReentrantLock也是使用了AQS来实现,
3、CyclicBarrier的代码结构看起来更简单清晰,CountDownLatch用的很多基础的AQS的方法
4、CountDownLatch偏向于计数,CyclicBarrier可以指定栅栏结束以后的任务也方便重置当前栅栏
网友评论