美文网首页Java
JAVA concurrency -- CyclicBarrie

JAVA concurrency -- CyclicBarrie

作者: 骑牛上青山 | 来源:发表于2020-04-23 20:12 被阅读0次

    概述

    CountDownLatchCyclicBarrier有着相似之处,并且也常常有人将他们拿出来进行比较,这次,笔者试着从源码的角度分别解析这两个类,并且从源码的角度出发,看看两个类的不同之处。

    CountDownLatch

    CountDownLatch从字面上来看是一个计数工具类,实际上这个类是用来进行多线程计数的JAVA方法。

    CountDownLatch内部的实现主要是依靠AQS的共享模式。当一个线程把CountDownLatch初始化了一个count之后,其他的线程调用await就会阻塞住,直到其他的线程一个一个调用countDown方法进行release操作,把count的值减到0,即把同步锁释放掉,await才会进行下去。

    Sync

    内部主要还是实现了一个继承自AQS的同步器SyncSync源码如下:

        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            // 构造方法,参数是count的数值
            Sync(int count) {
                // 内部使用state来存储count
                setState(count);
            }
    
            // 获取count的值
            int getCount() {
                return getState();
            }
    
            // 尝试获取分享模式同步器
            protected int tryAcquireShared(int acquires) {
                // 判断state的值,如果为0则获取成功,否则获取失败
                // 继承自AQS,根据AQS中的注释我们可以知道如果返回结果
                // 大于0则说明获取成功,如果小于0则说明获取失败
                // 此处不会返回0,因为没有意义
                return (getState() == 0) ? 1 : -1;
            }
    
            // 释放同步器
            protected boolean tryReleaseShared(int releases) {
                // 自选操作
                for (;;) {
                    // 获取state
                    int c = getState();
                    // 如果state为0,直接返回false
                    if (c == 0)
                        return false;
                    // 计算state-1的结果
                    int nextc = c-1;
                    // CAS操作将这个值同步到state上
                    if (compareAndSetState(c, nextc))
                        // 如果同步成功,则判断是否此时state为0
                        return nextc == 0;
                }
            }
        }
    

    Sync是继承自AQS的同步器,这段代码中值得拿出来讨论的有以下几点:

    1. 为什么用state来存储count的数值?

      因为state和count其实上是一个概念,当state为0的时候说明资源是空闲的,当count为0时,说明所有的CountDownLatch线程都已经完成,所以两者虽然说不是同样的意义,但是在代码实现层面的表现是完全一致的,因此可以将count记录在state中。

    2. 为什么tryAcquireShared不会返回0?

      首先需要解释下tryAcquireShared在AQS中可能的返回值:负数说明是不可以获取共享锁,0说明是可以获取共享锁,但是当前线程获取后已经把所有的共享锁资源占完了,接下来的线程将不会再有多余资源可以获取了,正数则说明了你可以获取共享锁,并且之后还有余量可以给其他线程提供共享锁。然后我们回过来看CountDownLatch内部的tryAcquireShared,我们在实现上完全不关注后续线程,后续的资源占用状况,我只要当前状态,那么这个0的返回值实际上是没有必要的。

    3. 为什么tryReleaseShared中的参数不被使用到?

      根据这个类的实现方式,我们可以知道tryReleaseShared的参数一定是1,因为线程的完成一定是一个一个倒数完成的。实际上我们去看countDown方法内部调用到了sync.releaseShared方法的时候可以发现他写死了参数为1,所以实际上tryReleaseShared中的参数不被使用到的原因是因为参数值固定为1.

    构造函数和方法

        // 构造方法
        public CountDownLatch(int count) {
            // count必须大于0
            if (count < 0) throw new IllegalArgumentException("count < 0");
            // 初始化Sync
            this.sync = new Sync(count);
        }
    
    
        // 等待获取锁(可被打断)
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        // 等待获取锁(延迟)
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        // 计数器降低(释放同步器)
        // 每次调用减少1
        public void countDown() {
            sync.releaseShared(1);
        }
    
        // 获取count
        public long getCount() {
            return sync.getCount();
        }
    
        // toString
        public String toString() {
            return super.toString() + "[Count = " + sync.getCount() + "]";
        }
    

    CyclicBarrier

    CyclicBarrier从字面上看是循环栅栏,在JAVA中的作用是让所有的线程完成后进行等待,直到所有的线程全部完成,再进行接下来的操作。

    CyclicBarrier并没有直接继承AQS实现同步,而是借助了可重入锁ReentrantLock以及Condition来完成自己的内部逻辑。

    成员变量

        // 锁
        private final ReentrantLock lock = new ReentrantLock();
    
        // 条件
        private final Condition trip = lock.newCondition();
    
        // 线程数
        private final int parties;
    
        // 执行完所有线程后执行的Runnable方法,可以为空
        private final Runnable barrierCommand;
    
        // 分组
        private Generation generation = new Generation();
    
        // 未完成的线程数
        private int count;
    
        private static class Generation {
            boolean broken = false;
        }
    

    我们可以看到成员变量中有一个很陌生的类Generation,这个是CyclicBarrier内部声明的一个static类,作用是帮助区分线程的分组分代,使得CyclicBarrier可以被复用,如果这个简单的解释不能够让你很好地理解的话可以看接下来的源码解析,通过实现来理解它的用途。

    构造函数

        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    

    很常规的构造函数,只是简单的初始化成员变量,没有特别的地方。

    核心方法

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe);
            }
        }
    
        public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    

    awaitCyclicBarrier的核心方法,就是靠着这个方法来实现线程的统一规划的,其中调用的是内部实现的doWait,我们来看下代码:

        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            // 常规的加锁操作,至于为什么要用本地变量操作,
            // 可以去看下我写的另一篇ArrayBlockingQueue的相关文章
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 获取Generation类
                final Generation g = generation;
    
                // 查看generation是否是broken,如果是broken的,
                // 那说明之前可能因为某些线程中断或者是一些意外状态导致没有办法
                // 完成所有线程到达终点(tripped)的目标而只能报错
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // 如果线程被外部中断需要报错,并且在内部需要将
                // generation的broken置为true来让其他线程能够感知到中断
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                // 将线程未完成数减1
                int index = --count;
                // 如果此时剩余线程数为0,则说明所有的线程均已完成,即到达tripped状态
                if (index == 0) {
                    boolean ranAction = false;
                    try {
                        // 如果有预设完成后执行的方法,则执行
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        // 此时由于这一个轮回的线程已经全部完成,
                        // 所以调用nextGeneration方法开启一个新的轮回
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // 如果此时还有其他的线程未完成,则当前线程开启自旋模式
                for (;;) {
                    try {
                        if (!timed)
                            // 如果timed为false,trip则阻塞住直到被唤醒
                            trip.await();
                        else if (nanos > 0L)
                            // 如果timed为true,则调用awaitNanos设定时间
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    // 查看generation是否是broken,如果是broken的抛出异常
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    // 如果g != generation意味着generation
                    // 已经被赋予了一个新的对象,这说明要么是所有线程已经完成任务开启下一个轮回,
                    // 要么是已经失败了,然后开启的下一个轮回,无论是哪一种情况,都return
                    if (g != generation)
                        return index;
    
                    // 如果已经超时,则强制打断
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    

    看完这段核心代码之后我们回头再来反思Generation的意义,我们已经可以大致的给出使用Generation的理由了:

    不同于CountDownLatch的实现,CyclicBarrier采取了更加复杂的方式,原因便是因为内部涉及到了多线程之间的干预与通信,CountDownLatch不关心线程的实现与进程,他只是一个计数器,而CyclicBarrier则需要知道线程是否正常的完结,是否被中断,如果用其他的方式代价会比较高,因此,CyclicBarrier的作者通过静态内部类的方式将整个分代的状态共享于多个线程之间,保证每个线程能够获取到栅栏的状态以及能够将自身的状态更好的反馈回去。同时,这种方式便于重置,也使得CyclicBarrier可以高效的重用。至于为什么broken没有用volatile修饰,因为类的方法内部全部都上了锁,所以不会出现数据不同步的问题。

    总结

    CountDownLatchCyclicBarrier从使用上来说可能会有一些相似之处,但是在我们看完源码之后我们会发现两者可以说是天差地别,实现原理,实现方式,应用场景均不相同,总结下来有以下几点:

    1. CountDownLatch实现直接依托于AQSCyclicBarrier则是借助了ReentrantLock以及Condition
    2. CountDownLatch是作为计数器存在的,因此采取了讨巧的设计,源码结构清晰并且简单,同样功能也较为简单;CyclicBarrier则为了实现多线程的掌控,采用了比较复杂的设计,在代码实现上也显得比较弯弯绕绕。
    3. 由于CyclicBarrier采用的实现方式,相比一次性的CountDownLatchCyclicBarrier可以多次重复使用
    4. 计数方式的不同:CountDownLatch采用累加计数, CyclicBarrier则使用倒数计数

    相关文章

      网友评论

        本文标题:JAVA concurrency -- CyclicBarrie

        本文链接:https://www.haomeiwen.com/subject/zkdzihtx.html