美文网首页javaAndroid知识Android开发
CountDownLatch or CyclicBarrier

CountDownLatch or CyclicBarrier

作者: hello_coke | 来源:发表于2016-12-07 22:22 被阅读35次

    简介

    JDK中提供了一些用于线程之间协同等待的工具类,CountDownLatch和CyclicBarrier就是最典型的两个线程同步辅助类。
    <b>CountDownLatch :</b>一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
    <b>CyclicBarrier :</b>一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。

    二者功能实现的区别:

    1.CountDownLatch是一次性的,CyclicBarrier可以重设置。
    2.CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成。
    3.CyclicBarrier有getNumberWaiting接口返回被阻塞的线程数

    二者功能介绍:

    <h4>使用CountDownLatch实现:</h4>
    1、5个运动员相继都准备就绪
    2、教练员响起发令枪
    3、运动员起跑
    流程图:
    <pre>

    Paste_Image.png
    </pre>
    demo code:
    <pre>
    public class TestCountDownLatch {
    private static final int RUNNER_NUMBER = 5; // 运动员个数
    private static final Random RANDOM = new Random();
    public static void main(String[] args) {
    // 用于判断发令之前运动员是否已经完全进入准备状态,需要等待5个运动员,所以参数为5
    CountDownLatch readyLatch = new CountDownLatch(RUNNER_NUMBER);
    // 用于判断裁判是否已经发令,只需要等待一个裁判,所以参数为1
    CountDownLatch startLatch = new CountDownLatch(1);
    for (int i = 0; i < RUNNER_NUMBER; i++) {
    Thread t = new Thread(new Runner((i + 1) + "号运动员", readyLatch, startLatch));
    t.start();
    }
    try {
    readyLatch.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("裁判:所有运动员准备完毕,开始...");
    startLatch.countDown();
    }
    static class Runner implements Runnable {
    private CountDownLatch readyLatch;
    private CountDownLatch startLatch;
    private String name;
    public Runner(String name, CountDownLatch readyLatch, CountDownLatch startLatch) {
    this.name = name;
    this.readyLatch = readyLatch;
    this.startLatch = startLatch;
    }
    public void run() {
    int readyTime = RANDOM.nextInt(1000);
    try {
    Thread.sleep(readyTime);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(name + ":我已经准备完毕.");
    readyLatch.countDown();
    try {
    startLatch.await(); // 等待裁判发开始命令
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(name + ":开跑...");
    }
    }
    }
    ---------------------------输出----------------------------------------------------------
    3号运动员:我已经准备完毕.
    1号运动员:我已经准备完毕.
    4号运动员:我已经准备完毕.
    5号运动员:我已经准备完毕.
    2号运动员:我已经准备完毕.
    裁判:所有运动员准备完毕,开始...
    3号运动员:开跑...
    4号运动员:开跑...
    1号运动员:开跑...
    2号运动员:开跑...
    5号运动员:开跑...
    </pre>
    <h4>使用CyclicBarrier模拟实现:</h4>
    1、前端调用restful api,已知商品id以后,调用后端商品起价接口、商品图片信息接口
    2、调用后端商品起价接口、商品图片信息接口
    3、在汇总模块中将多个接口的值拼接组合返回给前端
    流程图:
    <pre>
    Paste_Image.png
    </pre>
    demo code:
    <pre>
    public class TestCyclicBarrier {
    private static final int THREAD_NUMBER = 2;
    private static final Random RANDOM = new Random();
    public static void main(String[] args) {
    CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER, new Runnable() {
    public void run() {
    System.out.println("2个接口都调用完成进行后续处理。。。");
    }
    });
    for (int i = 0; i < THREAD_NUMBER; i++) {
    Thread t = new Thread(new Worker(barrier,i));
    t.start();
    }
    }
    static class Worker implements Runnable {
    private CyclicBarrier barrier;
    private int apiIndex;
    public Worker(CyclicBarrier barrier,int apiIndex) {
    this.barrier = barrier;
    this.apiIndex = apiIndex;
    }
    public void run() {
    int time = RANDOM.nextInt(1000);
    try {
    Thread.sleep(time);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("接口" + apiIndex + "调用完成");
    try {
    barrier.await(); // 等待所有线程都调用过此函数才能进行后续动作
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (BrokenBarrierException e) {
    e.printStackTrace();
    }
    }
    }
    }
    ---------------------------输出----------------------------------------------------------
    接口0调用完成
    接口1调用完成
    2个接口都调用完成进行后续处理。。。
    </pre>
    <h4>CountDownLatch源码:</h4>
    CountDownLatch.countDown:
    <pre>
    public void countDown() {
    sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { //尝试将初始化的state减运算
    doReleaseShared();
    return true;
    }
    return false;
    }
    private void doReleaseShared() {
    for (;;) {
    Node h = head;
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue; // loop to recheck cases
    unparkSuccessor(h);
    }
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue; // loop on failed CAS
    }
    if (h == head) // loop if head changed,只执行这里,中间一大段都会被跳过
    break;
    }
    }
    </pre>
    CountDownLatch.await:
    <pre>
    public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) //state状态不为0.既
    doAcquireSharedInterruptibly(arg);
    }
    Node节点的waitStatus取值:
    static final int CANCELLED = 1; //节点因为超时或者中断被取消。该状态不会再发生变,而且被取消节点对应的线程不会再发生阻塞。
    static final int SIGNAL = -1; //后继节点将被或者已经被阻塞,所以当前节点在释放或者取消时,需要unpark它的后继节点。
    static final int CONDITION = -2; //该状态仅供在条件队列中的节点使用。当该节点转移到同步队列中时,该状态将被设置为0。
    static final int PROPAGATE = -3; //仅在共享模式下使用。在doReleaseShared()方法中,仅仅会设置头节点的状态为PROPAGATE。
    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); //共享的形式
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor(); //前一个节点
    if (p == head) {
    int r = tryAcquireShared(arg); //判断state值
    if (r >= 0) {
    setHeadAndPropagate(node, r); //设置头结点为node,且最终设置头结点的waitStatus为Node.PROPAGATE且唤醒node后面的状态<0的某个结点(如果有的话)
    p.next = null; // help GC
    failed = false;
    return; //结束阻塞
    }
    }
    if (shouldParkAfterFailedAcquire(p, node) && //shouldParkAfterFailedAcquire 判断前一个结点的状态来确定结点是否应该被阻塞
    parkAndCheckInterrupt()) //阻塞且判断是否被中断,如果被中断则抛异常中断
    throw new InterruptedException();
    }
    } finally {
    if (failed) //异常的话acquire失败,在阻塞队列中取消node,如果node为头结点的话,且唤醒node后面的状态<0的某个结点(如果有的话)
    cancelAcquire(node);
    }
    }
    </pre>
    <h4>CyclicBarrier源码:</h4>
    CyclicBarrier:因为这个类代码量比较少全局分析一下:
    <pre>
    public class CyclicBarrier {
    private static class Generation {
    boolean broken = false; //当前代被中止
    }
    private final ReentrantLock lock = new ReentrantLock(); //lock
    private final Condition trip = lock.newCondition();
    private final int parties; //屏障需要拦截的任务数
    private final Runnable barrierCommand; //线程都执行结束到这个屏障以后执行执行的任务
    private Generation generation = new Generation(); //当前代,主要是为了reset的时候预留
    private int count; //用于统计剩余任务数,初始化的时候=parties。为0的时候结束释放锁,lock.unlock();
    private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll(); //通知所有被阻塞的线程
    // set up next generation
    count = parties;
    generation = new Generation();
    }
    private void breakBarrier() {
    generation.broken = true; //中断
    count = parties; //count被重置
    trip.signalAll();
    }
    private int dowait(boolean timed, long nanos) //可以定义是否带超时的等待
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    final Generation g = generation;
    if (g.broken) //屏障被中断
    throw new BrokenBarrierException();
    if (Thread.interrupted()) { //当前线程被中断
    breakBarrier(); //唤醒所有
    throw new InterruptedException();
    }
    int index = --count;
    if (index == 0) { // tripped
    boolean ranAction = false;
    try {
    final Runnable command = barrierCommand;
    if (command != null)
    command.run(); //如果barrierCommand不为空的话执行任务,不是start重启一个线程
    ranAction = true;
    nextGeneration(); //notice all 且更新换代
    return 0;
    } finally {
    if (!ranAction)
    breakBarrier(); //如果执行barrierCommand失败的话,唤醒所有wait的线程
    }
    }
    // loop until tripped, broken, interrupted, or timed out
    for (;;)
    try {
    if (!timed) //如果没有设置wait时间话
    trip.await(); //阻塞
    else if (nanos > 0L)
    nanos = trip.awaitNanos(nanos); //设置等待时间
    } catch (InterruptedException ie) {
    if (g == generation && ! g.broken) { //未中断
    breakBarrier();//则重新中断
    throw ie;
    } else {
    Thread.currentThread().interrupt();
    }
    }
    if (g.broken)
    throw new BrokenBarrierException();
    if (g != generation)
    return index;
    if (timed && nanos <= 0L) {
    breakBarrier();
    throw new TimeoutException();
    }
    }
    } finally {
    lock.unlock();
    }
    }
    public CyclicBarrier(int parties, Runnable barrierAction) { //初始化
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
    }
    public CyclicBarrier(int parties) { //只有线程数,没有task,表示到达barrier后什么也不做
    this(parties, null);
    }
    public int getParties() {
    return parties;
    }
    public int await() throws InterruptedException, BrokenBarrierException {
    try {
    return dowait(false, 0L);
    } catch (TimeoutException toe) {
    throw new Error(toe); // cannot happen;
    }
    }
    public int await(long timeout, TimeUnit unit) //设置阻塞的最大时间
    throws InterruptedException,
    BrokenBarrierException,
    TimeoutException {
    return dowait(true, unit.toNanos(timeout));
    }
    public boolean isBroken() { //返回屏障的阻塞状态,感觉用ReentrantReadWriteLock 的readLock效率更高
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    return generation.broken;
    } finally {
    lock.unlock();
    }
    }
    public void reset() { //重置
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    breakBarrier(); // break the current generation
    nextGeneration(); // start a new generation
    } finally {
    lock.unlock();
    }
    }
    public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    return parties - count;//返回被阻塞的线程数
    } finally {
    lock.unlock();
    }
    }
    }
    </pre>
    <b>思考:</b>
    1、CountDownLatch和CyclicBarrier都是通过设置的阻塞任务数减操作,而不是网上有些人说的CyclicBarrier就是通过count加知道任务数(JDK 1.7)
    2、CountDownLatch更多是直接结合AQS来做阻塞使用的是共享锁,而CyclicBarrier是直接用ReentrantLock来实现使用的是排它锁,虽然ReentrantLock也是使用了AQS来实现,
    3、CyclicBarrier的代码结构看起来更简单清晰,CountDownLatch用的很多基础的AQS的方法
    4、CountDownLatch偏向于计数,CyclicBarrier可以指定栅栏结束以后的任务也方便重置当前栅栏

    相关文章

      网友评论

        本文标题:CountDownLatch or CyclicBarrier

        本文链接:https://www.haomeiwen.com/subject/twrcmttx.html