美文网首页
多线程基础(二十):CyclicBarrier源码分析

多线程基础(二十):CyclicBarrier源码分析

作者: 冬天里的懒喵 | 来源:发表于2020-11-16 15:06 被阅读0次

    CyclicBarrier是java并发包中的常用工具之一,通常被用来与CountDownlatch对比。CyclicBarrier能够实现CountDownLatch的效果,此外还能重复使用,而CountDownLatch则只能做一次计数。但是对于实现的源码而言,CyclicBarrier对于CountDowLatch有着本质的不同。CountDownLatch基于AQS的等待队列实现。而CyclicBarrier则依赖于ReentrantLock的Condition。我们前面在介绍AQS的时候知道,Condition与AQS本身的等待队列是采用的不同的队列。

    1.类注释

    CyclicBarrier是一种同步工具,它允许一组线程全部互相等待以到达一个公共的障碍点。CyclicBarrier在固定线程数量的程序中很有用。该线程有时会必须互相等待,该屏障称为cyclic,因为其可以在释放等待线程之后重新使用。
    CyclicBarrier支持可选的Runnable命令,该命令在屏障的最后一个线程到达之后,在释放任何线程之前,每个屏障点操作一次,屏障操作对于在任何一方继续之前更新共享状态很有用。
    示例如下:

    class Solver {
        final int N;
        final float[][] data;
        final CyclicBarrier barrier;
     
        class Worker implements Runnable {
          int myRow;
          Worker(int row) { myRow = row; }
          public void run() {
            while (!done()) {
              processRow(myRow);
     
              try {
                barrier.await();
              } catch (InterruptedException ex) {
                return;
              } catch (BrokenBarrierException ex) {
                return;
              }
            }
          }
        }
     
        public Solver(float[][] matrix) {
          data = matrix;
          N = matrix.length;
          Runnable barrierAction =
            new Runnable() { public void run() { mergeRows(...); }};
          barrier = new CyclicBarrier(N, barrierAction);
     
          List<Thread> threads = new ArrayList<Thread>(N);
          for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
          }
     
          // wait until done
          for (Thread thread : threads)
            thread.join();
        }
      }}
    

    在这个代码中,每个工作线程处理将会处理矩阵的一行,然后在内存的屏障处等待,直到所有的行都处理完毕,处理所有的行之后,将执行Runnable屏障操作并将其合并,如果合并确定已找到解决方案,则执行done(),并将返回true。并且每个worker将终止。
    如果屏障操作不依赖于执行时暂停的各方,则该方中的任何线程都可以在释放该操作时执行该操作。为方便起见,每次调用await都会返回该线程在屏障处的到达索引。然后,您可以选择哪个线程应执行屏障操作,例如:

      if (barrier.await() == 0) {
        // log the completion of this iteration
      }}
    

    CyclicBarrier对于失败的同步尝试使用全无损模型,如果线程由于中断,失败,或者超时而过早离开屏障点,则在该屏障点等待的其他所有线程也将通过异常离开,BrokenBarrierException或者InterruptedException。(如果它们几乎同时被中断)

    2.类结构及构造函数

    CyclicBarrier的类结构如下:


    image.png

    在CyclicBarrier内部,存在一个Generation内部类。

     private static class Generation {
            boolean broken = false;
        }
    

    这个类的结构非常简单。但是在CyclicBarrier中作用非常关键,在Barrier的每次使用的时候,都会生成一个实例,每单CyclicBarrier被触发或者重置的时候,生成都会更改,这使得Barrier可以与线程关联产生很多Generation。由于这种不确定性,可以将锁分配给等待的线程,但是一次只能激活其中之一,其余的需要全部等待。如果有中断但是没有后续的重置,则不需要可用的Generation。
    提供的构造函数如下:

    2.1 CyclicBarrier(int parties, Runnable barrierAction)

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    

    2.2 CyclicBarrier(int parties)

    此方法实际上是调用的前面的方法。无command处理。

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    

    3.成员变量及常量

    变量及常量 类型 说明
    lock private final ReentrantLock 用于保护屏障的锁
    trip private final Condition 等待条件,直到条件满足。
    parties private final int 参与方数量。
    barrierCommand private final Runnable 条件达到之后需要允许的线程。
    generation private Generation generation 当前的Generation
    count private int 仍在等待的参与方数量。按照参与方数量不断倒数,直到为0,则产生一个新的Generation。

    实际上通过变量表可以看出,Generation是一代的意思,那么也就是说,每次产生一个Generation,当这个Generation对应的count为0之后,再产生一个新的Genrtation。

    4.重要方法

    4.1 nextGeneration

    此方法将更新barrier的状态,并将等待的线程都唤醒。但是调用这个方法需要持有锁。

    private void nextGeneration() {
        // signal completion of last generation
        //调用trip的signalAll方法
        trip.signalAll();
        // set up next generation
        //重置count,count随着到达的parties而减少
        count = parties;
        //产生新的gereration
        generation = new Generation();
    }
    

    4.2 breakBarrier

    设置当前的barrier的broken状态为true,并唤醒所有人。仅在锁定时调用。

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    

    4.3 dowait

    主要的屏障代码,涵盖了各种策略。

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        //持有的锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //g为当前的generation
            final Generation g = generation;
            //如果g的briken为true,则屏障失效,并抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
            //如果线程被重点,则break屏障,且抛出异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //index为count减去1,index从0开始
            int index = --count;
            //如果index为0 说明已经截止,需要重新产生
            if (index == 0) {  // tripped
                //如果ranAction为false
                boolean ranAction = false;
                try {
                    //定义command 
                    final Runnable command = barrierCommand;
                    //如果command不为空,则执行run
                    if (command != null)
                        command.run();
                    //之后将runActrion修改为true
                    ranAction = true;
                    //产生下一个Generation
                    nextGeneration();
                    //返回0
                    return 0;
                } finally {
                    //如果ranAtion不为true,则强行执行berak
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
            //循环,直到条件被触发
            //死循环
            for (;;) {
                try {
                    //如果传入没有等待时间,则await无限期等待
                    if (!timed)
                        trip.await();
                    //反之根据传入的nanos判断
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                //异常处理
                } catch (InterruptedException ie) {
                    //如果g还是原有的generation,且没有被broken,则抛出异常
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    //反之
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        //执行中断 Thread.currentThread().interrupt();
                    }
                }
                //中断抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                 //g没有更新 则返回index
                if (g != generation)
                    return index;
                 //如果timed且nanos小于0 
                if (timed && nanos <= 0L) {
                    //将barrier berak
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //解锁
            lock.unlock();
        }
    }
    
    

    4.4 await

    调用dowait方法,不带时间,无限期等待。

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    

    4.5 await(long timeout, TimeUnit unit)

    携带等待时间,调用dowait方法。

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    

    4.6 isBroken

    判断当前barrier是否被broken

    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //返回状态
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
    

    4.7 reset

    重置。

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        //加锁,break 当前barrier
        try {
            breakBarrier();   // break the current generation
            //产生下一个Generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
    

    4.8 getNumberWaiting

    public int getNumberWaiting() {
        //加锁 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //用parties减去count得到当前的waiting
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
    

    5.总结

    本文对CyclicBarrier的源码进行了分析,可以看到,CyclicBarrier是在AQS之上,基于ReentrantLock和Condition的综合应用。其核心是,每次计数,都产生一个Generate。之后根据参与者个数,计算阻塞的数量,当阻塞的线程达到参与者数量之后,就唤醒全部等待线程,然后产生一个新的Generate。而Generate内部只有一个boolean的变量,可以通过修改这个boolean的状态将Generate break。

    相关文章

      网友评论

          本文标题:多线程基础(二十):CyclicBarrier源码分析

          本文链接:https://www.haomeiwen.com/subject/odfzbktx.html