美文网首页程序员小天地
Java中的CountDownLatch

Java中的CountDownLatch

作者: 小草莓子桑 | 来源:发表于2019-11-17 23:45 被阅读0次

    今天给大家介绍一个并发包中的线程工具CountDownLatch,有的人把它叫做闭锁,有的人把它叫做计数锁,我们先从他使用场景来说吧。

    怎么让三个线程按顺序执行?

    今天看到这么一篇文章,就从这个标题,引出主题吧。

    第一时间想到了使用join
    join方法

    join是线程Thread类的方法,如果在T1线程中,使用T2线程调用了join方法,会阻塞T1线程执行,等T2线程执行完毕后,T1线程在执行。那么讲道理说,定义三个线程对象T1、T2、T3,在三个线程里面,分别调用别的线程对象的join方法,那么就可以实现三个线程按顺序执行。

    举个join栗子

    线上代码

    public class Demo1 {
    
        public static void main(String[] args) {
            //定义T1线程
            Thread t1 = new Thread(new JoinRunner(null),"T1");
            //定义T2线程,T1线程执行完了,在执行线程T2
            Thread t2 = new Thread(new JoinRunner(t1),"T2");
            //定义T3线程,T2线程执行完了。在执行线程T3
            Thread t3 = new Thread(new JoinRunner(t2),"T3");
            t1.start();
            t2.start();
            t3.start();
        }
    
        static class JoinRunner implements Runnable {
    
            /**
             * 先要执行的线程
             */
            private Thread before;
    
            public JoinRunner(Thread before) {
                this.before = before;
            }
    
            @Override
            public void run() {
                if (before != null) {
                    try {
                        //使用join方法,先排在前面的线程先执行完了,
                        // 在执行该线程
                        before.join();
                        System.out.println("目前执行的线程是:"
                                + Thread.currentThread().getId() + "====="
                                + Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("目前执行的线程是:"
                            + Thread.currentThread().getId() + "====="
                            + Thread.currentThread().getName());            }
            }
        }
    
    }
    

    看看执行结果,是不是按照T1、T2、T3顺序执行的。


    代码执行结果

    刚讲了join实现线程顺序执行的功能,其实用CountDownLatch也可以实现

    CountDownLatch简介

    CountDownLatch有的人把它叫做闭锁,有的人把它叫做计数锁,它能够使一个线程等待其他线程执行完毕以后再执行。CountDownLatch是基于AbstractQueuedSynchronizer实现的,大致原理是,定义一个计数器state。在调用构造方法创建CountDownLatch对象时,通过入参初始化state,然后调用countDown方法,state会减1,当调用次数让state变为0,在唤醒被挂起在await上的线程。我是喜欢叫做计数锁,当state大于0时,说明没有获取到锁,当state变成0时,获取到锁,我是这么理解的。

    举个CountDownLatch栗子
    public class Demo2 {
    
        public static void main(String[] args) {
    
            //计数锁C1 state 为1
            CountDownLatch c1 = new CountDownLatch(1);
            //计数锁C2 state 为1
            CountDownLatch c2 = new CountDownLatch(1);
            //计数锁C3 state 为1
            CountDownLatch c3 = new CountDownLatch(1);
    
            //T1线程中 C2 调用await挂起
            //然后让C1 调用countDown减一
            Thread t1 = new Thread(new CountDownLatchRunner(c2, c1),"T1");
    
            //T2线程中 C3 调用await挂起
            //然后让C2 调用countDown减一
            Thread t2 = new Thread(new CountDownLatchRunner(c3, c2),"T2");
    
            //T3线程中 C3 调用await挂起
            //然后让C3 调用countDown减一
            Thread t3 = new Thread(new CountDownLatchRunner(c3, c3),"T3");
    
            //目前这三个线程中的定义中
            // T1线程的计数锁C1的state变成0,
            // 才能执行 被 C2挂起的线程T2
            // T2线程的计数锁 C2的state变成0,
            // 才能执行线程T3
            t1.start();
            t2.start();
            t3.start();
            
            //先线程T1的计数锁state变成0,开始顺序执行
            c1.countDown();
        }
    
        static class CountDownLatchRunner implements Runnable {
    
            /**
             * 定义计数器
             */
            CountDownLatch countDownLatch1;
            CountDownLatch countDownLatch2;
    
            public CountDownLatchRunner(CountDownLatch c1,CountDownLatch c2) {
                countDownLatch1 = c1;
                countDownLatch2 = c2;
            }
    
            @Override
            public void run() {
                try {
                    //先挂起排在后面的线程
                    countDownLatch2.await();
                    System.out.println("目前执行的线程是:"
                            + Thread.currentThread().getId() + "====="
                            + Thread.currentThread().getName());
                    //让本线程计数锁 state减1
                    countDownLatch1.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        }
    }
    

    看看执行结果,是不是按照T1、T2、T3顺序执行的。


    执行结果
    CountDownLatch关键方法

    看到了CountDownLatch使用方法,我们来看看使用中的关键方法

    构造方法
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    

    从构造方法上,创建了一个Sync对象,来看看Sync对象。

    Sync类
    /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    

    Sync是继承了AbstractQueuedSynchronizer的,有时间在来讲AbstractQueuedSynchronizer。

    await方式
    public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    

    await方法的作用是,挂起线程,看到他其实是调用了Sync的acquireSharedInterruptibly方法,简单的说,该方法的主要作用是让获取不到锁的线程挂起。详细的分析,我们放到AbstractQueuedSynchronizer的文章中专门说。

    countDown方式
    public void countDown() {
            sync.releaseShared(1);
        }
    

    countDown方法的作用是state减一,也是调用了Sync的releaseShared方法实现。

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    看代码大致可以看出来,tryReleaseShared是进行state减一,如果减到0了,就返回true,然后,在调用doReleaseShared方法来唤醒挂起在await的线程
    tryReleaseShared还是个抽象方法

    tryReleaseShared方法
    来看一眼CountDownLatch中的实现
    protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    

    看来和字面意思差不多,getState()是返回成员变量state的值,看下state

    /**
         * The synchronization state.
         */
        private volatile int state;
    

    是一个用volatile修饰的int字段。
    再来看doReleaseShared方法

    private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    稍有点复杂,doReleaseShared是在AbstractQueuedSynchronizer中实现的,我们放到AbstractQueuedSynchronizer的文章中专门说,大家大致明白他的作用,就是唤醒挂起的线程。
    关键方法就为大家分析到这里。

    CountDownLatch的使用场景

    CountDownLatch是一种同步结构,和我们之前讲过的ReentrantLock很像,他们都是被用来解决线程之间的调度、交互问题的,列一些自己用过的场景和别人分享的场景给大家吧。

    • 微服务架构中,一个接口大致是这样的,请求了一个实体E的列表数据,然后E的字段A、字段B要调用不同的服务中获取,可以使用CountDownLatch来通过两个线程去获取字段A、字段B,最后组装返回。
    • 使用CountDownLatch模拟高并发场景。
    • 使用CountDownLatch进行多线程运算组装。
    • 配置加载,一些服务中,我们经常要在初始化加载一些配置,只有这些配置加载完了,才能启动对外提供服务,经常发生一些配置没有加载完毕,请求就已经打进来了,可以使用CountDownLatch来进行控制。
    • 缓存预热,和上面配置一样的原理,可能需要实现加载很多缓存,才能对外提供服务。

    CountDownLatch就为大家讲到这,源码涉及到AbstractQueuedSynchronizer的,我们留到以后,专门写一个AbstractQueuedSynchronizer的文章,来统一说吧。欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!

    相关文章

      网友评论

        本文标题:Java中的CountDownLatch

        本文链接:https://www.haomeiwen.com/subject/nkqsxqtx.html