1. 背景
定时任务是实际开发中常见的一类功能,例如每天早上凌晨对前一天的注册用户数量、渠道来源进行统计,并以邮件报表的方式发送给相关人员。相信这样的需求,每个开发伙伴都处理过。
你可以使用 Linux 的 Crontab 启动应用程序进行处理,或者直接使用 Spring 的 Schedule 对任务进行调度,还可以使用分布式调度系统,如果 xxl-job 等。相信你已经轻车熟路、习以为常。直到有一天你接到了一个新需求:
- 新建一组任务,周期性的执行指定 SQL 并将结果以邮件的方式发送给特定人群;
- 比较方便的对任务进行管理,比如 启动、停止,修改调度周期等;
- 动态添加、移除任务,不需要频繁的修改、发布程序;
停顿几分钟,简单思考一下,有哪几种实现思路呢?
本篇文章将从一下几部分进行讨论:
- Spring Schedule 配置和使用。首先我们将介绍 Demo 的骨架,并基于 Spring-Boot 完成 Schedule 的配置;
- 数据库定时轮询方案。使用 Spring Schedule 定时轮询 数据库,并执行相应任务。在执行任务策略中,我们将尝试同步和异步执行两种方案,并对其优缺点进行分析;
- 基于 TaskScheduler 动态配置方案。基于数据库 轮询 或 配置中心 两种方案动态的对 Spring TaskScheduler 进行配置,以实现动态管理任务的目的;
- 我们进入分布式环境,利用多个冗余节点解决系统高可用问题,同时使用分布式锁保障只会有一个任务同时执行;
2. Spring Schedule
Spring Boot 上的 Schedule 的使用非常简单,无需增加新的依赖,只需简单配置即可。
- 使用 @EnableScheduling 启用 Schedule;
- 在要调度的方法上增加 @Scheduled;
首先,我们需要在启动类上添加 @EnableScheduling 注解,该注解将启用 SchedulingConfiguration 配置类帮我们完成最基本的配置。
@SpringBootApplication
@EnableScheduling
public class ConfigurableScheduleDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigurableScheduleDemoApplication.class, args);
}
}
启用Schedule配置之后,在需要被调度的方法上增加 @Scheduled 注解。
@Service
public class SpringScheduleService {
@Autowired
private TaskService taskService;
@Scheduled(fixedDelay = 5 * 1000, initialDelay = 1000)
public void runTask(){
TaskConfig taskConfig = TaskConfig.builder()
.name("Spring Default Schedule")
.build();
this.taskService.runTask(taskConfig);
}
}
runTask 任务延迟 1s 进行初始化,并以 5s 为间隔进行调度。
Scheduled 注解类的详细配置如下:
配置 | 含义 | 样例 |
---|---|---|
cron | linux crontab 表达式 | @Scheduled(cron="*/5 * * * * MON-FRI") 工作日,每 5 s 调度一次 |
fixedDelay | 固定间隔,上次运行结束,与下次启动运行,相隔固定时长 | @Scheduled(fixedDelay=5000) 运行结束后,5S 后启动一次调度 |
fixedDelayString | 与 fixedDelay 一致 | |
fixedRate | 固定周期,前后两次运行相隔固定的时长 | @Scheduled(fixedRate=5000) 前后两个任务,间隔 5 秒 |
fixedRateString | 与 fixedRate 一致 | |
initialDelay | 第一次执行,间隔时间 | @Scheduled(initialDelay=1000, fixedRate=5000) 第一次执行,延时 1 秒,以后以 5 秒为周期进行调度 |
initialDelayString | 与 initialDelay 一致 |
环境搭建完成,让我们开始第一个方案。
3. 数据库定时轮询
使用数据库来管理任务,通过轮询的方案,进行动态调度。首先,我们看下最简单的方案:串行执行方案。
3.1. 串行执行方案
整体思路非常简单,流程如下: [图片上传失败...(image-d1558d-1634028690457)]
主要分如下几步:
- 在应用中启动一个 Schedule 任务(每 1 秒调度一次),定时从 数据库 中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);
- 根据数据库的任务配置信息,依次遍历并执行任务;
- 任务执行完成后,经过计算获得下一次调度时间,将其写回到数据库;
- 等待下一次任务调度。
核心代码如下:
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndRunTask(){
Date now = new Date();
// 加载需要运行的任务:
// 1\. 状态为 ENABLE
// 2\. 下一次运行时间 小于 当前时间
List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
// 依次遍历待运行任务,执行对于的任务
for (TaskDefinitionV2 task : shouldRunTasks){
// Double Check
if (task.shouldRun(now)){
// 执行任务
runTask(task);
// 更新任务的下一次运行时间
updateNextRunTime(task, now);
}
}
}
方案简单但非常有效,那该方案存在哪些问题呢? 最主要的问题就是:任务串行执行,会导致后面任务出现延时运行;同时,下一轮检查也会被 delay。
例如,依次加载了待执行任务 task1、task2、task3。其中 task1 耗时 5 秒,task2 耗时 5 秒,task3 耗时 1 秒,由于三个任务串行执行,task2 将延时 5 秒,task3 延时 10秒;下一轮检查距上次启动相差 12 秒。
究其根本,核心问题是 调度线程 和 运行线程 是同一个线程,调度的运行 和 任务的运行相互影响。
让我们看一个改进方案:并行执行方案。
3.2. 并行执行方案
整体执行流程如下: [图片上传失败...(image-3cdb5-1634028690457)]
相比之前的方案,新方案引入了线程池,每一个任务对应一个线程池,避免任务间的相互影响;任务在线程池中异步处理,避免了调度线程的延时。具体流程如下:
- 步骤一不变,在应用中启动一个 Schedule 任务(每 1 秒调度一次),定时从 数据库 中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);
- 依次遍历任务,将任务提交到专有线程池中异步执行,调度线程直接返回;
- 任务在线程池中运行,结束后更新下一次的运行时间;
- 调度线程重新从数据库中获取待执行任务,在将任务提交至线程池中,如果有任务正在执行,使用线程池拒绝策略,抛弃最老的任务;
核心代码如下:
Spring 调度任务,每 1 秒运行一次:
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndRunTask(){
Date now = new Date();
// 加载所有待运行的任务
// 1\. 状态为 ENABLE
// 2\. 下一次运行时间小于 当前时间
List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
// 遍历待运行任务
for (TaskDefinitionV2 task : shouldRunTasks){
// 1\. 根据 Task Id 获取任务对应的线程池
// 2\. 将任务提交至线程池中
this.executorServiceForTask(task.getId())
.submit(new TaskRunner(task.getId()));
}
}
自定义线程池,每个线程池最多只有一个线程,空闲超过 10 秒后,线程自动回收,线程饱和时,直接丢弃最老的任务:
private ExecutorService executorServiceForTask(Long taskId){
return this.executorServiceRegistry.computeIfAbsent(taskId, id->{
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
// 指定线程池名称
.namingPattern("Async-Task-"+ taskId +"-Thread-%d")
// 设置线程为 后台线程
.daemon(true)
.build();
// 线程池核心配置:
// 1\. 每个线程池最多只有一个线程
// 2\. 线程空闲超过 10秒 进行自动回收
// 3\. 直接使用交互器,线程空闲进行任务交互
// 4\. 使用指定的线程工厂,设置线性名称
// 5\. 线程池饱和,自动丢弃最老的任务
return new ThreadPoolExecutor(0, 1,
10L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.DiscardOldestPolicy()
);
});
}
最后,在线程池中运行的 Task 如下:
private class TaskRunner implements Runnable {
private final Date now = new Date();
private final Long taskId;
public TaskRunner(Long taskId) {
this.taskId = taskId;
}
@Override
public void run() {
// 重新加载任务,保持最新的任务状态
TaskDefinitionV2 task = definitionV2Repository.findById(this.taskId).orElse(null);
if (task != null && task.shouldRun(now)){
// 运行任务
runTask(task);
// 更新任务的下一次运行时间
updateNextRunTime(task, now);
}
}
}
4. TaskScheduler 配置方案
该方案的核心为:绕过 @Schedule 注解,直接对 Spring 底层核心 类 TaskScheduler 进行配置。
TaskScheduler 接口是 Spring 对调度任务的一个抽象,更是 @Schedule 背后默默的支持者,首先我们看下这个接口定义。
public interface TaskScheduler {
ScheduledFuture schedule(Runnable task, Trigger trigger);
ScheduledFuture schedule(Runnable task, Instant startTime);
ScheduledFuture schedule(Runnable task, Date startTime);
ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);
ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
ScheduledFuture scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay);
ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay);
ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}
满满的都是 schedule 接口,其他的比较简单就不过多叙述了,重点说下 Trigger 这个接口,首先看下这个接口的定义:
public interface Trigger {
Date nextExecutionTime(TriggerContext triggerContext);
}
只有一个方法,获取下次执行的时间。在任务执行完成后,会调用 Trigger 的 nextExecutionTime 获取下一次运行时间,从而实现周期性调度。
CronTrigger 是 Trigger 的最常见实现,以 linux crontab 的方式配置调度任务,如:
scheduler.schedule(task, new CronTrigger("0 15 9-17 * * MON-FRI"));
基础部分简单介绍到这,让我们看下数据库动态配置方案。
4.1 数据库动态配置方案
整体设计如下:
[图片上传失败...(image-1ae0f0-1634028690457)]
仍旧是轮询数据库方式,详细流程如下:
- 在应用中启动一个 Schedule 任务(每 1 秒调度一次),定时从 数据库 中获取所有任务;
- 依次遍历任务,与内存中的 TaskEntry(任务与状态) 进行比对,动态的向 TaskScheduler 中 添加 或 取消 调度任务;
- 由 TaskScheduler 负责实际的任务调度;
核心代码如下:
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndConfig(){
// 加载所有的任务信息
List<TaskDefinitionV3> tasks = repository.findAll();
// 遍历任务进行任务检查
for (TaskDefinitionV3 task : tasks){
// 获取内存任务状态
TaskEntry taskEntry = this.taskEntry.computeIfAbsent(task.getId(), TaskEntry::new);
if (task.isEnable() && taskEntry.isStop()){
// 任务为可用,运行状态为停止,则重新进行 schedule 注册
ScheduledFuture<?> scheduledFuture = this.taskScheduler.scheduleWithFixedDelay(new TaskRunner(task), task.getDelay() * 1000);
taskEntry.setScheduledFuture(scheduledFuture);
log.info("success to start schedule task for {}", task);
}else if (task.isDisable() && taskEntry.isRunning()){
// 任务为禁用,运行状态为运行中,停止正在运行在任务
taskEntry.stop();
log.info("success to stop schedule task for {}", task);
}
}
}
核心辅助类:
@Data
private class TaskEntry{
private final Long taskId;
private ScheduledFuture scheduledFuture;
private TaskEntry(Long taskId) {
this.taskId = taskId;
}
/**
* 内存状态 scheduledFuture 为 null,则没有运行的任务
* @return
*/
public boolean isStop() {
return scheduledFuture == null;
}
/**
* 内存状态 scheduledFuture 不为 null,则存在运行的任务
* @return
*/
public boolean isRunning() {
return scheduledFuture != null;
}
/**
* 停止调度任务 <br />
* 1\. 内存状态设置为 null
* 2\. 调用 scheduledFuture#cancel 终止正在运行的调度任务
*/
public void stop() {
ScheduledFuture scheduledFuture = this.scheduledFuture;
this.scheduledFuture = null;
scheduledFuture.cancel(true);
}
}
有没有发现,以上方案都有一个共同的缺陷:基于数据库轮询获取任务,加大了数据库压力。理论上,只有在配置发生变化时才有必要对任务进行更新,接下来让我们看下改进方案:基于配置中心的方案。
4.2 配置中心通知方案
整体设计如下:
[图片上传失败...(image-8b5812-1634028690457)]
核心流程如下:
- 应用启动时,从配置中心中获取 调度的配置信息,并完成对 TaskScheduler 的配置;
- 当配置发送变化时,配置中心会主动将配置推送到 应用程序,应用程序在接收到变化通知时,动态的增加或取消调度任务;
- 任务的实际调度仍旧由 TaskScheduler 完成。
由于手底下没有配置中心,暂时没有 coding,思路很简单,有条件的同学可以自行完成。
5. 分布式环境下应用
以上方案,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就停止了,为了避免这种情况的发生,需要提升系统的可用性,实现 冗余部署 和 自动化容灾。
以上方案,如果部署多个节点会发生什么?是的,会出现任务被多次调度的问题,为了保障在同一时刻只有一个任务在运行,需要为任务增加一个排他锁。同时,由于排他锁的存在,当一个节点处问题后,另一个节点在调度时会自动获取锁,从而解系统的单点问题。
为了简单,我们使用 Redis 的分布式锁。
5.1. 环境搭建
Redisson 是 Redis 的一个富客户端,提供了很多高级的数据结构。本次,我们将使用 RLock 对应用进行保护。
首先,在 pom 中引入 Redisson Starter。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.2</version>
</dependency>
然后,在 application.properties 文件中增加 Redis 配置,具体如下:
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0
5.2 引入分布式锁
最后,就可以直接使用 分布式锁 对任务执行进行保护了,代码如下:
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndRunTaskWithLock(){
Date now = new Date();
// 加载需要运行的任务:
// 1\. 状态为 ENABLE
// 2\. 下一次运行时间 小于 当前时间
List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
// 依次遍历待运行任务,执行对于的任务
for (TaskDefinitionV2 task : shouldRunTasks){
// Double Check
if (task.shouldRun(now)){
// 获取分布式锁,用于保证每个任务只能有一个正在运行
RLock lock = this.redissonClient.getFairLock("LoadAndRunScheduleService-" + task.getId());
if (lock.tryLock()) {
// 成功获取锁,运行任务
try {
log.info("Success to get Lock, begin to run task {}", task.getId());
// 执行任务
runTask(task);
// 更新任务的下一次运行时间
updateNextRunTime(task, now);
log.info("Success to run task {}", task.getId());
}finally {
// 任务运行解释,释放锁
lock.unlock();
}
}else {
// 未获取锁,打印日志,不做额外处理
log.info("Failed to get Lock!!!!");
}
}
}
}
备注:
Redis 是典型的 AP 应用,而分布式锁严格意义上来说是 CP。所以基于 Redis 的分布式锁只能使用在非严格环境中,比如我们的数据报表需求。如果设计金钱,需要使用 CP 实现,如 Zookeeper 或 etcd 等。
6. 小结
本文从 Spring 的 Schedule 出发,依次对数据库轮询方案、TaskScheduler 配置方案 进行详细讲解,以实现对调度任务的可配置化。最后,使用 Redis 分布式锁有效解决了分布式环境下任务重复调度和自动容灾问题。
仍旧是那句话,架构设计没有更好,只有最适合。同学们可以根据自己的需求自取。
最后,附上源码 源码
网友评论