美文网首页
可动态配置的 Schedule 设计

可动态配置的 Schedule 设计

作者: Geekhalo | 来源:发表于2021-10-12 16:52 被阅读0次

    1. 背景

    定时任务是实际开发中常见的一类功能,例如每天早上凌晨对前一天的注册用户数量、渠道来源进行统计,并以邮件报表的方式发送给相关人员。相信这样的需求,每个开发伙伴都处理过。

    你可以使用 Linux 的 Crontab 启动应用程序进行处理,或者直接使用 Spring 的 Schedule 对任务进行调度,还可以使用分布式调度系统,如果 xxl-job 等。相信你已经轻车熟路、习以为常。直到有一天你接到了一个新需求:

    1. 新建一组任务,周期性的执行指定 SQL 并将结果以邮件的方式发送给特定人群;
    2. 比较方便的对任务进行管理,比如 启动、停止,修改调度周期等;
    3. 动态添加、移除任务,不需要频繁的修改、发布程序;

    停顿几分钟,简单思考一下,有哪几种实现思路呢?

    本篇文章将从一下几部分进行讨论:

    1. Spring Schedule 配置和使用。首先我们将介绍 Demo 的骨架,并基于 Spring-Boot 完成 Schedule 的配置;
    2. 数据库定时轮询方案。使用 Spring Schedule 定时轮询 数据库,并执行相应任务。在执行任务策略中,我们将尝试同步和异步执行两种方案,并对其优缺点进行分析;
    3. 基于 TaskScheduler 动态配置方案。基于数据库 轮询 或 配置中心 两种方案动态的对 Spring TaskScheduler 进行配置,以实现动态管理任务的目的;
    4. 我们进入分布式环境,利用多个冗余节点解决系统高可用问题,同时使用分布式锁保障只会有一个任务同时执行;

    2. Spring Schedule

    Spring Boot 上的 Schedule 的使用非常简单,无需增加新的依赖,只需简单配置即可。

    1. 使用 @EnableScheduling 启用 Schedule;
    2. 在要调度的方法上增加 @Scheduled;

    首先,我们需要在启动类上添加 @EnableScheduling 注解,该注解将启用 SchedulingConfiguration 配置类帮我们完成最基本的配置。

    @SpringBootApplication
    @EnableScheduling
    public class ConfigurableScheduleDemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ConfigurableScheduleDemoApplication.class, args);
        }
    
    }
    
    

    启用Schedule配置之后,在需要被调度的方法上增加 @Scheduled 注解。

    @Service
    public class SpringScheduleService {
        @Autowired
        private TaskService taskService;
    
        @Scheduled(fixedDelay = 5 * 1000, initialDelay = 1000)
        public void runTask(){
            TaskConfig taskConfig = TaskConfig.builder()
                    .name("Spring Default Schedule")
                    .build();
            this.taskService.runTask(taskConfig);
        }
    }
    
    

    runTask 任务延迟 1s 进行初始化,并以 5s 为间隔进行调度。

    Scheduled 注解类的详细配置如下:

    配置 含义 样例
    cron linux crontab 表达式 @Scheduled(cron="*/5 * * * * MON-FRI") 工作日,每 5 s 调度一次
    fixedDelay 固定间隔,上次运行结束,与下次启动运行,相隔固定时长 @Scheduled(fixedDelay=5000) 运行结束后,5S 后启动一次调度
    fixedDelayString 与 fixedDelay 一致
    fixedRate 固定周期,前后两次运行相隔固定的时长 @Scheduled(fixedRate=5000) 前后两个任务,间隔 5 秒
    fixedRateString 与 fixedRate 一致
    initialDelay 第一次执行,间隔时间 @Scheduled(initialDelay=1000, fixedRate=5000) 第一次执行,延时 1 秒,以后以 5 秒为周期进行调度
    initialDelayString 与 initialDelay 一致

    环境搭建完成,让我们开始第一个方案。

    3. 数据库定时轮询

    使用数据库来管理任务,通过轮询的方案,进行动态调度。首先,我们看下最简单的方案:串行执行方案。

    3.1. 串行执行方案

    整体思路非常简单,流程如下: [图片上传失败...(image-d1558d-1634028690457)]

    主要分如下几步:

    1. 在应用中启动一个 Schedule 任务(每 1 秒调度一次),定时从 数据库 中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);
    2. 根据数据库的任务配置信息,依次遍历并执行任务;
    3. 任务执行完成后,经过计算获得下一次调度时间,将其写回到数据库;
    4. 等待下一次任务调度。

    核心代码如下:

    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void loadAndRunTask(){
        Date now = new Date();
        // 加载需要运行的任务:
        // 1\. 状态为 ENABLE
        // 2\. 下一次运行时间 小于 当前时间
        List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
    
        // 依次遍历待运行任务,执行对于的任务
        for (TaskDefinitionV2 task : shouldRunTasks){
            // Double Check
            if (task.shouldRun(now)){
                // 执行任务
                runTask(task);
    
                // 更新任务的下一次运行时间
                updateNextRunTime(task, now);
            }
        }
    }
    
    

    方案简单但非常有效,那该方案存在哪些问题呢? 最主要的问题就是:任务串行执行,会导致后面任务出现延时运行;同时,下一轮检查也会被 delay。

    例如,依次加载了待执行任务 task1、task2、task3。其中 task1 耗时 5 秒,task2 耗时 5 秒,task3 耗时 1 秒,由于三个任务串行执行,task2 将延时 5 秒,task3 延时 10秒;下一轮检查距上次启动相差 12 秒。

    究其根本,核心问题是 调度线程 和 运行线程 是同一个线程,调度的运行 和 任务的运行相互影响。

    让我们看一个改进方案:并行执行方案。

    3.2. 并行执行方案

    整体执行流程如下: [图片上传失败...(image-3cdb5-1634028690457)]

    相比之前的方案,新方案引入了线程池,每一个任务对应一个线程池,避免任务间的相互影响;任务在线程池中异步处理,避免了调度线程的延时。具体流程如下:

    1. 步骤一不变,在应用中启动一个 Schedule 任务(每 1 秒调度一次),定时从 数据库 中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);
    2. 依次遍历任务,将任务提交到专有线程池中异步执行,调度线程直接返回;
    3. 任务在线程池中运行,结束后更新下一次的运行时间;
    4. 调度线程重新从数据库中获取待执行任务,在将任务提交至线程池中,如果有任务正在执行,使用线程池拒绝策略,抛弃最老的任务;

    核心代码如下:

    Spring 调度任务,每 1 秒运行一次:

    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void loadAndRunTask(){
        Date now = new Date();
        // 加载所有待运行的任务
        // 1\. 状态为 ENABLE
        // 2\. 下一次运行时间小于 当前时间
        List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
        // 遍历待运行任务
        for (TaskDefinitionV2 task : shouldRunTasks){
            // 1\. 根据 Task Id 获取任务对应的线程池
            // 2\. 将任务提交至线程池中
            this.executorServiceForTask(task.getId())
                    .submit(new TaskRunner(task.getId()));
        }
    }
    
    

    自定义线程池,每个线程池最多只有一个线程,空闲超过 10 秒后,线程自动回收,线程饱和时,直接丢弃最老的任务:

    private ExecutorService executorServiceForTask(Long taskId){
        return this.executorServiceRegistry.computeIfAbsent(taskId, id->{
            BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
                    // 指定线程池名称
                    .namingPattern("Async-Task-"+ taskId +"-Thread-%d")
                    // 设置线程为 后台线程
                    .daemon(true)
                    .build();
            // 线程池核心配置:
            // 1\. 每个线程池最多只有一个线程
            // 2\. 线程空闲超过 10秒 进行自动回收
            // 3\. 直接使用交互器,线程空闲进行任务交互
            // 4\. 使用指定的线程工厂,设置线性名称
            // 5\. 线程池饱和,自动丢弃最老的任务
            return new ThreadPoolExecutor(0, 1,
                    10L, TimeUnit.SECONDS,
                    new SynchronousQueue<>(),
                    threadFactory,
                    new ThreadPoolExecutor.DiscardOldestPolicy()
            );
        });
    }
    
    

    最后,在线程池中运行的 Task 如下:

    private class TaskRunner implements Runnable {
        private final Date now = new Date();
        private final Long taskId;
        public TaskRunner(Long taskId) {
            this.taskId = taskId;
        }
    
        @Override
        public void run() {
            // 重新加载任务,保持最新的任务状态
            TaskDefinitionV2 task = definitionV2Repository.findById(this.taskId).orElse(null);
            if (task != null && task.shouldRun(now)){
                // 运行任务
                runTask(task);
    
                // 更新任务的下一次运行时间
                updateNextRunTime(task, now);
            }
        }
    }
    
    

    4. TaskScheduler 配置方案

    该方案的核心为:绕过 @Schedule 注解,直接对 Spring 底层核心 类 TaskScheduler 进行配置。

    TaskScheduler 接口是 Spring 对调度任务的一个抽象,更是 @Schedule 背后默默的支持者,首先我们看下这个接口定义。

    public interface TaskScheduler {
    
        ScheduledFuture schedule(Runnable task, Trigger trigger);
    
        ScheduledFuture schedule(Runnable task, Instant startTime);
    
        ScheduledFuture schedule(Runnable task, Date startTime);
    
        ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);
    
        ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
    
        ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);
    
        ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
    
        ScheduledFuture scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay);
    
        ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
    
        ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay);
    
        ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
    }
    
    

    满满的都是 schedule 接口,其他的比较简单就不过多叙述了,重点说下 Trigger 这个接口,首先看下这个接口的定义:

    public interface Trigger {
        Date nextExecutionTime(TriggerContext triggerContext);
    }
    
    

    只有一个方法,获取下次执行的时间。在任务执行完成后,会调用 Trigger 的 nextExecutionTime 获取下一次运行时间,从而实现周期性调度。

    CronTrigger 是 Trigger 的最常见实现,以 linux crontab 的方式配置调度任务,如:

    scheduler.schedule(task, new CronTrigger("0 15 9-17 * * MON-FRI"));
    
    

    基础部分简单介绍到这,让我们看下数据库动态配置方案。

    4.1 数据库动态配置方案

    整体设计如下:

    [图片上传失败...(image-1ae0f0-1634028690457)]

    仍旧是轮询数据库方式,详细流程如下:

    1. 在应用中启动一个 Schedule 任务(每 1 秒调度一次),定时从 数据库 中获取所有任务;
    2. 依次遍历任务,与内存中的 TaskEntry(任务与状态) 进行比对,动态的向 TaskScheduler 中 添加 或 取消 调度任务;
    3. 由 TaskScheduler 负责实际的任务调度;

    核心代码如下:

    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void loadAndConfig(){
        // 加载所有的任务信息
        List<TaskDefinitionV3> tasks = repository.findAll();
        // 遍历任务进行任务检查
        for (TaskDefinitionV3 task : tasks){
            // 获取内存任务状态
            TaskEntry taskEntry = this.taskEntry.computeIfAbsent(task.getId(), TaskEntry::new);
    
            if (task.isEnable() && taskEntry.isStop()){
                // 任务为可用,运行状态为停止,则重新进行 schedule 注册
                ScheduledFuture<?> scheduledFuture = this.taskScheduler.scheduleWithFixedDelay(new TaskRunner(task), task.getDelay() * 1000);
                taskEntry.setScheduledFuture(scheduledFuture);
                log.info("success to start schedule task for {}", task);
    
            }else if (task.isDisable() && taskEntry.isRunning()){
                // 任务为禁用,运行状态为运行中,停止正在运行在任务
                taskEntry.stop();
                log.info("success to stop schedule task for {}", task);
            }
        }
    }
    
    

    核心辅助类:

    @Data
    private class TaskEntry{
        private final Long taskId;
        private ScheduledFuture scheduledFuture;
        private TaskEntry(Long taskId) {
            this.taskId = taskId;
        }
    
        /**
         * 内存状态 scheduledFuture 为 null,则没有运行的任务
         * @return
         */
        public boolean isStop() {
            return scheduledFuture == null;
        }
    
        /**
         * 内存状态 scheduledFuture 不为 null,则存在运行的任务
         * @return
         */
        public boolean isRunning() {
            return scheduledFuture != null;
        }
    
        /**
         * 停止调度任务 <br />
         * 1\. 内存状态设置为 null
         * 2\. 调用 scheduledFuture#cancel 终止正在运行的调度任务
         */
        public void stop() {
            ScheduledFuture scheduledFuture = this.scheduledFuture;
            this.scheduledFuture = null;
    
            scheduledFuture.cancel(true);
        }
    }
    
    

    有没有发现,以上方案都有一个共同的缺陷:基于数据库轮询获取任务,加大了数据库压力。理论上,只有在配置发生变化时才有必要对任务进行更新,接下来让我们看下改进方案:基于配置中心的方案。

    4.2 配置中心通知方案

    整体设计如下:

    [图片上传失败...(image-8b5812-1634028690457)]

    核心流程如下:

    1. 应用启动时,从配置中心中获取 调度的配置信息,并完成对 TaskScheduler 的配置;
    2. 当配置发送变化时,配置中心会主动将配置推送到 应用程序,应用程序在接收到变化通知时,动态的增加或取消调度任务;
    3. 任务的实际调度仍旧由 TaskScheduler 完成。

    由于手底下没有配置中心,暂时没有 coding,思路很简单,有条件的同学可以自行完成。

    5. 分布式环境下应用

    以上方案,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就停止了,为了避免这种情况的发生,需要提升系统的可用性,实现 冗余部署 和 自动化容灾。

    以上方案,如果部署多个节点会发生什么?是的,会出现任务被多次调度的问题,为了保障在同一时刻只有一个任务在运行,需要为任务增加一个排他锁。同时,由于排他锁的存在,当一个节点处问题后,另一个节点在调度时会自动获取锁,从而解系统的单点问题。

    为了简单,我们使用 Redis 的分布式锁。

    5.1. 环境搭建

    Redisson 是 Redis 的一个富客户端,提供了很多高级的数据结构。本次,我们将使用 RLock 对应用进行保护。

    首先,在 pom 中引入 Redisson Starter。

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.16.2</version>
    </dependency>
    
    

    然后,在 application.properties 文件中增加 Redis 配置,具体如下:

    spring.redis.host=127.0.0.1
    spring.redis.port=6379
    spring.redis.database=0
    
    

    5.2 引入分布式锁

    最后,就可以直接使用 分布式锁 对任务执行进行保护了,代码如下:

    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void loadAndRunTaskWithLock(){
        Date now = new Date();
        // 加载需要运行的任务:
        // 1\. 状态为 ENABLE
        // 2\. 下一次运行时间 小于 当前时间
        List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
        // 依次遍历待运行任务,执行对于的任务
        for (TaskDefinitionV2 task : shouldRunTasks){
            // Double Check
            if (task.shouldRun(now)){
    
                // 获取分布式锁,用于保证每个任务只能有一个正在运行
                RLock lock = this.redissonClient.getFairLock("LoadAndRunScheduleService-" + task.getId());
                if (lock.tryLock()) {
                    // 成功获取锁,运行任务
                    try {
                        log.info("Success to get Lock, begin to run task {}", task.getId());
    
                        // 执行任务
                        runTask(task);
    
                        // 更新任务的下一次运行时间
                        updateNextRunTime(task, now);
                        log.info("Success to run task {}", task.getId());
                    }finally {
                        // 任务运行解释,释放锁
                        lock.unlock();
                    }
                }else {
                    // 未获取锁,打印日志,不做额外处理
                    log.info("Failed to get Lock!!!!");
                }
            }
        }
    }
    
    

    备注:

    Redis 是典型的 AP 应用,而分布式锁严格意义上来说是 CP。所以基于 Redis 的分布式锁只能使用在非严格环境中,比如我们的数据报表需求。如果设计金钱,需要使用 CP 实现,如 Zookeeper 或 etcd 等。

    6. 小结

    本文从 Spring 的 Schedule 出发,依次对数据库轮询方案、TaskScheduler 配置方案 进行详细讲解,以实现对调度任务的可配置化。最后,使用 Redis 分布式锁有效解决了分布式环境下任务重复调度和自动容灾问题。

    仍旧是那句话,架构设计没有更好,只有最适合。同学们可以根据自己的需求自取。

    最后,附上源码 源码

    相关文章

      网友评论

          本文标题:可动态配置的 Schedule 设计

          本文链接:https://www.haomeiwen.com/subject/bqxjoltx.html