美文网首页
动态定时任务

动态定时任务

作者: 呼噜噜睡 | 来源:发表于2024-07-13 09:00 被阅读0次

    在开发中,我们经常会处理一些数据,会用到定时任务,比如半夜同步接口数据,或者处理一些数据到统计表,方便生成报表。

    一般的做法就是使用spring的定时任务了,它和quartz有一些结合,使用配xml配置定时任务,或者使用注解来做定时任务,需要说明的是,这里的定时是静态的,也就是在配置上或者注解上写上了cron表达式,一般情况下,如果没有遇到变态的客户,基本就能够满足大部分的开发需求了。

    首先讲讲静态定时任务的不足,那就是写死了cron表达式,如果客户需要改定时时间,那就要停服务,改配置,再重启;如果因为其它原因导致服务不可用或者停服务,导致错过了此次同步时间,那就只有等下一次的同步时间了;有一些同步接口过时了,需要删除该定时任务,那么就从代码和配置上删除,再来一遍重启服务;更关键的是,我不知道当前定时任务的状态,比如这个机器有多少定时任务正在跑,他们分别花了多少时间,错误了几次。你要问我这些,我只能盯着控制台,看日志输出,这好惨啊。

    所以,是时候升级一下了,来个动态定时任务。有配置的界面,可以随时停止,随时启动,设置最大失败重试次数,随时删除定时任务,随时加入定时任务,每一次的定时运行都有日志可以查询,特别是错误日志,统计当前机器运行的定时任务,耗时之类的统计数据。

    好,开干吧。在撸之前,请用香皂把手洗干净,然后保持温暖。。。,停停停,你在说些什么?我不懂,这里可是学习知识的地方啊,请不要开车。

    咳,首先我想说的是,我假设你已经在用springboot进行开发了,这会省去很多不必要的说明。

    第一步呢,就是引入依赖:

    pom.xml加入:

    <!--动态定时任务框架quartz-->
    <dependency>
      <groupId>org.quartz-scheduler</groupId>
      <artifactId>quartz</artifactId>
      <version>2.3.2</version>
    </dependency>
    

    其余的springboot依赖以及数据源配置,略去。
    先来配置一个bean吧,当springboot启动之后,会把ApplicationContext放入该类,用它我们就可以获取任意的bean:

    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ApplicationContextHolder implements ApplicationContextAware {
    
        /**
         * 上下文对象实例
         */
        private static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    
        /**
         * 获取applicationContext
         * @return
         */
        public static ApplicationContext getApplicationContext() {
            return applicationContext;
        }
    
        /**
         * 通过name获取 Bean.
         * @param name
         * @return
         */
        public static Object getBean(String name){
            return getApplicationContext().getBean(name);
        }
    
        /**
         * 通过class获取Bean.
         * @param clazz
         * @param <T>
         * @return
         */
        public static <T> T getBean(Class<T> clazz){
            return getApplicationContext().getBean(clazz);
        }
    
        /**
         * 通过name,以及Clazz返回指定的Bean
         * @param name
         * @param clazz
         * @param <T>
         * @return
         */
        public static <T> T getBean(String name,Class<T> clazz){
            return getApplicationContext().getBean(name, clazz);
        }
    }
    

    好了,准备工作已经妥当,先来讲一下原理,原理就是我将定时任务cron 执行业务的bean以及参数之类的,放入表中,以后对该表进行操作,就可以管理定时任务了。至于添加定时任务、启动、停止、删除定时任务的具体执行,交给quartz来帮助我们执行。每一次定时任务的执行,都会插入日志,插入日志的方法一般选取无事务方式,保证日志一定插入成功,即使业务方法运行报错。
    好了,下面再来准备两张表:

    -- 定时任务表
    DROP TABLE IF EXISTS `t_schedule_task`;
    CREATE TABLE `t_schedule_task` (
       `id` varchar(32) NOT NULL COMMENT '主键',
       `business_id` varchar(64) DEFAULT NULL COMMENT '业务编号',
       `business_type` varchar(32) DEFAULT NULL COMMENT '业务类型  1:xxx  2:yyy',
       `name` varchar(64) DEFAULT NULL COMMENT '任务名称',
       `group_id` varchar(64) DEFAULT NULL COMMENT '任务组ID',
       `cron` varchar(64) DEFAULT NULL COMMENT 'cron表达式,',
       `job_status` varchar(10) DEFAULT NULL COMMENT '定时任务状态  1:未运行 2:正在运行  3:已停止 4:运行失败次数太多而停止',
       `is_concurrent` varchar(10) DEFAULT '1' COMMENT '定时是否并行运行 1:串行 2:并行',
       `bean_name` varchar(32) DEFAULT NULL COMMENT '任务执行需要反射调用的service bean名称',
       `method_name` varchar(32) DEFAULT NULL COMMENT '调用的bean的方法名称',
       `params` varchar(128) DEFAULT NULL COMMENT '调用方法参数 json字符串',
       `des` varchar(255) DEFAULT NULL COMMENT '任务描述',
       `valid` smallint(6) unsigned DEFAULT '1' COMMENT '有效标记(1:有效,0:无效)',
       `max_try_times` int(6) unsigned DEFAULT '0' COMMENT '失败之后,最大重试次数',
       `error_times` int(6) unsigned DEFAULT '0' COMMENT '已经失败次数',
       `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
       `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
       `create_user_id` varchar(40) DEFAULT NULL COMMENT '创建者ID',
       `update_user_id` varchar(40) DEFAULT NULL COMMENT '修改者ID',
       PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务表';
    
    -- 定时任务日志表
    DROP TABLE IF EXISTS `t_schedule_task_log`;
    CREATE TABLE `t_schedule_task_log` (
       `id` varchar(32) NOT NULL COMMENT '主键',
       `task_id` varchar(32) DEFAULT NULL COMMENT 't_schedule_task 主键',
       `status` varchar(10) DEFAULT NULL COMMENT '任务状态  1:成功 2:失败 ',
       `valid` smallint(6) unsigned DEFAULT 0 COMMENT '有效标记(1:有效,0:无效)',
       `error_cause` varchar(1024) DEFAULT NULL COMMENT '错误原因',
       `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
       `finish_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
       `exec_time` int unsigned DEFAULT 0 COMMENT '执行时长  秒',
       `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
       `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
       `create_user_id` varchar(40) DEFAULT NULL COMMENT '创建者ID',
       `update_user_id` varchar(40) DEFAULT NULL COMMENT '修改者ID',
       PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务日志表';
    

    对应的mapper.xml和mapper接口比较简单,此处略过。
    定时任务的Service,该类只是更新定时任务表、记录日志,同时把定时任务加入quartz中:

    @Slf4j
    @Service
    public class ScheduleTaskService {
    
        @Autowired
        private ScheduleTaskLogService scheduleTaskLogService;
    
        @Autowired
        private LogService logService;
    
        //这是quartz的bean  是需要自己配置额,下文中有说
        @Autowired
        private SchedulerFactoryBean schedulerFactoryBean;
    
        @Autowired
        private ScheduleTaskMapper scheduleTaskMapper;
    
        public List<ScheduleTask> getUnStartedOrRunningTask() {
            //xxx 业务逻辑自己写 主要是查询未运行或者正在运行的任务
        }
    
        @Transactional(propagation = Propagation.NOT_SUPPORTED)
        public void updateForError(String taskId,String taskLogId,String errorCause,Date beginDate,Date endDate){
            try {
                //同时停止定时任务
                JobKey jobKey = this.getJobKeyByTaskId(taskId);
                if (jobKey != null) {
                    Scheduler scheduler = schedulerFactoryBean.getScheduler();
                    scheduler.deleteJob(jobKey);
                }
                //记录日志  更新定时任务状态
                //添加定时任务日志
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
        @Override
        public void updateForSuccess(String taskId,String taskLogId,Date beginDate,Date endDate){
            //成功的时候更改定时任务状态  添加日志
        }
    
        @Override
        public void stopById(String taskId) throws Exception{
            //同时停止定时任务
            JobKey jobKey = this.getJobKeyByTaskId(taskId);
            if(jobKey != null){
                Scheduler scheduler = schedulerFactoryBean.getScheduler();
                scheduler.deleteJob(jobKey);
            }
            //跟新定时任务表
        }
    
        @Override
        public JobKey getJobKeyByTaskId(String taskId) throws Exception{
            if(taskId == null || taskId.trim().isEmpty()){
                throw new RuntimeException("定时任务Id为空");
            }
            JobKey jobKey = null;
            Scheduler scheduler = schedulerFactoryBean.getScheduler();
            Set<TriggerKey> triggerKeySet = scheduler.getTriggerKeys(GroupMatcher.anyGroup());
            if(CollectionUtils.isNotEmpty(triggerKeySet)){
                Iterator<TriggerKey> it = triggerKeySet.iterator();
                while (it.hasNext()) {
                    TriggerKey triggerKey = it.next();
                    //通过triggerKey在scheduler中获取trigger对象
                    CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                    String strggerName = triggerKey.getName();
                    if(taskId.equals(strggerName)){
                        //获取trigger拥有的Job
                        jobKey = trigger.getJobKey();
                        break;
                    }
                }
            }
            return jobKey;
        }
    
        @Override
        public boolean addOrUpdateTaskToQuartz(ScheduleTask task){
            boolean isSuccess = false;
            try {
                if (task == null) {
                    throw new RuntimeException("task为空");
                }
                Scheduler scheduler = schedulerFactoryBean.getScheduler();
                TriggerKey triggerKey = TriggerKey.triggerKey(task.getId(), task.getGroupId());
                CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                if (null == trigger) {
                    String isConcurrent = task.getIsConcurrent();
                    Class clazz;
                    if (isConcurrent != null && ScheduleTaskIsConcurrentEnum.PARALLEL.getCode().equals(isConcurrent)) {
                        clazz = QuartzParallelJob.class;
                    } else {
                        clazz = QuartzSerialJob.class;
                    }
                    JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(task.getId(), task.getGroupId()).build();
                    jobDetail.getJobDataMap().put(ScheduleTaskConst.SCHEDULE_TASK_DATA_MAP_KEY, task);
                    CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCron());
                    trigger = TriggerBuilder.newTrigger().withDescription(task.getId()).withIdentity(task.getId(), task.getGroupId())
                            .withSchedule(scheduleBuilder).build();
                    scheduler.scheduleJob(jobDetail, trigger);
                } else {
                    //删除任务,再添加
                    scheduler.deleteJob(trigger.getJobKey());
                    addOrUpdateTaskToQuartz(task);
                }
                isSuccess = true;
            }catch (Exception e){
                e.printStackTrace();
                //这里不要抛出异常,单个任务的失败,不要影响后续的执行
                isSuccess = false;
                e.printStackTrace();
                log.error("添加单个定时任务失败,任务:" + JSON.toJSONString(task));
                //此处需要记录失败到表中
                logService.errorLog("添加单个定时任务失败,任务:" + JSON.toJSONString(task) + ExceptionUtil.getStackTrace(e));
            }finally {
                return isSuccess;
            }
        }
    
        @Override
        public void launchById(String id) throws Exception{
            //加入到定时
            this.addOrUpdateTaskToQuartz(scheduleTask);
            //更新定时任务表
        }
    
        @Override
        public void deleteById(String id) throws Exception{
            //更新定时任务表
            //同时停止定时任务
            JobKey jobKey = this.getJobKeyByTaskId(taskId);
            if (jobKey != null) {
                Scheduler scheduler = schedulerFactoryBean.getScheduler();
                scheduler.deleteJob(jobKey);
            }
        }
    
        @Override
        public void update(ScheduleTaskUpdateDTO scheduleTaskUpdateDTO){
            // 更新
        }
    
        @Override
        public void add(ScheduleTaskAddDTO scheduleTaskAddDTO){
            //新增
        }
    }
    
    定时任务配置类:
    @Configuration
    public class QuartzConfig {
        @Bean
        public SchedulerFactoryBean schedulerFactoryBean(){
            return new SchedulerFactoryBean();
        }
    }
    

    那具体的业务执行呢,上面只是基础,需要有东西将它们串联起来:

    @Slf4j
    public class QuartzParallelJob implements Job {
    
        public void execute(JobExecutionContext context){
            ScheduleTask scheduleTask = (ScheduleTask) context.getMergedJobDataMap().get(ScheduleTaskConst.SCHEDULE_TASK_DATA_MAP_KEY);
            ScheduleTaskService scheduleTaskService = ApplicationContextHolder.getBean(ScheduleTaskService.class);
            String taskLogId = RandomUtil.getUUID();
            //任务的起始和结束时间  对于异步的任务,这么记录就不行了
            Date beginTime = new Date();
            try {
                if(scheduleTask == null){
                    return;
                }
                String beanName = scheduleTask.getBeanName();
                if(beanName == null || beanName.trim().isEmpty()){
                    throw new RuntimeException("task的beanName为空");
                }
                TaskService taskService = ApplicationContextHolder.getBean(beanName,TaskService.class);
                if(taskService == null){
                    throw new RuntimeException("未找到beanName对应的bean,bean:" + beanName);
                }
                taskService.run(scheduleTask,taskLogId);
                //成功更新
                scheduleTaskService.updateForSuccess(scheduleTask.getId(),taskLogId,beginTime,new Date());
            }catch (Exception e){
                e.printStackTrace();
                //失败更新
                scheduleTaskService.updateForError(scheduleTask.getId(),taskLogId,ExceptionUtil.getStackTrace(e), beginTime,new Date());
            }
        }
    }
    

    另一个:

    @Slf4j
    @DisallowConcurrentExecution//跟上面唯一的区别就是多了这个注解
    public class QuartzSerialJob implements Job {
    
        public void execute(JobExecutionContext context) {
            ScheduleTask scheduleTask = (ScheduleTask) context.getMergedJobDataMap().get(ScheduleTaskConst.SCHEDULE_TASK_DATA_MAP_KEY);
            ScheduleTaskService scheduleTaskService = ApplicationContextHolder.getBean(ScheduleTaskService.class);
            String taskLogId = RandomUtil.getUUID();
            //任务的起始和结束时间  对于异步的任务,这么记录就不行了
            Date beginTime = new Date();
            try {
                if(scheduleTask == null){
                    return;
                }
                String beanName = scheduleTask.getBeanName();
                if(beanName == null || beanName.trim().isEmpty()){
                    throw new RuntimeException("task的beanName为空");
                }
                TaskService taskService = ApplicationContextHolder.getBean(beanName,TaskService.class);
    
                if(taskService == null){
                    throw new RuntimeException("未找到beanName对应的bean,bean:" + beanName);
                }
                taskService.run(scheduleTask,taskLogId);
                //成功更新
                scheduleTaskService.updateForSuccess(scheduleTask.getId(),taskLogId,beginTime,new Date());
            }catch (Exception e){
                e.printStackTrace();
                //失败更新
                scheduleTaskService.updateForError(scheduleTask.getId(),taskLogId,ExceptionUtil.getStackTrace(e), beginTime,new Date());
            }
        }
    }
    

    上面这两个类,实现了quartz的job,是替我们执行定时任务的,里面用applicationContext.getBean的方式,拿到了我们的业务逻辑bean,就可以执行了。
    业务逻辑bean示意:

    @Service
    public class YYYTaskService implements TaskService {
        
        @Override
        public Object run(ScheduleTask task,String taskLogId) throws Exception{
            //1 校验
            if(task == null){
                throw new RuntimeException("task实体类为空,无法执行定时任务");
            }
            String id = task.getId();
            if(id == null || id.trim().isEmpty()){
                throw new RuntimeException("定时任务ID为空,无法执行定时任务");
            }
            String businessId = task.getBusinessId();
            if(businessId == null || businessId.isEmpty()){
                throw new RuntimeException("定时任务businessId为空,无法执行定时任务");
            }
            //具体业务逻辑
            return null;
        }
    }
    

    还差一个问题,那就是如果服务突然挂掉,那定时任务就全没有了,需要在控制界面挨个启动?no no no,我们可以直接在项目启动完毕,将没有启动和正在运行的定时任务直接加入定时quartz中:

    /**
     * @Author 
     * @Date 
     * @Description 定时任务的初始化    就是从数据表中获取还没有执行的定时任务,加入定时任务  在系统启动的时候,只获取一次
     *              在项目运行期间的定时任务,是动态的添加到quartz中去执行的
     */
    @Slf4j
    @Component
    @DependsOn({"applicationContextHolder","scheduleTaskService"})
    public class QuartzJobInit {
    
        @Autowired
        private ApplicationContextHolder applicationContextHolder;
    
        @Autowired
        private ScheduleTaskService scheduleTaskService;
    
        @PostConstruct
        public void afterPropertiesSet(){
            try {
                // 这里从数据库中获取要运行的任务
                List<ScheduleTask> list = scheduleTaskService.getUnStartedOrRunningTask();
                if(CollectionUtils.isEmpty(list)){
                    return;
                }
                for(ScheduleTask task : list){
                    scheduleTaskService.addOrUpdateTaskToQuartz(task);
                }
            }catch (Exception e){
                e.printStackTrace();
                throw new RuntimeException("初始化定时任务失败");
            }
        }
    }
    

    至于界面和统计数据,就看定时任务表、定时任务日志表、以及各自业务逻辑日志表中去获取数据,展现给控制界面吧。

    相关文章

      网友评论

          本文标题:动态定时任务

          本文链接:https://www.haomeiwen.com/subject/fpxqhjtx.html