美文网首页
JUC原理之CyclicBarrier

JUC原理之CyclicBarrier

作者: AlienPaul | 来源:发表于2020-03-20 13:00 被阅读0次

什么是CyclicBarrier

CyclicBarrier是一个多线程协调工具。每个工作线程处理完逻辑后阻塞等待,当所有线程(parties个数)都处于阻塞(await)状态的时候,执行特定的动作(barrierAction)。CyclicBarrier特别适合用于在工作线程继续执行之前更新共享资源。

使用方法

CyclicBarrier使用起来比较简单。它的构造函数如下:

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

其中parties的含义为越过barrier是需要有多少个线程调用await方法(进入阻塞状态)。
barrierAction的含义为越过barrier的时候需要执行的动作(比如更新共享资源)。

工作线程执行完业务逻辑需要等待的时候,调用await方法即可。

下面介绍下CyclicBarrier的源代码。

await方法

await方法作用是让工作线程进入等待状态。代码如下:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

注意:

  1. 如果线程被中断,抛出InterruptedException
  2. 如果本线程在await阻塞的时候其他线程中断或等待超时,或者是CyclicBarrier被reset,或者是await状态的时候barrier被突破,或者是barrierAction执行时候抛出异常,这时候抛出BrokenBarrierException

还有另一个版本的await方法如下所示,它规定了线程处于等待状态的最长时间。

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
            BrokenBarrierException,
            TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

注意:如果等待超时,抛出TimeoutException

CyclicBarrier内部有一个ReentrantLock,用于在操作CyclicBarrier内部变量的时候加锁,防止造成并发问题。除此之外还有一个Condition对象,用于协调所有线程的等待和唤醒操作。代码如下所示:

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();

CyclicBarrier内部还有一个很重要的类Generation。一个Generation可以认为是CyclicBarrier的一个轮回,即从各个线程调用await方法到触发barrierAction整个过程。

遇到如下的情况之一,CyclicBarrier会进入新的Generation

  • 调用了重置(reset)方法。
  • 共有parties个线程进入了await状态,barrierAction成功执行完成。
  • barrier被突破的时候(等待线程被中断,barrierAction执行过程抛出异常,有线程等待超时,CyclicBarrierreset)。

接下来我们看看dowait方法。该方法有两个参数,分别为是否有等待时间限制和等待时间。

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 获取持有的可重入锁
    final ReentrantLock lock = this.lock;
    
    // 获得锁
    lock.lock();
    try {
        // 获取generation
        final Generation g = generation;
        // 如果barrier已经被突破
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果线程被中断,突破屏障
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 需要wait的线程数量减1
        int index = --count;
        // 如果wait状态的线程数量足够,可以触发barrierCommand执行
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 运行barrierCommand
                final Runnable command = barrierCommand;
                if (command != null)
                    // 这里可以得出一个结论,barrierAction由最后一个调用await方法的线程执行
                    command.run();
                ranAction = true;
                // 进入下一代
                nextGeneration();
                return 0;
            } finally {
                // 如果执行barrierCommand出现错误,所有等待的线程直接突破屏障继续执行
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 如果--count不为0
        for (;;) {
            try {
                // 根据是否设定await时间调用condition对应的等待方法
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果线程被中断,没有进入下一代也没有breakBarrier
                if (g == generation && ! g.broken) {
                    // 所有等待线程突破屏障
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    // 中断当前线程
                    Thread.currentThread().interrupt();
                }
            }

            // 如果已调用过breakBarrier
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果进入了新一代(调用reset方法或者是触发的barrierCommand执行完毕)
            if (g != generation)
                return index;

            // 如果是定时任务并且定时时间小于等于0,报超时异常
            // 或者说线程调用的是限时await方法,等待超时后也会抛出超时异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 解锁
        lock.unlock();
    }
}

CyclicBarrier内部有一个变量叫做count,用来表示还有多少个线程到达await状态。在初始化的时候设置为parties。每一个线程调用了await方法,count会减1。一旦count为0的时候,说明有parties个线程都进入了await状态,开始执行barrierAction。执行完毕之后,唤醒所有await的线程。

breakBarrier突破屏障的方法代码如下:
注意:只有在持有锁的时候才可以调用。

private void breakBarrier() {
    // 设置generation已突破状态为true
    generation.broken = true;
    // 重置count数量为parties数
    count = parties;
    // 唤醒所有等待的线程
    trip.signalAll();
}

Generation

每次CyclicBarrier等待中线程数量到达parties数,触发barrierCommand,或者是调用了reset方法重置了CyclicBarrier,会进入一个新的generation。

Generation中只有一个状态量broken,用来表示是否调用了breakBarrier方法。

private static class Generation {
    boolean broken = false;
}

nextGeneration方法,用于控制CyclicBarrier进入新的generation。

在下面2种情形会进入到新的generation:

  • 足够数量的线程(parties个)进入await状态后触发barrierCommandbarrierCommand执行完毕,进入新的generation
  • 调用了reset方法
private void nextGeneration() {
    // signal completion of last generation
    // 唤醒所有等待的线程
    trip.signalAll();
    // set up next generation
    // 重置count为parties
    count = parties;
    // 生成新的generation并替换
    generation = new Generation();
}

reset方法

用于重置CyclicBarrier到初始状态。这时候如果有线程处于等待状态,会抛出BrokenBarrierException

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

和CountDownLatch的不同之处

  • 等待逻辑不同。CountDownLatch为工作线程不阻塞,需要操作共享资源的线程阻塞(await),工作线程调用countDownn次之后操作共享资源的线程脱离阻塞恢复执行。CyclicBarrier则相反,工作线程阻塞等待(调用await),没有专门的线程执行共享资源操作(barrierAction)。当进入await状态的线程达到预定数量(parties)的时候,谁最后一个进入await状态谁执行barrierAction,然后所有线程恢复执行。
  • CountDownLatch的内部状态(count)不可以重置,使用后必须重新创建。但是CyclicBarrier的状态是可以重置的。

本文为原创内容,欢迎大家讨论、批评指正与转载。转载时请注明出处。

相关文章

  • JUC原理之CyclicBarrier

    什么是CyclicBarrier CyclicBarrier是一个多线程协调工具。每个工作线程处理完逻辑后阻塞等待...

  • 并发编程之 Exchanger 源码分析

    前言 JUC 包中除了 CountDownLatch, CyclicBarrier, Semaphore, 还有一...

  • JUC之AQS—Cyclicbarrier

    导读:这篇文章介绍的是java并发组件aqs之CyclicBarrier Cyclicbarrier概念: Cyc...

  • JUC工具类实例

    描述 本文描述了JUC中CountDownLatch、CyclicBarrier、Semaphore、Exchan...

  • Exchanger

    JUC 包中除了 CountDownLatch, CyclicBarrier, Semaphore, 还有一个重要...

  • JUC锁框架_CyclicBarrier原理分析

    想想一下这样一个场景,有多个人需要过河,河上有一条船,船要等待满10个人才过河,过完河后每个人又各自行动。 这里的...

  • JUC源码分析之CyclicBarrier

    简介 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)...

  • 并发编程之 Semaphore 源码分析

    前言 并发 JUC 包提供了很多工具类,比如之前说的 CountDownLatch,CyclicBarrier ,...

  • JUC-CyclicBarrier

    一.什么是CyclicBarrier?CyclicBarrier就是循环屏障二.CyclicBarrier的用途?...

  • JUC-CyclicBarrier

      CyclicBarrier:【可重复使用的栅栏】,让所有线程都等待完成后才会继续下一步行动。示例代码如下所示:...

网友评论

      本文标题:JUC原理之CyclicBarrier

      本文链接:https://www.haomeiwen.com/subject/iovdyhtx.html