在开发中,我们经常会处理一些数据,会用到定时任务,比如半夜同步接口数据,或者处理一些数据到统计表,方便生成报表。
一般的做法就是使用spring的定时任务了,它和quartz有一些结合,使用配xml配置定时任务,或者使用注解来做定时任务,需要说明的是,这里的定时是静态的,也就是在配置上或者注解上写上了cron表达式,一般情况下,如果没有遇到变态的客户,基本就能够满足大部分的开发需求了。
首先讲讲静态定时任务的不足,那就是写死了cron表达式,如果客户需要改定时时间,那就要停服务,改配置,再重启;如果因为其它原因导致服务不可用或者停服务,导致错过了此次同步时间,那就只有等下一次的同步时间了;有一些同步接口过时了,需要删除该定时任务,那么就从代码和配置上删除,再来一遍重启服务;更关键的是,我不知道当前定时任务的状态,比如这个机器有多少定时任务正在跑,他们分别花了多少时间,错误了几次。你要问我这些,我只能盯着控制台,看日志输出,这好惨啊。
所以,是时候升级一下了,来个动态定时任务。有配置的界面,可以随时停止,随时启动,设置最大失败重试次数,随时删除定时任务,随时加入定时任务,每一次的定时运行都有日志可以查询,特别是错误日志,统计当前机器运行的定时任务,耗时之类的统计数据。
好,开干吧。在撸之前,请用香皂把手洗干净,然后保持温暖。。。,停停停,你在说些什么?我不懂,这里可是学习知识的地方啊,请不要开车。
咳,首先我想说的是,我假设你已经在用springboot进行开发了,这会省去很多不必要的说明。
第一步呢,就是引入依赖:
pom.xml加入:
<!--动态定时任务框架quartz-->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
其余的springboot依赖以及数据源配置,略去。
先来配置一个bean吧,当springboot启动之后,会把ApplicationContext放入该类,用它我们就可以获取任意的bean:
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextHolder implements ApplicationContextAware {
/**
* 上下文对象实例
*/
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 获取applicationContext
* @return
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 通过name获取 Bean.
* @param name
* @return
*/
public static Object getBean(String name){
return getApplicationContext().getBean(name);
}
/**
* 通过class获取Bean.
* @param clazz
* @param <T>
* @return
*/
public static <T> T getBean(Class<T> clazz){
return getApplicationContext().getBean(clazz);
}
/**
* 通过name,以及Clazz返回指定的Bean
* @param name
* @param clazz
* @param <T>
* @return
*/
public static <T> T getBean(String name,Class<T> clazz){
return getApplicationContext().getBean(name, clazz);
}
}
好了,准备工作已经妥当,先来讲一下原理,原理就是我将定时任务cron 执行业务的bean以及参数之类的,放入表中,以后对该表进行操作,就可以管理定时任务了。至于添加定时任务、启动、停止、删除定时任务的具体执行,交给quartz来帮助我们执行。每一次定时任务的执行,都会插入日志,插入日志的方法一般选取无事务方式,保证日志一定插入成功,即使业务方法运行报错。
好了,下面再来准备两张表:
-- 定时任务表
DROP TABLE IF EXISTS `t_schedule_task`;
CREATE TABLE `t_schedule_task` (
`id` varchar(32) NOT NULL COMMENT '主键',
`business_id` varchar(64) DEFAULT NULL COMMENT '业务编号',
`business_type` varchar(32) DEFAULT NULL COMMENT '业务类型 1:xxx 2:yyy',
`name` varchar(64) DEFAULT NULL COMMENT '任务名称',
`group_id` varchar(64) DEFAULT NULL COMMENT '任务组ID',
`cron` varchar(64) DEFAULT NULL COMMENT 'cron表达式,',
`job_status` varchar(10) DEFAULT NULL COMMENT '定时任务状态 1:未运行 2:正在运行 3:已停止 4:运行失败次数太多而停止',
`is_concurrent` varchar(10) DEFAULT '1' COMMENT '定时是否并行运行 1:串行 2:并行',
`bean_name` varchar(32) DEFAULT NULL COMMENT '任务执行需要反射调用的service bean名称',
`method_name` varchar(32) DEFAULT NULL COMMENT '调用的bean的方法名称',
`params` varchar(128) DEFAULT NULL COMMENT '调用方法参数 json字符串',
`des` varchar(255) DEFAULT NULL COMMENT '任务描述',
`valid` smallint(6) unsigned DEFAULT '1' COMMENT '有效标记(1:有效,0:无效)',
`max_try_times` int(6) unsigned DEFAULT '0' COMMENT '失败之后,最大重试次数',
`error_times` int(6) unsigned DEFAULT '0' COMMENT '已经失败次数',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`create_user_id` varchar(40) DEFAULT NULL COMMENT '创建者ID',
`update_user_id` varchar(40) DEFAULT NULL COMMENT '修改者ID',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务表';
-- 定时任务日志表
DROP TABLE IF EXISTS `t_schedule_task_log`;
CREATE TABLE `t_schedule_task_log` (
`id` varchar(32) NOT NULL COMMENT '主键',
`task_id` varchar(32) DEFAULT NULL COMMENT 't_schedule_task 主键',
`status` varchar(10) DEFAULT NULL COMMENT '任务状态 1:成功 2:失败 ',
`valid` smallint(6) unsigned DEFAULT 0 COMMENT '有效标记(1:有效,0:无效)',
`error_cause` varchar(1024) DEFAULT NULL COMMENT '错误原因',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`finish_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`exec_time` int unsigned DEFAULT 0 COMMENT '执行时长 秒',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`create_user_id` varchar(40) DEFAULT NULL COMMENT '创建者ID',
`update_user_id` varchar(40) DEFAULT NULL COMMENT '修改者ID',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务日志表';
对应的mapper.xml和mapper接口比较简单,此处略过。
定时任务的Service,该类只是更新定时任务表、记录日志,同时把定时任务加入quartz中:
@Slf4j
@Service
public class ScheduleTaskService {
@Autowired
private ScheduleTaskLogService scheduleTaskLogService;
@Autowired
private LogService logService;
//这是quartz的bean 是需要自己配置额,下文中有说
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Autowired
private ScheduleTaskMapper scheduleTaskMapper;
public List<ScheduleTask> getUnStartedOrRunningTask() {
//xxx 业务逻辑自己写 主要是查询未运行或者正在运行的任务
}
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void updateForError(String taskId,String taskLogId,String errorCause,Date beginDate,Date endDate){
try {
//同时停止定时任务
JobKey jobKey = this.getJobKeyByTaskId(taskId);
if (jobKey != null) {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
scheduler.deleteJob(jobKey);
}
//记录日志 更新定时任务状态
//添加定时任务日志
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void updateForSuccess(String taskId,String taskLogId,Date beginDate,Date endDate){
//成功的时候更改定时任务状态 添加日志
}
@Override
public void stopById(String taskId) throws Exception{
//同时停止定时任务
JobKey jobKey = this.getJobKeyByTaskId(taskId);
if(jobKey != null){
Scheduler scheduler = schedulerFactoryBean.getScheduler();
scheduler.deleteJob(jobKey);
}
//跟新定时任务表
}
@Override
public JobKey getJobKeyByTaskId(String taskId) throws Exception{
if(taskId == null || taskId.trim().isEmpty()){
throw new RuntimeException("定时任务Id为空");
}
JobKey jobKey = null;
Scheduler scheduler = schedulerFactoryBean.getScheduler();
Set<TriggerKey> triggerKeySet = scheduler.getTriggerKeys(GroupMatcher.anyGroup());
if(CollectionUtils.isNotEmpty(triggerKeySet)){
Iterator<TriggerKey> it = triggerKeySet.iterator();
while (it.hasNext()) {
TriggerKey triggerKey = it.next();
//通过triggerKey在scheduler中获取trigger对象
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
String strggerName = triggerKey.getName();
if(taskId.equals(strggerName)){
//获取trigger拥有的Job
jobKey = trigger.getJobKey();
break;
}
}
}
return jobKey;
}
@Override
public boolean addOrUpdateTaskToQuartz(ScheduleTask task){
boolean isSuccess = false;
try {
if (task == null) {
throw new RuntimeException("task为空");
}
Scheduler scheduler = schedulerFactoryBean.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(task.getId(), task.getGroupId());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (null == trigger) {
String isConcurrent = task.getIsConcurrent();
Class clazz;
if (isConcurrent != null && ScheduleTaskIsConcurrentEnum.PARALLEL.getCode().equals(isConcurrent)) {
clazz = QuartzParallelJob.class;
} else {
clazz = QuartzSerialJob.class;
}
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(task.getId(), task.getGroupId()).build();
jobDetail.getJobDataMap().put(ScheduleTaskConst.SCHEDULE_TASK_DATA_MAP_KEY, task);
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCron());
trigger = TriggerBuilder.newTrigger().withDescription(task.getId()).withIdentity(task.getId(), task.getGroupId())
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
} else {
//删除任务,再添加
scheduler.deleteJob(trigger.getJobKey());
addOrUpdateTaskToQuartz(task);
}
isSuccess = true;
}catch (Exception e){
e.printStackTrace();
//这里不要抛出异常,单个任务的失败,不要影响后续的执行
isSuccess = false;
e.printStackTrace();
log.error("添加单个定时任务失败,任务:" + JSON.toJSONString(task));
//此处需要记录失败到表中
logService.errorLog("添加单个定时任务失败,任务:" + JSON.toJSONString(task) + ExceptionUtil.getStackTrace(e));
}finally {
return isSuccess;
}
}
@Override
public void launchById(String id) throws Exception{
//加入到定时
this.addOrUpdateTaskToQuartz(scheduleTask);
//更新定时任务表
}
@Override
public void deleteById(String id) throws Exception{
//更新定时任务表
//同时停止定时任务
JobKey jobKey = this.getJobKeyByTaskId(taskId);
if (jobKey != null) {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
scheduler.deleteJob(jobKey);
}
}
@Override
public void update(ScheduleTaskUpdateDTO scheduleTaskUpdateDTO){
// 更新
}
@Override
public void add(ScheduleTaskAddDTO scheduleTaskAddDTO){
//新增
}
}
定时任务配置类:
@Configuration
public class QuartzConfig {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(){
return new SchedulerFactoryBean();
}
}
那具体的业务执行呢,上面只是基础,需要有东西将它们串联起来:
@Slf4j
public class QuartzParallelJob implements Job {
public void execute(JobExecutionContext context){
ScheduleTask scheduleTask = (ScheduleTask) context.getMergedJobDataMap().get(ScheduleTaskConst.SCHEDULE_TASK_DATA_MAP_KEY);
ScheduleTaskService scheduleTaskService = ApplicationContextHolder.getBean(ScheduleTaskService.class);
String taskLogId = RandomUtil.getUUID();
//任务的起始和结束时间 对于异步的任务,这么记录就不行了
Date beginTime = new Date();
try {
if(scheduleTask == null){
return;
}
String beanName = scheduleTask.getBeanName();
if(beanName == null || beanName.trim().isEmpty()){
throw new RuntimeException("task的beanName为空");
}
TaskService taskService = ApplicationContextHolder.getBean(beanName,TaskService.class);
if(taskService == null){
throw new RuntimeException("未找到beanName对应的bean,bean:" + beanName);
}
taskService.run(scheduleTask,taskLogId);
//成功更新
scheduleTaskService.updateForSuccess(scheduleTask.getId(),taskLogId,beginTime,new Date());
}catch (Exception e){
e.printStackTrace();
//失败更新
scheduleTaskService.updateForError(scheduleTask.getId(),taskLogId,ExceptionUtil.getStackTrace(e), beginTime,new Date());
}
}
}
另一个:
@Slf4j
@DisallowConcurrentExecution//跟上面唯一的区别就是多了这个注解
public class QuartzSerialJob implements Job {
public void execute(JobExecutionContext context) {
ScheduleTask scheduleTask = (ScheduleTask) context.getMergedJobDataMap().get(ScheduleTaskConst.SCHEDULE_TASK_DATA_MAP_KEY);
ScheduleTaskService scheduleTaskService = ApplicationContextHolder.getBean(ScheduleTaskService.class);
String taskLogId = RandomUtil.getUUID();
//任务的起始和结束时间 对于异步的任务,这么记录就不行了
Date beginTime = new Date();
try {
if(scheduleTask == null){
return;
}
String beanName = scheduleTask.getBeanName();
if(beanName == null || beanName.trim().isEmpty()){
throw new RuntimeException("task的beanName为空");
}
TaskService taskService = ApplicationContextHolder.getBean(beanName,TaskService.class);
if(taskService == null){
throw new RuntimeException("未找到beanName对应的bean,bean:" + beanName);
}
taskService.run(scheduleTask,taskLogId);
//成功更新
scheduleTaskService.updateForSuccess(scheduleTask.getId(),taskLogId,beginTime,new Date());
}catch (Exception e){
e.printStackTrace();
//失败更新
scheduleTaskService.updateForError(scheduleTask.getId(),taskLogId,ExceptionUtil.getStackTrace(e), beginTime,new Date());
}
}
}
上面这两个类,实现了quartz的job,是替我们执行定时任务的,里面用applicationContext.getBean的方式,拿到了我们的业务逻辑bean,就可以执行了。
业务逻辑bean示意:
@Service
public class YYYTaskService implements TaskService {
@Override
public Object run(ScheduleTask task,String taskLogId) throws Exception{
//1 校验
if(task == null){
throw new RuntimeException("task实体类为空,无法执行定时任务");
}
String id = task.getId();
if(id == null || id.trim().isEmpty()){
throw new RuntimeException("定时任务ID为空,无法执行定时任务");
}
String businessId = task.getBusinessId();
if(businessId == null || businessId.isEmpty()){
throw new RuntimeException("定时任务businessId为空,无法执行定时任务");
}
//具体业务逻辑
return null;
}
}
还差一个问题,那就是如果服务突然挂掉,那定时任务就全没有了,需要在控制界面挨个启动?no no no,我们可以直接在项目启动完毕,将没有启动和正在运行的定时任务直接加入定时quartz中:
/**
* @Author
* @Date
* @Description 定时任务的初始化 就是从数据表中获取还没有执行的定时任务,加入定时任务 在系统启动的时候,只获取一次
* 在项目运行期间的定时任务,是动态的添加到quartz中去执行的
*/
@Slf4j
@Component
@DependsOn({"applicationContextHolder","scheduleTaskService"})
public class QuartzJobInit {
@Autowired
private ApplicationContextHolder applicationContextHolder;
@Autowired
private ScheduleTaskService scheduleTaskService;
@PostConstruct
public void afterPropertiesSet(){
try {
// 这里从数据库中获取要运行的任务
List<ScheduleTask> list = scheduleTaskService.getUnStartedOrRunningTask();
if(CollectionUtils.isEmpty(list)){
return;
}
for(ScheduleTask task : list){
scheduleTaskService.addOrUpdateTaskToQuartz(task);
}
}catch (Exception e){
e.printStackTrace();
throw new RuntimeException("初始化定时任务失败");
}
}
}
至于界面和统计数据,就看定时任务表、定时任务日志表、以及各自业务逻辑日志表中去获取数据,展现给控制界面吧。
网友评论