CountDownLatch

作者: 码农历险记 | 来源:发表于2017-07-29 23:24 被阅读3964次

    CountDownLatch介绍

    CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

    CountDownLatch原理

    CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

    CountDownLatch原理示意图

    CountDownLatch的伪代码

    Main thread start
    Create CountDownLatch for N threads
    Create and start N threads
    Main thead wait on latch
    N threads completes there tasks are returns
    Main thread resume execution

    CountDownLatch.java中定义的构造函数

    //用等待的线程数量来进行初始化
    public void CountDownLatch(int count){...}
    

    计数器count是闭锁需要等待的线程数量,只能被设置一次,且CountDownLatch没有提供任何机制去重新设置计数器count。

    与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

    其他N个线程必须引用CountDownLatch闭锁对象,因为它们需要通知CountDownLatch对象,它们各自完成了任务;这种通知机制是通过CountDownLatch.countDown()方法来完成的;每调用一次,count的值就减1,因此当N个线程都调用这个方法,count的值就等于0,然后主线程就可以通过await()方法,恢复执行自己的任务。

    在实时系统中的使用场景

    1. 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数器为1的CountDownLatch,并让其他所有线程都在这个锁上等待,只需要调用一次countDown()方法就可以让其他所有等待的线程同时恢复执行。
    2. 开始执行前等待N个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统都已经启动和运行了。
    3. 死锁检测:一个非常方便的使用场景是你用N个线程去访问共享资源,在每个测试阶段线程数量不同,并尝试产生死锁。

    CountDownLatch使用例子

    模拟一个应用程序启动类,开始就启动N个线程,去检查N个外部服务是否正常并通知闭锁;启动类一直在闭锁上等待,一旦验证和检查了所有外部服务,就恢复启动类执行。

    BaseHealthChecker.java :这个类是实现了Runnable接口,负责所有特定的外部服务健康检查的基类。

    import java.util.concurrent.CountDownLatch;
    
    public abstract class BaseHealthChecker implements Runnable {
        
        private CountDownLatch _latch;
        private String _serviceName;
        private boolean _serviceUp;
        
        public BaseHealthChecker(String serviceName, CountDownLatch latch)
        {
            super();
            this._latch = latch;
            this._serviceName = serviceName;
            this._serviceUp = false;
        }
    
        @Override
        public void run() {
            try {
                verifyService();
                _serviceUp = true;
            } catch (Throwable t) {
                t.printStackTrace(System.err);
                _serviceUp = false;
            } finally {
                if(_latch != null) {
                    _latch.countDown();
                }
            }
        }
    
        public String getServiceName() {
            return _serviceName;
        }
    
        public boolean isServiceUp() {
            return _serviceUp;
        }
        
        public abstract void verifyService();
    }
    

    NetworkHealthChecker.java,DatabaseHealthChecker.java和CacheHealthChecker.java都继承自BaseHealthChecker,引用CountDownLatch实例,除了服务名和休眠时间不同外,都实现各自的verifyService方法。

    NetworkHealthChecker.java类

    import java.util.concurrent.CountDownLatch;
    
    public class NetworkHealthChecker extends BaseHealthChecker
    {
        public NetworkHealthChecker (CountDownLatch latch)
        {
            super("Network Service", latch);
        }
        
        @Override
        public void verifyService() 
        {
            System.out.println("Checking " + this.getServiceName());
            try 
            {
                Thread.sleep(7000);
            } 
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println(this.getServiceName() + " is UP");
        }
    }
    

    DatabaseHealthChecker.java类

    import java.util.concurrent.CountDownLatch;
    
    public class DatabaseHealthChecker extends BaseHealthChecker
    {
        public DatabaseHealthChecker (CountDownLatch latch)
        {
            super("Database Service", latch);
        }
        
        @Override
        public void verifyService() 
        {
            System.out.println("Checking " + this.getServiceName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.getServiceName() + " is UP");
        }
    }
    

    CacheHealthChecker.java类

    import java.util.concurrent.CountDownLatch;
    
    public class CacheHealthChecker extends BaseHealthChecker
    {
        public CacheHealthChecker (CountDownLatch latch)
        {
            super("Cache Service", latch);
        }
        
        @Override
        public void verifyService() 
        {
            System.out.println("Checking " + this.getServiceName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.getServiceName() + " is UP");
        }
    }
    

    ApplicationStartupUtil.java:是一个主启动类,它负责初始化闭锁,然后等待所有服务都被检查完成,再恢复执行。

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    public class ApplicationStartupUtil 
    {
        private static List<BaseHealthChecker> _services;
        private static CountDownLatch _latch;
        
        private ApplicationStartupUtil()
        {
        }
        
        private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
        
        public static ApplicationStartupUtil getInstance()
        {
            return INSTANCE;
        }
        
        public static boolean checkExternalServices() throws Exception
        {
            _latch = new CountDownLatch(3);
            _services = new ArrayList<BaseHealthChecker>();
            _services.add(new NetworkHealthChecker(_latch));
            _services.add(new CacheHealthChecker(_latch));
            _services.add(new DatabaseHealthChecker(_latch));
            
            Executor executor = Executors.newFixedThreadPool(_services.size());
            
            for(final BaseHealthChecker v : _services) 
            {
                executor.execute(v);
            }
            
            _latch.await();
            
            for(final BaseHealthChecker v : _services) 
            {
                if( ! v.isServiceUp())
                {
                    return false;
                }
            }
            return true;
        }
    }
    

    测试代码检测闭锁功能:

    public class Main {
        public static void main(String[] args) 
        {
            boolean result = false;
            try {
                result = ApplicationStartupUtil.checkExternalServices();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("External services validation completed !! Result was :: "+ result);
        }
    }
    

    执行结果:

    Checking Network Service
    Checking Cache Service
    Checking Database Service
    Database Service is UP
    Cache Service is UP
    Network Service is UP
    External services validation completed !! Result was :: true

    工作中的使用(优雅地完成初始化)

    在移动应用开发中(以Android为例),随着功能的增多,应用初始化工作开始增多,网络,账号,推送服务,预加载数据等依次登场,开发人员都会临时在Application中找到现有初始化逻辑,将自己的代码插在其中。随着版本的迭代,新老员工的交替,几乎没人能对应用的初始化过程完全了解,删除一行初始化代码甚至移动位置都可能造成严重的后果。

    应用初始化过程极其重要,它是应用后续平稳运行的前提和保证。开发初始化配置模块(公司内部开源不宜公开),更好地管理初始化逻辑,对初始化地工作进行分层,分优先级,多线程地规划,进而在大幅提升初始化效率,同时还有完整地日志监控体系功能。有了它,规划整个初始化工作将简单而优雅

    参考

    1. 什么时候使用CountDownLatch
    2. Java concurrency – CountDownLatch Example

    相关文章

      网友评论

      • 8b4bbe5729eb:写的非常棒,有种让人身临其境的感觉。

      本文标题:CountDownLatch

      本文链接:https://www.haomeiwen.com/subject/sswclxtx.html