CountDownLatch

作者: RojerAlone | 来源:发表于2017-08-23 16:21 被阅读78次

    CountDownLatch

    CountDownLatch 直译过来为“倒数阀门”,顾名思义,它是起到一个阀门的作用,实际上是用来控制线程的执行过程。来看一下这个类提供了哪些方法:

    CountDownLatch-Outline

    从图中可以看出,CountDownLatch 的构造方法是传入一个整数,然后提供了 await()countDown()getCount() 等方法,还有一个内部类 Sync,这个内部类继承了 AbstractQueuedSynchronizer
    整个 CountDownLatch 的核心是 Sync 类,先来看这个类。

    Sync

    private static final class Sync extends AbstractQueuedSynchronizer { // 继承了 AQS 类
        private static final long serialVersionUID = 4982264981922014374L;
    
        Sync(int count) { // 构造函数,将 AQS 中的 state 设置为 count 的值
            setState(count);
        }
    
        int getCount() { // 获取 state 的值
            return getState();
        }
        /**
         * 重写 AQS 中的 tryAcquireShared
         * 尝试获取共享锁,如果当前 state 等于 0 返回 1 表示获取成功,否则返回 -1 表示失败
         */
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        /**
         * 重写 AQS 中的 tryReleaseShared
         */
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) { // 死循环,保证 cas 更改成功
                int c = getState();
                if (c == 0) // 如果当前 state 为 0,释放失败
                    return false;
                int nextc = c-1; // state - 1
                if (compareAndSetState(c, nextc)) // 更改最新的 state 值
                    return nextc == 0; // 如果 state - 1 后为 0 返回 true,否则返回 false
            }
        }
    }
    

    Sync 类重写了 AQS 中的 tryAcquireSharedtryReleaseShared 方法,tryAcquireShared 判断当前的 state 是否为 0,tryReleaseShared 中对 state 进行减一操作,如果减过之后 state 为 0 则返回 true。
    CountDownLatch 中的方法都是调用了 Sync 中的方法,接下来看一下 CountDownLatch 的实现。

    CountDownLatch(int)

    // 构造方法,初始化 sync
    public CountDownLatch(int count) {
        // 如果 count 小于 0 就抛出非法参数异常
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    

    await

    await 有两种,一种是没有时间限制的,一种是有时间限制的。

    /**
     * 调用 AQS 的 acquireSharedInterruptibly
     * 内部还是调用了 tryAcquireShared 方法,也就是判断当前 state 是否为 0
     * 如果不为 0 那么当前线程加入等待队列,被阻塞
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); 
    }
    /**
     * 调用 AQS 的 tryAcquireSharedNanos
     * 在指定时间内没有获取锁,返回 false
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    

    countDown

    /**
     * 调用 AQS 的 releaseShared
     * releaseShared 内部调用了 Sync 中重写的 tryReleaseShared 方法,将 state - 1
     * 如果 state - 1 后为 0 ,返回 true,那么就会唤醒等待的线程
     */
    public void countDown() {
        sync.releaseShared(1);
    }
    

    总结

    根据对 AbstractQueuedSynchronizer 的了解,加上看完上面的源码,我们已经清楚地了解了 CountDownLatch 的作用以及原理:

    • CountDownLatch 中的内部类 Sync 继承了 AQS,并重写了 tryAcquireSharedtryReleaseShared 方法
    • CountDownLatch 实现了对 AQS 共享锁相关方法的封装,相当于换了个直观的名字
    • 执行 await 方法判断 state,相当于计数器,是否为 0,如果不为 0,当前线程执行到这里以后阻塞,直到 state 为 0
    • 执行 countDown 方法对 state 进行减一操作,如果减一后为 0 ,那么唤醒等待的线程

    简单归纳一下就是,CountDownLatch 是阀门,初始化时设置的 count 表示容量,线程执行 await 方法等待阀门内没有东西(state == 0),如果阀门内有东西(state > 0),那么执行 await 的方法被阻塞,直到阀门内没有东西 (state == 0)才被唤醒。每当一个线程执行一次 countDown 方法,阀门内的东西就减少一个 (state = state - 1),如果减少一个以后阀门内没有东西了,就唤醒等待阀门为空的线程。
    更生动一点来说,打个比方,有一个公共澡堂,澡堂内有一定数量的淋浴头(count),这些淋浴头都已经有人在使用了,每当有人想要洗澡的时候,这些人都要等待所有的洗澡的人都已经洗完了(state == 0),因此他们要在外面等待(await),每当有一个人洗完了(countDown),都会看一下是不是所有人都洗完了,如果所有人都洗完了,那么给外面等待的人说,你们可以进来了,这时候等待的人才能进来(也就是线程被唤醒,继续执行)。

    应用 Demo

    import java.util.concurrent.CountDownLatch;
    
    public class CountDownLatchTest {
    
        private static final int numOfThreads = 10; // 线程数
    
        private static final int sleepTime = 3000; // 睡眠时间
    
        public static void main(String[] args) {
            CountDownLatch startLatch = new CountDownLatch(1); // 只有执行 start.countDown() 之后线程才开始执行
            CountDownLatch threadLatch = new CountDownLatch(numOfThreads);
    
            for (int i = 0; i < numOfThreads; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            startLatch.await(); // 等待 startLatch.countDown 才开始执行
                            Thread.sleep(sleepTime);
                            System.out.println(Thread.currentThread().getName());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        threadLatch.countDown(); // 完成的线程数 -1
                    }
                }).start();
            }
    
            startLatch.countDown(); // 所有线程开始执行
            long start = System.currentTimeMillis();
            try {
                threadLatch.await(); // 等待所有线程执行完毕
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(numOfThreads + " 个线程执行完花费时间为 : " + (System.currentTimeMillis() - start) + " ms");
        }
    
    }
    

    执行结果为:

    Thread-8
    Thread-9
    Thread-5
    Thread-3
    Thread-1
    Thread-0
    Thread-2
    Thread-4
    Thread-7
    Thread-6
    10 个线程执行完花费时间为 : 3003 ms
    

    相关文章

      网友评论

        本文标题:CountDownLatch

        本文链接:https://www.haomeiwen.com/subject/cvtjdxtx.html