美文网首页JAVA之多线程系列
CyclicBarrier源码分析

CyclicBarrier源码分析

作者: VayneP | 来源:发表于2020-02-24 23:52 被阅读0次

    1. 简介

    CyclicBarrier字面意思是可循环使用(cyclic)的屏障(barrier),它要做的事情是,让一组线程到达一个屏障时阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续运行

    2. 源码分析

    2.1 类的属性

    public class CyclicBarrier {
        
        /** The lock for guarding barrier entry */
        // 可重入锁
        private final ReentrantLock lock = new ReentrantLock();
        /** Condition to wait on until tripped */
        // 条件队列
        private final Condition trip = lock.newCondition();
        /** The number of parties */
        // 参与的线程数量
        private final int parties;
        /* The command to run when tripped */
        // 由最后一个进入 barrier 的线程执行的操作
        private final Runnable barrierCommand;
        /** The current generation */
        // 当前代
        private Generation generation = new Generation();
        // 正在等待进入屏障的线程数量
        private int count;
    }
    

    说明:该属性有一个为ReentrantLock对象,有一个为Condition对象,而Condition对象又是基于AQS的,所以,归根到底,底层还是由AQS提供支持。

    2.2 内部类

    private static class Generation {
        boolean broken = false;
    }
    

    Generation表示代,例如第1代同步器,在reset() 或nextGeneration() 会更新成员变量 generation

    2.3 构造函数

    1. CyclicBarrier(int, Runnable)型构造函数
    public CyclicBarrier(int parties, Runnable barrierAction) {
            // 参与的线程数量小于等于0,抛出异常
            if (parties <= 0) throw new IllegalArgumentException();
            // 设置parties
            this.parties = parties;
            // 设置count
            this.count = parties;
            // 设置barrierCommand
            this.barrierCommand = barrierAction;
        }
    

    说明:该构造函数可以指定关联该CyclicBarrier的线程数量,并且可以指定在所有线程都进入屏障后的执行动作,该执行动作由最后一个进行屏障的线程执行。

    1. CyclicBarrier(int)型构造函数

      public CyclicBarrier(int parties) {
              // 调用含有两个参数的构造函数
              this(parties, null);
          }
      

    2.4 核心函数分析

    1. dowait()---CyclicBarrier类的核心函数,CyclicBarrier类对外提供的await函数在底层都是调用该了doawait函数

      private int dowait(boolean timed, long nanos)
              throws InterruptedException, BrokenBarrierException,
                     TimeoutException {
              // 保存当前锁
              final ReentrantLock lock = this.lock;
              // 锁定
              lock.lock();
              try {
                  // 保存当前代
                  final Generation g = generation;
                  
                  if (g.broken) // 屏障被破坏,抛出异常
                      throw new BrokenBarrierException();
      
                  if (Thread.interrupted()) { // 线程被中断
                      // 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
                      breakBarrier();
                      // 抛出异常
                      throw new InterruptedException();
                  }
                  
                  // 减少正在等待进入屏障的线程数量
                  int index = --count;
                  if (index == 0) {  // 正在等待进入屏障的线程数量为0,所有线程都已经进入
                      // 运行的动作标识
                      boolean ranAction = false;
                      try {
                          // 保存运行动作
                          final Runnable command = barrierCommand;
                          if (command != null) // 动作不为空
                              // 运行
                              command.run();
                          // 设置ranAction状态
                          ranAction = true;
                          // 进入下一代
                          nextGeneration();
                          return 0;
                      } finally {
                          if (!ranAction) // 没有运行的动作
                              // 损坏当前屏障
                              breakBarrier();
                      }
                  }
      
                  // loop until tripped, broken, interrupted, or timed out
                  // 无限循环
                  for (;;) {
                      try {
                          if (!timed) // 没有设置等待时间
                              // 等待
                              trip.await(); 
                          else if (nanos > 0L) // 设置了等待时间,并且等待时间大于0
                              // 等待指定时长
                              nanos = trip.awaitNanos(nanos);
                      } catch (InterruptedException ie) { 
                          if (g == generation && ! g.broken) { // 等于当前代并且屏障没有被损坏
                              // 损坏当前屏障
                              breakBarrier();
                              // 抛出异常
                              throw ie;
                          } else { // 不等于当前带后者是屏障被损坏
                              // We're about to finish waiting even if we had not
                              // been interrupted, so this interrupt is deemed to
                              // "belong" to subsequent execution.
                              // 中断当前线程
                              Thread.currentThread().interrupt();
                          }
                      }
      
                      if (g.broken) // 屏障被损坏,抛出异常
                          throw new BrokenBarrierException();
      
                      if (g != generation) // 不等于当前代
                          // 返回索引
                          return index;
      
                      if (timed && nanos <= 0L) { // 设置了等待时间,并且等待时间小于0
                          // 损坏屏障
                          breakBarrier();
                          // 抛出异常
                          throw new TimeoutException();
                      }
                  }
              } finally {
                  // 释放锁
                  lock.unlock();
              }
          }
      
    2. nextGeneration()

      此函数在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中

      private void nextGeneration() {
              // signal completion of last generation
              // 唤醒所有线程
              trip.signalAll();
              // set up next generation
              // 恢复正在等待进入屏障的线程数量
              count = parties;
              // 新生一代
              generation = new Generation();
          }
      
    3. breakBarrier()

      private void breakBarrier() {
              // 设置状态
              generation.broken = true;
              // 恢复正在等待进入屏障的线程数量
              count = parties;
              // 唤醒所有线程
              trip.signalAll();
          }
      

    3. CountDownLatch与CyclicBarrier的比较

    1. CountDownLatch() 的计数器只能用一次,而CyclicBarrier可以使用多次使用,可以使用reset() 方法重置
    2. CyclicBarrier支持在线程全部到达后执行一次任务,CountDownLatch只有计数器的效果

    相关文章

      网友评论

        本文标题:CyclicBarrier源码分析

        本文链接:https://www.haomeiwen.com/subject/jfcnqhtx.html