美文网首页
Java concurrent包并发工具——CountDownL

Java concurrent包并发工具——CountDownL

作者: m2fox | 来源:发表于2018-03-22 07:58 被阅读0次

参考:http://blog.csdn.net/qq_30739519/article/details/51350527

很多系统在启动运行之前都会有一个称为"健康检查"的阶段,比如会去检查数据库、网络、虚拟机等基础组件服务的状态是否正常,当确认这些服务都正常之后,才会继续做其他的启动工作(否则地基不牢,后面的事情做了也是白搭)。这种场景简而言之就是,后面的任务对于前面的一个或多个任务有前后依赖关系,只有当前面的任务完成了,后面的任务才能开始,而这样的场景正好可以由CountDownLatch这样一个并发辅助工具来完成。

CountDownLatch,按字面意思理解,就是:计数闩(shuan),这个就是门闩的意思,可以理解为一个开关,通过一个计数开关来控制并发任务的执行。该类位于java.util.concurrent.CountDownLatch,从JDK 1.5版本引入。

本文就以一个简单的健康检查场景为例子,来理一理CountDownLatch这样一个并发工具的用法。

场景简要描述

假设有一个系统,在系统启动之前需要先检查数据库、网络、虚拟机三个组件服务是否正常,如果都正常,才继续启动;否则启动失败。

可以用一个图示来表示:


创建一个健康检查任务基类

每个健康检查任务都是跑在子线程中的,所以要实现Runnable接口:

package com.countdownlatch;

import java.util.concurrent.CountDownLatch;

/**
 * 系统健康检查任务基类
 * 
 * @author Administrator
 *
 */
public abstract class BaseHealthCheckTask implements Runnable {

    // 同步计数闩对象
    private CountDownLatch latch;

    // 要进行健康检查的服务名称
    protected String serviceName;

    // 健康检查是否完成
    protected boolean checkOk;

    public BaseHealthCheckTask(CountDownLatch latch, String serviceName) {
        this.latch = latch;
        this.serviceName = serviceName;
        // checkOk初始化为false,等正常完成健康检查操作之后赋值true
        this.checkOk = false;
    }

    public String getServiceName() {
        return serviceName;
    }

    public boolean isCheckOk() {
        return checkOk;
    }

    @Override
    public void run() {
        try {
            // 执行具体的健康检查工作
            doCheck();
            // 检查成功,给检查成功标志对象赋值true
            this.checkOk = true;
        } catch (Exception e) {
            // 如果执行健康检查任务过程中发生异常,不再给checkOk赋值true,表示健康检查失败
            e.printStackTrace();
        }

        // 健康检查完成之后(不管成功还是失败),将同步计数闩的计数值减1
        this.latch.countDown();
    }

    /**
     * 需要由子类实现的抽象方法,用于做具体的健康检查工作
     */
    protected abstract void doCheck() throws Exception;
}

创建不同服务的健康检查任务子类

  • DBHealthCheckTask
    假设数据库健康检查任务需要耗时3秒完成:
package com.countdownlatch;

import java.util.concurrent.CountDownLatch;

/**
 * 数据库服务健康检查任务类
 * 
 * @author Administrator
 *
 */
public class DBHealthCheckTask extends BaseHealthCheckTask {

    public DBHealthCheckTask(CountDownLatch latch) {
        super(latch, "DBService");
    }

    @Override
    public void doCheck() throws InterruptedException {
        System.out
                .println("start to check: " + this.serviceName + " health...");
        // sleep 3秒,模拟执行数据库健康检查任务
        Thread.sleep(3000);
        System.out.println("finish to check: " + this.serviceName + " health!");
    }

}

  • NetworkHealthCheckTask
    假设网络健康检查任务需要耗时5秒完成:
package com.countdownlatch;

import java.util.concurrent.CountDownLatch;

/**
 * 网络服务健康检查任务类
 * 
 * @author Administrator
 *
 */
public class NetworkHealthCheckTask extends BaseHealthCheckTask {

    public NetworkHealthCheckTask(CountDownLatch latch) {
        super(latch, "NetworkService");
    }

    @Override
    public void doCheck() throws InterruptedException {
        System.out
                .println("start to check: " + this.serviceName + " health...");
        // sleep 5秒,模拟执行网络健康检查任务
        Thread.sleep(5000);
        System.out.println("finish to check: " + this.serviceName + " health!");
    }

}

  • VmHealthCheckTask
    假设虚拟机健康检查任务需要耗时4秒完成:
package com.countdownlatch;

import java.util.concurrent.CountDownLatch;

/**
 * 虚拟机服务健康检查任务类
 * 
 * @author Administrator
 *
 */
public class VmHealthCheckTask extends BaseHealthCheckTask {

    public VmHealthCheckTask(CountDownLatch latch) {
        super(latch, "VmService");
    }

    @Override
    public void doCheck() throws InterruptedException {
        System.out
                .println("start to check: " + this.serviceName + " health...");
        // sleep 4秒,模拟执行虚拟机健康检查任务
        Thread.sleep(4000);
        System.out.println("finish to check: " + this.serviceName + " health!");
    }

}

创建一个系统启动器类

用单例模式创建一个系统启动器,在启动器主线程中,先启动并等待3个健康检查任务的完成,才能接着进行其他启动工作。如果有健康检查任务的检查结果是失败的,那么终止启动主线程。

package com.countdownlatch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 系统启动器
 * 
 * @author Administrator
 *
 */
public class SystemStarter {
    // 单例化,因为系统启动器对象全局只能有一个
    private static SystemStarter INSTANCE = new SystemStarter();

    // 同步计数闩对象
    private CountDownLatch latch;

    // 设置默认的同步计数闩所允许的线程数为3
    private static final int LATCH_COUNT = 3;

    // 默认的超时等待描述
    private static final int DEFAULT_LATCH_TIMEOUT_SECOND = 60;

    private SystemStarter() {
        // 初始化启动器的同步闩对象
        this.latch = new CountDownLatch(LATCH_COUNT);
    }

    public static SystemStarter getInstance() {
        return INSTANCE;
    }

    public void startUp() throws InterruptedException {
        // 用于函数计时
        long start = System.currentTimeMillis();

        // 执行器对象,用于执行3个健康检查线程任务
        Executor executor = Executors.newFixedThreadPool(LATCH_COUNT);
        List<BaseHealthCheckTask> tasks = new ArrayList<BaseHealthCheckTask>();
        // 使用同步计数闩对象初始化3个健康检查任务对象:数据库、网络和虚拟机健康检查
        tasks.add(new DBHealthCheckTask(this.latch));
        tasks.add(new NetworkHealthCheckTask(this.latch));
        tasks.add(new VmHealthCheckTask(this.latch));

        // 并发执行健康检查任务
        for (BaseHealthCheckTask task : tasks) {
            executor.execute(task);
        }

        // 同步计数闩阻塞主线程进行等待,直到上面的3个健康检查任务全部执行完成,或超过默认超时时间才继续往下执行主线程
        this.latch.await(DEFAULT_LATCH_TIMEOUT_SECOND, TimeUnit.SECONDS);

        System.out.println("check system health FINISH!");

        // 输出每个健康检查任务的检查结果(是否检查通过)
        for (BaseHealthCheckTask task : tasks) {
            System.out.println("health check result (is passed): "
                    + task.getServiceName() + " - " + task.isCheckOk());
            // 一旦有一个任务失败,显示系统启动失败,退出启动器
            if (!task.isCheckOk()) {
                System.out.println("start up the system FAILED! "
                        + task.getServiceName() + " is NOT OK!");
                return;
            }
        }

        System.out.println("-----");

        // 在全部健康检查完成后,才开始执行启动过程中的其他任务
        doOtherStartupWork();

        System.out.println("-----");

        long end = System.currentTimeMillis();
        System.out.println("start up the system FINISH! totally spent "
                + (end - start) + "ms.");
    }

    /**
     * 其他任务,依赖于所有健康检查完成之后,才能执行这些任务
     * 
     * @throws InterruptedException
     */
    private void doOtherStartupWork() throws InterruptedException {
        System.out.println("do some other works...");
        Thread.sleep(1500);
    }
}

在main方法中调用启动器启动系统

package com.countdownlatch;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SystemStarter startup = SystemStarter.getInstance();
        startup.startUp();
    }
}

执行结果:

start to check: DBService health...
start to check: VmService health...
start to check: NetworkService health...
finish to check: DBService health!
finish to check: VmService health!
finish to check: NetworkService health!
check system health FINISH!
health check result (is passed): DBService - true
health check result (is passed): NetworkService - true
health check result (is passed): VmService - true
-----
do some other works...
-----
start up the system FINISH! totally spent 6520ms.

可见启动器最终耗时为3个健康检查任务中最长耗时的任务的耗时(网络健康检查的5秒)+做其他启动工作的耗时(1.5秒),即6.5秒。

附:CountDownLatch类方法梳理

CountDownLatch类位于java.util.concurrent.CountDownLatch,是JDK并发工具包concurrent中的一个辅助工具类。

  • 查看CountDownLatch类的源码,可以看到有1个成员变量sync、1个带参构造方法和5个方法。

  • 该类的核心功能由一个Sync类型的成员变量sync实现,Sync是一个静态内部类,继承自AbstractQueuedSynchronizer类。

  • await()方法会让调用该方法的当前线程阻塞等待,直到latch的计数值变为0,或者有线程抛出中断异常。下面是方法的文档说明:

Causes the current thread to wait until the latch has counted down to zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
  • await(long timeout, TimeUnit unit)方法是和await()方法类似的一个方法,只不过可以设置一个阻塞等待的超时时间,当超过该超时时间后,会结束阻塞等待(不管子线程有没有执行完),接着往下执行。下面是方法的文档说明:
Causes the current thread to wait until the latch has counted down to zero, unless the thread is {@linkplain Thread#interrupt interrupted}, or the specified waiting time elapses.
  • countDown()方法由子线程调用,会对latch的计数值减1,直到减到0为止。下面是方法的文档说明:
Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
  • getCount()方法用于获取当前latch的计数值,下面是方法的文档说明:
Returns the current count.
This method is typically used for debugging and testing purposes.
  • toString()方法会打印出当前latch的计数值:
Returns a string identifying this latch, as well as its state.
The state, in brackets, includes the String {@code "Count ="}
followed by the current count.
  • 最后看一看该类的作者:


Doug Lea正是java.util.concurrent包的作者,是一位对Java影响力深远的人,可以看一看百度百科的介绍:Doug Lea

本文源码地址

我的GitHub

相关文章

网友评论

      本文标题:Java concurrent包并发工具——CountDownL

      本文链接:https://www.haomeiwen.com/subject/daloqftx.html