美文网首页
Spring Boot集成quartz实现定时任务并支持切换任务

Spring Boot集成quartz实现定时任务并支持切换任务

作者: 我的小熊不见了 | 来源:发表于2019-08-28 18:02 被阅读0次

    org.quartz实现定时任务并自定义切换任务数据源

    在工作中经常会需要使用到定时任务处理各种周期性的任务,org.quartz是处理此类定时任务的一个优秀框架。随着项目一点点推进,此时我们并不满足于任务仅仅是定时执行,我们还想要对任务进行更多的控制,随时能对任务进行人为干预,就需要对quartz有更深入的了解。而随着微服务的流行,项目中多数据源的情况也越来越常见,在定时任务中集成多数据源切换的功能也需要集成进来。

    集成quartz实现定时任务

    集成quartz实现定时任务

    quartz中实现定时任务需要了解的基本概念

    Job

    通过实现Job类,在实现方法中写我们具体想要定时任务完成的工作,然后交给quartz管理。

    JobDetail

    Job只负责实现具体任务,所以还需要借助JobDetail来存储一些描述Job的基本信息。

    Quartz JobBuilder

    为构造JobDetail实体提供的builder-style API。你可以这样使用它来构建一个JobDetail

    @Bean
    public JobDetail jobDetail() {
     return JobBuilder.newJob().ofType(SampleJob.class)
     .storeDurably()
     .withIdentity("Qrtz_Job_Detail")
     .withDescription("Invoke Sample Job service...")
     .build();
    }
    

    Spring JobDetailFactoryBean

    Spring中配置JobDetail的方式:

    @Bean
    public JobDetailFactoryBean jobDetail() {
     JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
     jobDetailFactory.setJobClass(SampleJob.class);
     jobDetailFactory.setDescription("Invoke Sample Job service...");
     jobDetailFactory.setDurability(true);
     return jobDetailFactory;
    }
    

    Trigger

    触发器,代表一个调度参数的配置,什么时候去调度:

    @Bean
    public Trigger trigger(JobDetail job) {
     return TriggerBuilder.newTrigger().forJob(job)
     .withIdentity("Qrtz_Trigger")
     .withDescription("Sample trigger")
     .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
     .build();
    }
    

    Scheduler

    调度器,通过JobTrigger来注册一个调度器:

    @Bean
    public Scheduler scheduler(Trigger trigger, JobDetail job) {
     StdSchedulerFactory factory = new StdSchedulerFactory();
     factory.initialize(new ClassPathResource("quartz.properties").getInputStream());
    
     Scheduler scheduler = factory.getScheduler();
     scheduler.setJobFactory(springBeanJobFactory());
     scheduler.scheduleJob(job, trigger);
    
     scheduler.start();
     return scheduler;
    }
    

    给系统添加一个Job

    quartzJob就是我们需要去执行的任务,由Scheduler调度器负责调度任务们依靠制定好的Trigger来定时执行任务。

    因此首先我们需要结合以上基础给系统添加一个Job。

    addJob

        public void addJob(BaseJob job) throws SchedulerException {
            /** 创建JobDetail实例,绑定Job实现类
            * JobDetail 表示一个具体的可执行的调度程序,job是这个可执行调度程序所要执行的内容
            * 另外JobDetail还包含了这个任务调度的方案和策略**/
            // 指明job的名称,所在组的名称,以及绑定job类
            JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass())
                    .withIdentity(job.getJobKey())
                    .withDescription(job.getDescription())
                    .usingJobData(job.getDataMap())
                    .build();
    
            /**
             * Trigger代表一个调度参数的配置,什么时候去调度
             */
            //定义调度触发规则, 使用cronTrigger规则
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(job.getJobName(),job.getJobGroup())
                    .withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression()))
                    .startNow()
                    .build();
            //将任务和触发器注册到任务调度中去
            scheduler.scheduleJob(jobDetail,trigger);
            //判断调度器是否启动
            if(!scheduler.isStarted()){
                scheduler.start();
            }
            log.info(String.format("定时任务:%s.%s-已添加到调度器!", job.getJobGroup(),job.getJobName()));
        }
    

    首先需要定义好我们的Job,之后通过Job初始化JobDetailTrigger,最后将JobDetailTrigger注册到调度器中。

    BaseJob

    Job的结构如下:

    public abstract class BaseJob implements Job,Serializable {
        private static final long serialVersionUID = 1L;
        private static final String JOB_MAP_KEY = "self";
        /**
         * 任务名称
         */
        private String jobName;
        /**
         * 任务分组
         */
        private String jobGroup;
        /**
         * 任务状态 是否启动任务
         */
        private String jobStatus;
        /**
         * cron表达式
         */
        private String cronExpression;
        /**
         * 描述
         */
        private String description;
        /**
         * 任务执行时调用哪个类的方法 包名+类名
         */
        private Class beanClass = this.getClass();
        /**
         * 任务是否有状态
         */
        private String isConcurrent;
    
        /**
         * Spring bean
         */
        private String springBean;
    
        /**
         * 任务调用的方法名
         */
        private String methodName;
    
         /**
         * 该任务所使用的数据源
         */
        private String dataSource = DataSourceEnum.DB1.getName();
    
        /**
         * 为了将执行后的任务持久化到数据库中
         */
        @JsonIgnore
        private JobDataMap dataMap = new JobDataMap();
    
        public JobKey getJobKey(){
            return JobKey.jobKey(jobName, jobGroup);// 任务名称和组构成任务key
        }
        ...
    }
    

    可以看到Job中定义了任务的一些基本信息,重点关注其中的dataSourcedataMap属性。其中dataSource是任务所使用的数据源,并给了一个默认值;由于任务在添加后会持久化到数据库中,之后解析任务就会用到dataMap

    SchedulerConfig

    在添加Job的时候,JobDetailTrigger都是通过关键字new生成的,而调度器Scheduler则需要放在容器中维护。

    @Configuration
    @Order
    public class SchedulerConfig {
        @Autowired
        private MyJobFactory myJobFactory;
    
        @Value("${spring.profiles.active}")
        private String profile;
    
        /*
         * 通过SchedulerFactoryBean获取Scheduler的实例
         */
        @Bean(name = "scheduler")
        public Scheduler scheduler() throws Exception {
            return schedulerFactoryBean().getScheduler();
        }
        
        @Bean
        public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
            SchedulerFactoryBean factory = new SchedulerFactoryBean();
    
            factory.setOverwriteExistingJobs(true);
    
            // 延时启动
            factory.setStartupDelay(20);
    
            // 加载quartz数据源配置
            factory.setQuartzProperties(quartzProperties());
    
            // 自定义Job Factory,用于Spring注入
            factory.setJobFactory(myJobFactory);
            /*********全局监听器配置************/
            JobListener myJobListener = new SchedulerListener();
            factory.setGlobalJobListeners(myJobListener);//直接添加为全局监听器
            return factory;
        }
    
        @Bean
        public Properties quartzProperties() throws IOException {
            PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
            if (Util.PRODUCT.equals(profile)) {//正式环境
                System.out.println("正式环境quartz配置");
                propertiesFactoryBean.setLocation(new ClassPathResource("/quartz-prod.properties"));
            } else {
                System.out.println("测试环境quartz配置");
                propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
            }
            //在quartz.properties中的属性被读取并注入后再初始化对象
            propertiesFactoryBean.afterPropertiesSet();
            return propertiesFactoryBean.getObject();
        }
    
        /*
         * quartz初始化监听器
         */
        @Bean
        public QuartzInitializerListener executorListener() {
            return new QuartzInitializerListener();
        }
    }
    

    上述代码中,将scheduler加入到Spring容器中。scheduler是由SchedulerFactoryBean进行维护的,在SchedulerFactoryBean中对调度器工厂做了一些基本设置并从配置文件中加载了quartz数据源配置(配置文件的读取会根据运行环境profile来进行自动切换),配置了一个全局监听器用以监听任务的执行过程。

    MyJobFactory

    使用Spring提供的JobFactory

    @Component
    public class MyJobFactory extends AdaptableJobFactory {
    
        @Autowired
        private AutowireCapableBeanFactory capableBeanFactory;
    
        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            // 调用父类的方法
            Object jobInstance = super.createJobInstance(bundle);
            // 进行注入
            capableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
    

    quartz.properties

    quartz.properties中是quartz连接数据库的一些配置信息。

    # \u56FA\u5B9A\u524D\u7F00org.quartz
    # \u4E3B\u8981\u5206\u4E3Ascheduler\u3001threadPool\u3001jobStore\u3001plugin\u7B49\u90E8\u5206
    #
    #
    org.quartz.scheduler.instanceName = DefaultQuartzScheduler
    org.quartz.scheduler.rmi.export = false
    org.quartz.scheduler.rmi.proxy = false
    org.quartz.scheduler.wrapJobExecutionInUserTransaction = false
    
    # \u5B9E\u4F8B\u5316ThreadPool\u65F6\uFF0C\u4F7F\u7528\u7684\u7EBF\u7A0B\u7C7B\u4E3ASimpleThreadPool
    org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
    
    # threadCount\u548CthreadPriority\u5C06\u4EE5setter\u7684\u5F62\u5F0F\u6CE8\u5165ThreadPool\u5B9E\u4F8B
    # \u5E76\u53D1\u4E2A\u6570
    org.quartz.threadPool.threadCount = 5
    # \u4F18\u5148\u7EA7
    org.quartz.threadPool.threadPriority = 5
    org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
    
    org.quartz.jobStore.misfireThreshold = 5000
    
    # \u9ED8\u8BA4\u5B58\u50A8\u5728\u5185\u5B58\u4E2D
    #org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore
    
    #\u6301\u4E45\u5316
    org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
    
    #org.quartz.jobStore.useProperties=false
    
    org.quartz.jobStore.tablePrefix = QRTZ_
    
    org.quartz.jobStore.dataSource = qzDS
    
    org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver
    org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=UTF-8&useSSL=false&testOnBorrow=true&testWhileIdle=true
    org.quartz.dataSource.qzDS.user=quartz
    org.quartz.dataSource.qzDS.password=123456
    
    org.quartz.dataSource.qzDS.maxConnections = 30
    
    org.quartz.dataSource.qzDS.validationQuery = SELECT 1 FROM DUAL
    
    org.quartz.dataSource.qzDS.validateOnCheckout = true
    org.quartz.dataSource.qzDS.idleConnectionValidationSeconds = 40
    
    
    #org.quartz.dataSource.qzDS.discardIdleConnectionsSeconds = 60
    
    

    quartz会根据这个配置文件将Job持久化到数据库中,也因此quartz会需要初始化一些数据库表,表结构文件在文末。

    SchedulerListener

    调度器监听器用以监听任务的执行状态。

    public class SchedulerListener implements JobListener {
    
        private final Logger LOG = LoggerFactory.getLogger(SchedulerListener.class);
    
        public static final String LISTENER_NAME = "QuartSchedulerListener";
    
        @Override
        public String getName() {
            return LISTENER_NAME; //must return a name
        }
    
        //任务被调度前
        @Override
        public void jobToBeExecuted(JobExecutionContext context) {
            String dataSource = context.getJobDetail().getJobDataMap().getString("dataSource");
            // 切换任务的数据源
            DataSourceContextHolder.setDB(dataSource);
            String jobName = context.getJobDetail().getKey().toString();
            LOG.info("Job {} is going to start,switch dataSource to {},Thread name {}", jobName, dataSource, Thread.currentThread().getName());
        }
    
        //任务调度被拒了
        @Override
        public void jobExecutionVetoed(JobExecutionContext context) {
            String jobName = context.getJobDetail().getKey().toString();
            LOG.error("job {} is jobExecutionVetoed", jobName);
            //可以做一些日志记录原因
    
        }
    
        //任务被调度后
        @Override
        public void jobWasExecuted(JobExecutionContext context,
                                   JobExecutionException jobException) {
            // 清空存储的数据源
            String jobName = context.getJobDetail().getKey().toString();
            DataSourceContextHolder.clearDB();
            LOG.info("Job : {} is finished", jobName);
            if (jobException != null && !jobException.getMessage().equals("")) {
                LOG.error("Exception thrown by: " + jobName
                        + " Exception: " + jobException.getMessage());
            }
    
        }
    }
    

    SchedulerListener监听任务被调度前、调度后和调度被拒绝时的状态,在任务被调度之前和之后对任务所使用的数据源进行了处理。如果项目中不需要数据源切换的话,这个监听器是不需要的,到此已经完成了quartz的集成。

    多数据源切换

    多数据源切换

    通过自定义DynamicDataSource来覆盖Spring Boot中原有的数据源。

    DataSourceConfig

    通过读取配置文件中不同的数据源,初始化项目中可能用到的数据源用以切换。

    /**
     * 多数据源配置类
     */
    @Configuration
    public class DataSourceConfig {
        //数据源1
        @Bean(name = "datasource1")
        @ConfigurationProperties(prefix = "spring.datasource.db1") // application.properteis中对应属性的前缀
        public DataSource dataSource1() {
            return DataSourceBuilder.create().build();
        }
    
        //数据源2
        @Bean(name = "datasource2")
        @ConfigurationProperties(prefix = "spring.datasource.db2") // application.properteis中对应属性的前缀
        public DataSource dataSource2() {
            return DataSourceBuilder.create().build();
        }
    
        /**
         * 动态数据源: 通过AOP在不同数据源之间动态切换
         *
         * @return
         */
        @Primary
        @Bean(name = "dynamicDataSource")
        public DataSource dynamicDataSource() {
            DynamicDataSource dynamicDataSource = new DynamicDataSource();
            // 默认数据源
            dynamicDataSource.setDefaultTargetDataSource(dataSource1());
            // 配置多数据源
            Map<Object, Object> dsMap = new HashMap();
            dsMap.put(DataSourceEnum.DB1.getName(), dataSource1());
            dsMap.put(DataSourceEnum.DB2.getName(), dataSource2());
    
            dynamicDataSource.setTargetDataSources(dsMap);
            return dynamicDataSource;
        }
    
        @Bean
        public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
            SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
            //设置数据源
            sqlSessionFactoryBean.setDataSource(dataSource);
            return sqlSessionFactoryBean.getObject();
        }
    
        /**
         * 配置@Transactional注解事物
         *
         * @return
         */
        @Bean
        public PlatformTransactionManager transactionManager() {
            return new DataSourceTransactionManager(dynamicDataSource());
        }
    }
    

    数据源配置

    spring:
      datasource:
        db1:
          driver-class-name: com.mysql.cj.jdbc.Driver
          username: doctor
          password: 123456
          type: com.zaxxer.hikari.HikariDataSource
          jdbc-url: jdbc:mysql://127.0.0.1:3306/doctor?useSSL=false&testOnBorrow=true&testWhileIdle=true
        db2:
          driver-class-name: com.mysql.cj.jdbc.Driver
          username: quartz
          password: 123456
          type: com.zaxxer.hikari.HikariDataSource
          jdbc-url: jdbc:mysql://127.0.0.1:3307/quartz?useSSL=false&testOnBorrow=true&testWhileIdle=true
    

    DataSourceContextHolder

    由于quartz在执行过程中是通过不同的线程来执行Job的,因此此处通过ThreadLocal来保存线程所使用的数据源情况。

    /**
     * 保存本地数据源
     */
    public class DataSourceContextHolder {
        private static final Logger LOG = LoggerFactory.getLogger(DataSourceContextHolder.class);
        /**
         * 默认数据源
         */
        public static final String DEFAULT_DS = DataSourceEnum.DB1.getName();
        /**
         * ThreadLocal之后会进行讲解
         */
        private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
        // 设置数据源名
        public static void setDB(String dbType) {
            LOG.info("切换到{}数据源", dbType);
            contextHolder.set(dbType);
        }
    
        // 获取数据源名
        public static String getDB() {
            return (contextHolder.get());
        }
    
        // 清除数据源名
        public static void clearDB() {
            contextHolder.remove();
        }
    }
    

    DynamicDataSource

    获取执行中所使用的数据源。由于数据源被保存在了DataSourceContextHolder中的ThreadLocal中,所以直接获取就行了。

    /**
     * 获取本地数据源
     */
    public class DynamicDataSource extends AbstractRoutingDataSource {
        private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSource.class);
    
        @Override
        protected Object determineCurrentLookupKey() {
            LOG.info("数据源为{}", DataSourceContextHolder.getDB());
            return DataSourceContextHolder.getDB();
        }
    }
    

    至此就完成了集成quartz及数据源切换的功能。然后就是具体的任务了。

    执行任务

    具体的任务需要继承BaseJob并在execute方法中重写具体需要执行的任务。

    execute

    @Slf4j
    @Service
    public class ReadNumJob extends BaseJob {
    
        @Autowired
        private RedisService redisService;
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        private final Logger LOG = LoggerFactory.getLogger(ReadNumJob.class);
    
        @Override
        public void execute(JobExecutionContext context) {
           doSomething();
        }
    }
    

    指定数据源

    然后在添加任务时指定任务所使用的数据源

    ReadNumJob job = new ReadNumJob();
    job.setJobName("test");
    job.setJobGroup("hys");
    job.setDescription("test");
    // 指定数据源
    job.getDataMap().put("dataSource", DataSourceEnum.DB1.getName());
    job.setCronExpression(
    "0 */1 * * * ?"
    );
    try {
    jobAndTriggerService.addJob(job);
    } catch (SchedulerException e) {
    e.printStackTrace();
    }
    

    源码

    转评赞就是最大的鼓励

    相关文章

      网友评论

          本文标题:Spring Boot集成quartz实现定时任务并支持切换任务

          本文链接:https://www.haomeiwen.com/subject/xubmectx.html