美文网首页
动态定时任务

动态定时任务

作者: 呼噜噜睡 | 来源:发表于2024-07-13 09:00 被阅读0次

在开发中,我们经常会处理一些数据,会用到定时任务,比如半夜同步接口数据,或者处理一些数据到统计表,方便生成报表。

一般的做法就是使用spring的定时任务了,它和quartz有一些结合,使用配xml配置定时任务,或者使用注解来做定时任务,需要说明的是,这里的定时是静态的,也就是在配置上或者注解上写上了cron表达式,一般情况下,如果没有遇到变态的客户,基本就能够满足大部分的开发需求了。

首先讲讲静态定时任务的不足,那就是写死了cron表达式,如果客户需要改定时时间,那就要停服务,改配置,再重启;如果因为其它原因导致服务不可用或者停服务,导致错过了此次同步时间,那就只有等下一次的同步时间了;有一些同步接口过时了,需要删除该定时任务,那么就从代码和配置上删除,再来一遍重启服务;更关键的是,我不知道当前定时任务的状态,比如这个机器有多少定时任务正在跑,他们分别花了多少时间,错误了几次。你要问我这些,我只能盯着控制台,看日志输出,这好惨啊。

所以,是时候升级一下了,来个动态定时任务。有配置的界面,可以随时停止,随时启动,设置最大失败重试次数,随时删除定时任务,随时加入定时任务,每一次的定时运行都有日志可以查询,特别是错误日志,统计当前机器运行的定时任务,耗时之类的统计数据。

好,开干吧。在撸之前,请用香皂把手洗干净,然后保持温暖。。。,停停停,你在说些什么?我不懂,这里可是学习知识的地方啊,请不要开车。

咳,首先我想说的是,我假设你已经在用springboot进行开发了,这会省去很多不必要的说明。

第一步呢,就是引入依赖:

pom.xml加入:

<!--动态定时任务框架quartz-->
<dependency>
  <groupId>org.quartz-scheduler</groupId>
  <artifactId>quartz</artifactId>
  <version>2.3.2</version>
</dependency>

其余的springboot依赖以及数据源配置,略去。
先来配置一个bean吧,当springboot启动之后,会把ApplicationContext放入该类,用它我们就可以获取任意的bean:

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class ApplicationContextHolder implements ApplicationContextAware {

    /**
     * 上下文对象实例
     */
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * 获取applicationContext
     * @return
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通过name获取 Bean.
     * @param name
     * @return
     */
    public static Object getBean(String name){
        return getApplicationContext().getBean(name);
    }

    /**
     * 通过class获取Bean.
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T getBean(Class<T> clazz){
        return getApplicationContext().getBean(clazz);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     * @param name
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T getBean(String name,Class<T> clazz){
        return getApplicationContext().getBean(name, clazz);
    }
}

好了,准备工作已经妥当,先来讲一下原理,原理就是我将定时任务cron 执行业务的bean以及参数之类的,放入表中,以后对该表进行操作,就可以管理定时任务了。至于添加定时任务、启动、停止、删除定时任务的具体执行,交给quartz来帮助我们执行。每一次定时任务的执行,都会插入日志,插入日志的方法一般选取无事务方式,保证日志一定插入成功,即使业务方法运行报错。
好了,下面再来准备两张表:

-- 定时任务表
DROP TABLE IF EXISTS `t_schedule_task`;
CREATE TABLE `t_schedule_task` (
   `id` varchar(32) NOT NULL COMMENT '主键',
   `business_id` varchar(64) DEFAULT NULL COMMENT '业务编号',
   `business_type` varchar(32) DEFAULT NULL COMMENT '业务类型  1:xxx  2:yyy',
   `name` varchar(64) DEFAULT NULL COMMENT '任务名称',
   `group_id` varchar(64) DEFAULT NULL COMMENT '任务组ID',
   `cron` varchar(64) DEFAULT NULL COMMENT 'cron表达式,',
   `job_status` varchar(10) DEFAULT NULL COMMENT '定时任务状态  1:未运行 2:正在运行  3:已停止 4:运行失败次数太多而停止',
   `is_concurrent` varchar(10) DEFAULT '1' COMMENT '定时是否并行运行 1:串行 2:并行',
   `bean_name` varchar(32) DEFAULT NULL COMMENT '任务执行需要反射调用的service bean名称',
   `method_name` varchar(32) DEFAULT NULL COMMENT '调用的bean的方法名称',
   `params` varchar(128) DEFAULT NULL COMMENT '调用方法参数 json字符串',
   `des` varchar(255) DEFAULT NULL COMMENT '任务描述',
   `valid` smallint(6) unsigned DEFAULT '1' COMMENT '有效标记(1:有效,0:无效)',
   `max_try_times` int(6) unsigned DEFAULT '0' COMMENT '失败之后,最大重试次数',
   `error_times` int(6) unsigned DEFAULT '0' COMMENT '已经失败次数',
   `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
   `create_user_id` varchar(40) DEFAULT NULL COMMENT '创建者ID',
   `update_user_id` varchar(40) DEFAULT NULL COMMENT '修改者ID',
   PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务表';

-- 定时任务日志表
DROP TABLE IF EXISTS `t_schedule_task_log`;
CREATE TABLE `t_schedule_task_log` (
   `id` varchar(32) NOT NULL COMMENT '主键',
   `task_id` varchar(32) DEFAULT NULL COMMENT 't_schedule_task 主键',
   `status` varchar(10) DEFAULT NULL COMMENT '任务状态  1:成功 2:失败 ',
   `valid` smallint(6) unsigned DEFAULT 0 COMMENT '有效标记(1:有效,0:无效)',
   `error_cause` varchar(1024) DEFAULT NULL COMMENT '错误原因',
   `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   `finish_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
   `exec_time` int unsigned DEFAULT 0 COMMENT '执行时长  秒',
   `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
   `create_user_id` varchar(40) DEFAULT NULL COMMENT '创建者ID',
   `update_user_id` varchar(40) DEFAULT NULL COMMENT '修改者ID',
   PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务日志表';

对应的mapper.xml和mapper接口比较简单,此处略过。
定时任务的Service,该类只是更新定时任务表、记录日志,同时把定时任务加入quartz中:

@Slf4j
@Service
public class ScheduleTaskService {

    @Autowired
    private ScheduleTaskLogService scheduleTaskLogService;

    @Autowired
    private LogService logService;

    //这是quartz的bean  是需要自己配置额,下文中有说
    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    private ScheduleTaskMapper scheduleTaskMapper;

    public List<ScheduleTask> getUnStartedOrRunningTask() {
        //xxx 业务逻辑自己写 主要是查询未运行或者正在运行的任务
    }

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void updateForError(String taskId,String taskLogId,String errorCause,Date beginDate,Date endDate){
        try {
            //同时停止定时任务
            JobKey jobKey = this.getJobKeyByTaskId(taskId);
            if (jobKey != null) {
                Scheduler scheduler = schedulerFactoryBean.getScheduler();
                scheduler.deleteJob(jobKey);
            }
            //记录日志  更新定时任务状态
            //添加定时任务日志
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void updateForSuccess(String taskId,String taskLogId,Date beginDate,Date endDate){
        //成功的时候更改定时任务状态  添加日志
    }

    @Override
    public void stopById(String taskId) throws Exception{
        //同时停止定时任务
        JobKey jobKey = this.getJobKeyByTaskId(taskId);
        if(jobKey != null){
            Scheduler scheduler = schedulerFactoryBean.getScheduler();
            scheduler.deleteJob(jobKey);
        }
        //跟新定时任务表
    }

    @Override
    public JobKey getJobKeyByTaskId(String taskId) throws Exception{
        if(taskId == null || taskId.trim().isEmpty()){
            throw new RuntimeException("定时任务Id为空");
        }
        JobKey jobKey = null;
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        Set<TriggerKey> triggerKeySet = scheduler.getTriggerKeys(GroupMatcher.anyGroup());
        if(CollectionUtils.isNotEmpty(triggerKeySet)){
            Iterator<TriggerKey> it = triggerKeySet.iterator();
            while (it.hasNext()) {
                TriggerKey triggerKey = it.next();
                //通过triggerKey在scheduler中获取trigger对象
                CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                String strggerName = triggerKey.getName();
                if(taskId.equals(strggerName)){
                    //获取trigger拥有的Job
                    jobKey = trigger.getJobKey();
                    break;
                }
            }
        }
        return jobKey;
    }

    @Override
    public boolean addOrUpdateTaskToQuartz(ScheduleTask task){
        boolean isSuccess = false;
        try {
            if (task == null) {
                throw new RuntimeException("task为空");
            }
            Scheduler scheduler = schedulerFactoryBean.getScheduler();
            TriggerKey triggerKey = TriggerKey.triggerKey(task.getId(), task.getGroupId());
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if (null == trigger) {
                String isConcurrent = task.getIsConcurrent();
                Class clazz;
                if (isConcurrent != null && ScheduleTaskIsConcurrentEnum.PARALLEL.getCode().equals(isConcurrent)) {
                    clazz = QuartzParallelJob.class;
                } else {
                    clazz = QuartzSerialJob.class;
                }
                JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(task.getId(), task.getGroupId()).build();
                jobDetail.getJobDataMap().put(ScheduleTaskConst.SCHEDULE_TASK_DATA_MAP_KEY, task);
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCron());
                trigger = TriggerBuilder.newTrigger().withDescription(task.getId()).withIdentity(task.getId(), task.getGroupId())
                        .withSchedule(scheduleBuilder).build();
                scheduler.scheduleJob(jobDetail, trigger);
            } else {
                //删除任务,再添加
                scheduler.deleteJob(trigger.getJobKey());
                addOrUpdateTaskToQuartz(task);
            }
            isSuccess = true;
        }catch (Exception e){
            e.printStackTrace();
            //这里不要抛出异常,单个任务的失败,不要影响后续的执行
            isSuccess = false;
            e.printStackTrace();
            log.error("添加单个定时任务失败,任务:" + JSON.toJSONString(task));
            //此处需要记录失败到表中
            logService.errorLog("添加单个定时任务失败,任务:" + JSON.toJSONString(task) + ExceptionUtil.getStackTrace(e));
        }finally {
            return isSuccess;
        }
    }

    @Override
    public void launchById(String id) throws Exception{
        //加入到定时
        this.addOrUpdateTaskToQuartz(scheduleTask);
        //更新定时任务表
    }

    @Override
    public void deleteById(String id) throws Exception{
        //更新定时任务表
        //同时停止定时任务
        JobKey jobKey = this.getJobKeyByTaskId(taskId);
        if (jobKey != null) {
            Scheduler scheduler = schedulerFactoryBean.getScheduler();
            scheduler.deleteJob(jobKey);
        }
    }

    @Override
    public void update(ScheduleTaskUpdateDTO scheduleTaskUpdateDTO){
        // 更新
    }

    @Override
    public void add(ScheduleTaskAddDTO scheduleTaskAddDTO){
        //新增
    }
}

定时任务配置类:
@Configuration
public class QuartzConfig {
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(){
        return new SchedulerFactoryBean();
    }
}

那具体的业务执行呢,上面只是基础,需要有东西将它们串联起来:

@Slf4j
public class QuartzParallelJob implements Job {

    public void execute(JobExecutionContext context){
        ScheduleTask scheduleTask = (ScheduleTask) context.getMergedJobDataMap().get(ScheduleTaskConst.SCHEDULE_TASK_DATA_MAP_KEY);
        ScheduleTaskService scheduleTaskService = ApplicationContextHolder.getBean(ScheduleTaskService.class);
        String taskLogId = RandomUtil.getUUID();
        //任务的起始和结束时间  对于异步的任务,这么记录就不行了
        Date beginTime = new Date();
        try {
            if(scheduleTask == null){
                return;
            }
            String beanName = scheduleTask.getBeanName();
            if(beanName == null || beanName.trim().isEmpty()){
                throw new RuntimeException("task的beanName为空");
            }
            TaskService taskService = ApplicationContextHolder.getBean(beanName,TaskService.class);
            if(taskService == null){
                throw new RuntimeException("未找到beanName对应的bean,bean:" + beanName);
            }
            taskService.run(scheduleTask,taskLogId);
            //成功更新
            scheduleTaskService.updateForSuccess(scheduleTask.getId(),taskLogId,beginTime,new Date());
        }catch (Exception e){
            e.printStackTrace();
            //失败更新
            scheduleTaskService.updateForError(scheduleTask.getId(),taskLogId,ExceptionUtil.getStackTrace(e), beginTime,new Date());
        }
    }
}

另一个:

@Slf4j
@DisallowConcurrentExecution//跟上面唯一的区别就是多了这个注解
public class QuartzSerialJob implements Job {

    public void execute(JobExecutionContext context) {
        ScheduleTask scheduleTask = (ScheduleTask) context.getMergedJobDataMap().get(ScheduleTaskConst.SCHEDULE_TASK_DATA_MAP_KEY);
        ScheduleTaskService scheduleTaskService = ApplicationContextHolder.getBean(ScheduleTaskService.class);
        String taskLogId = RandomUtil.getUUID();
        //任务的起始和结束时间  对于异步的任务,这么记录就不行了
        Date beginTime = new Date();
        try {
            if(scheduleTask == null){
                return;
            }
            String beanName = scheduleTask.getBeanName();
            if(beanName == null || beanName.trim().isEmpty()){
                throw new RuntimeException("task的beanName为空");
            }
            TaskService taskService = ApplicationContextHolder.getBean(beanName,TaskService.class);

            if(taskService == null){
                throw new RuntimeException("未找到beanName对应的bean,bean:" + beanName);
            }
            taskService.run(scheduleTask,taskLogId);
            //成功更新
            scheduleTaskService.updateForSuccess(scheduleTask.getId(),taskLogId,beginTime,new Date());
        }catch (Exception e){
            e.printStackTrace();
            //失败更新
            scheduleTaskService.updateForError(scheduleTask.getId(),taskLogId,ExceptionUtil.getStackTrace(e), beginTime,new Date());
        }
    }
}

上面这两个类,实现了quartz的job,是替我们执行定时任务的,里面用applicationContext.getBean的方式,拿到了我们的业务逻辑bean,就可以执行了。
业务逻辑bean示意:

@Service
public class YYYTaskService implements TaskService {
    
    @Override
    public Object run(ScheduleTask task,String taskLogId) throws Exception{
        //1 校验
        if(task == null){
            throw new RuntimeException("task实体类为空,无法执行定时任务");
        }
        String id = task.getId();
        if(id == null || id.trim().isEmpty()){
            throw new RuntimeException("定时任务ID为空,无法执行定时任务");
        }
        String businessId = task.getBusinessId();
        if(businessId == null || businessId.isEmpty()){
            throw new RuntimeException("定时任务businessId为空,无法执行定时任务");
        }
        //具体业务逻辑
        return null;
    }
}

还差一个问题,那就是如果服务突然挂掉,那定时任务就全没有了,需要在控制界面挨个启动?no no no,我们可以直接在项目启动完毕,将没有启动和正在运行的定时任务直接加入定时quartz中:

/**
 * @Author 
 * @Date 
 * @Description 定时任务的初始化    就是从数据表中获取还没有执行的定时任务,加入定时任务  在系统启动的时候,只获取一次
 *              在项目运行期间的定时任务,是动态的添加到quartz中去执行的
 */
@Slf4j
@Component
@DependsOn({"applicationContextHolder","scheduleTaskService"})
public class QuartzJobInit {

    @Autowired
    private ApplicationContextHolder applicationContextHolder;

    @Autowired
    private ScheduleTaskService scheduleTaskService;

    @PostConstruct
    public void afterPropertiesSet(){
        try {
            // 这里从数据库中获取要运行的任务
            List<ScheduleTask> list = scheduleTaskService.getUnStartedOrRunningTask();
            if(CollectionUtils.isEmpty(list)){
                return;
            }
            for(ScheduleTask task : list){
                scheduleTaskService.addOrUpdateTaskToQuartz(task);
            }
        }catch (Exception e){
            e.printStackTrace();
            throw new RuntimeException("初始化定时任务失败");
        }
    }
}

至于界面和统计数据,就看定时任务表、定时任务日志表、以及各自业务逻辑日志表中去获取数据,展现给控制界面吧。

相关文章

网友评论

      本文标题:动态定时任务

      本文链接:https://www.haomeiwen.com/subject/fpxqhjtx.html