美文网首页
SpringBoot整合任务调度框架Quartz的基础搭建

SpringBoot整合任务调度框架Quartz的基础搭建

作者: 林千景 | 来源:发表于2019-02-13 22:43 被阅读0次

    Quartz的整体概括

    什么是quartz

    何为quartz,请看官网的说法:

    Quartz is a richly featured, open source job scheduling library that can be integrated within virtually any Java application - from the smallest stand-alone application to the largest e-commerce system. Quartz can be used to create simple or complex schedules for executing tens, hundreds, or even tens-of-thousands of jobs; jobs whose tasks are defined as standard Java components that may execute virtually anything you may program them to do. The Quartz Scheduler includes many enterprise-class features, such as support for JTA transactions and clustering.

    简单来说,quartz是一个开源任务调度库,可以用来创建简单或复杂的调度,低至十个多至数百万个。它是一个标准的java组件,支持JTA,集群等多种企业级功能。

    市面上有很多定时任务框架在quartz的基础上做了二次开发,xxl-job(基于quartz),elastic-job(基于quartz和zk),所以quartz到底是怎么玩的,它有哪些特性,下面来聊一聊。

    quartz的基本概念

    • 任务(Job):实际要触发的事件
    • 触发器(Trigger):用于设定时间规则
    • 调度器(Scheduler):组合任务与触发器

    quartz就这三样东西,我们新建作业,通过trigger设置规则触发,由scheduler进行整合,非常简单。

    Springboot整合quartz的基础搭建

    一般企业级项目开发都用的Springboot,下面就来讲一讲quartz整合Springboot的一些要点。

    依赖

    quartz版本2.3.0,springboot版本1.5.18.RELEASE

    <properties>
            <java.version>1.8</java.version>
            <druid.version>1.1.5</druid.version>
            <quartz.version>2.3.0</quartz.version>
            <fastjson.version>1.2.40</fastjson.version>
            <mybatis.version>1.3.0</mybatis.version>
            <log4j.version>1.2.16</log4j.version>
            <slf4j-api.version>1.7.7</slf4j-api.version>
            <slf4j-log4j12.version>1.7.7</slf4j-log4j12.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <scope>runtime</scope>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid-spring-boot-starter</artifactId>
                <version>${druid.version}</version>
            </dependency>
    
            <!--quartz相关依赖-->
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>${quartz.version}</version>
            </dependency>
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz-jobs</artifactId>
                <version>${quartz.version}</version>
            </dependency>
            <!--定时任务需要依赖context模块-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>${mybatis.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.3.2</version>
            </dependency>
    
            <!-- log4j日志 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j-api.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j-log4j12.version}</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-aop</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    

    Configuration

    通过AutowireCapableBeanFactory,使用spring注入的方式实现在job里注入springbean

    /**
         * 继承org.springframework.scheduling.quartz.SpringBeanJobFactory
         * 实现任务实例化方式
         */
        public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements
                ApplicationContextAware {
    
            private transient AutowireCapableBeanFactory beanFactory;
    
            @Override
            public void setApplicationContext(final ApplicationContext context) {
                beanFactory = context.getAutowireCapableBeanFactory();
            }
    
            /**
             * 将job实例交给spring ioc托管
             * 我们在job实例实现类内可以直接使用spring注入的调用被spring ioc管理的实例
             *
             * @param bundle
             * @return
             * @throws Exception
             */
            @Override
            protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
                final Object job = super.createJobInstance(bundle);
                /**
                 * 将job实例交付给spring ioc
                 */
                beanFactory.autowireBean(job);
                return job;
            }
        }
    
        /**
         * 配置任务工厂实例
         *
         * @param applicationContext spring上下文实例
         * @return
         */
        @Bean
        public JobFactory jobFactory(ApplicationContext applicationContext) {
            /**
             * 采用自定义任务工厂 整合spring实例来完成构建任务
             * see {@link AutowiringSpringBeanJobFactory}
             */
            AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
            jobFactory.setApplicationContext(applicationContext);
            return jobFactory;
        }
    
        /**
         * 配置任务调度器
         * 使用项目数据源作为quartz数据源
         *
         * @param jobFactory 自定义配置任务工厂
         * @param dataSource 数据源实例
         * @return
         * @throws Exception
         */
        @Bean(destroyMethod = "destroy", autowire = Autowire.NO)
        public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) throws Exception {
            SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
            //将spring管理job自定义工厂交由调度器维护
            schedulerFactoryBean.setJobFactory(jobFactory);
            //设置覆盖已存在的任务
            schedulerFactoryBean.setOverwriteExistingJobs(true);
            //项目启动完成后,等待2秒后开始执行调度器初始化
            schedulerFactoryBean.setStartupDelay(2);
            //设置调度器自动运行
            schedulerFactoryBean.setAutoStartup(true);
            //设置数据源,使用与项目统一数据源
            schedulerFactoryBean.setDataSource(dataSource);
            //设置上下文spring bean name
            schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
            //设置配置文件位置
            schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties"));
            return schedulerFactoryBean;
        }
    

    这里需要提到一点,由于job的初始化时是通过new出来的,不受spring的管理,无法接受业务相关的bean,故这里使用AutowireCapableBeanFactory实现了new出来的对象通过注解可注入受spring管理的bean了。

    AbstractAutowireCapableBeanFactory#autowireBean

    @Override
        public void autowireBean(Object existingBean) {
            // Use non-singleton bean definition, to avoid registering bean as dependent bean.
            RootBeanDefinition bd = new RootBeanDefinition(ClassUtils.getUserClass(existingBean));
            bd.setScope(BeanDefinition.SCOPE_PROTOTYPE);
            bd.allowCaching = ClassUtils.isCacheSafe(bd.getBeanClass(), getBeanClassLoader());
            BeanWrapper bw = new BeanWrapperImpl(existingBean);
            initBeanWrapper(bw);
            populateBean(bd.getBeanClass().getName(), bd, bw);
        }
    

    由源码可知,此类调用了populateBean的方法用来装配bean。具体spring的bean的加载注册过程可参考spring.io

    通过schedulerFactoryBeanConfigLocation来读取quartz的基本配置信息,注意quartz.properties配置文件一定要放在classpath下。

    #调度器实例名称
    org.quartz.scheduler.instanceName = quartzScheduler
    
    #调度器实例编号自动生成
    org.quartz.scheduler.instanceId = AUTO
    
    #持久化方式配置
    org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
    
    #持久化方式配置数据驱动,MySQL数据库
    org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    #quartz相关数据表前缀名
    org.quartz.jobStore.tablePrefix = QRTZ_
    
    #开启分布式部署
    org.quartz.jobStore.isClustered = true
    #配置是否使用
    org.quartz.jobStore.useProperties = false
    
    #分布式节点有效性检查时间间隔,单位:毫秒
    org.quartz.jobStore.clusterCheckinInterval = 10000
    
    #线程池实现类
    org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
    
    #执行最大并发线程数量
    org.quartz.threadPool.threadCount = 10
    
    #线程优先级
    org.quartz.threadPool.threadPriority = 5
    
    #配置为守护线程,设置后任务将不会执行
    #org.quartz.threadPool.makeThreadsDaemons=true
    
    #配置是否启动自动加载数据库内的定时任务,默认true
    org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
    

    我们看到org.quartz.jobStore.class进行持久化配置设置成了JobStoreTX属性,需要建立数据库表进行任务信息的持久化。其实官方还有一种RAMJobStore用于存储内存中的调度信息,当进程终止时,所有调度信息都将丢失。本文使用JobStoreTX(需要建立quartz的大概10张表,建表语句传在了github)。

    Job&JobDetail

    JobDetail作为Job的实例,一般由静态方法JobBuilder创建,通过fluent风格链式构建了Job的各项属性,

    其中newJob需要一个泛型上限为Job的入参。

    // 构建job信息
    JobDetail job = JobBuilder.newJob(DynamicQuartzJob.class)
              .withIdentity(jobKey) //jobName+jobGroup
              .withDescription(quartzJobDetails.getDescription())
              .usingJobData("jobData", quartzJobDetails.getJobData())
              .build();
    

    而Job接口只有一个简单的方法:

    public interface Job {
        void execute(JobExecutionContext context)
            throws JobExecutionException;
    }
    

    当定时任务跑起来的时候,execute里的代码将会被执行。

    比如我们创建一个简单的定时任务:

    public class QuartzTest extends QuartzJobBean
    static Logger logger = LoggerFactory.getLogger(QuartzTest.class);
    {
        @Override
        protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            logger.info("我是测试任务,我跑起来了,时间:{}",new Date());
        }
    
    

    注:QuartzJobBean是Job接口的实现类。

    Trigger

    JobDetailJobBuilder类的静态方法构建,同样,Trigger触发器由TriggerBuilder的静态方法构建。

    // 构建job的触发规则 cronExpression
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(triggerKey)
                .startNow()
                .withSchedule(CronScheduleBuilder
                .cronSchedule(quartzJobDetails.getCronExpression()))
                .build();
    

    Trigger触发器用于触发任务作业,当trigger触发器触发执行时,scheduler调度程序中的其中一个线程将调用execute()的一个线程。quartz最常用的触发器分为SimpleTriggerCronTrigger触发器两种。
    SimpleTrigger用于在给定时间执行一次作业,或给定时间每隔一段时间执行一次作业。这个功能Springboot@scheduled注解也能实现。
    如果是希望以日历时间表触发,则CronTrigger就比较合适例如每周六下午3点执行,我们完全可以用cron表达式实现日历触发的时间规则,cron表达式可由quartzJobDetails对象的CronExpression属性传入。

    最后,别忘了用schedulerjobtrigger整合起来,因为他们是统一协作的:

    // 注册job和trigger信息
    scheduler.scheduleJob(job, trigger);
    

    JobDataMap

    一般业务方法会要求动态传参处理,这时候就需要jobDataMap来进行参数传递了。我们在构建JobDetail的时候,通过

    usingJobData("jobData", quartzJobDetails.getJobData())
    

    动态传入调度任务所需的参数,以达到业务需求。

    JobListener&TriggerListener

    用于在任务调度期间,各阶段的状态解读。这里我就以JobListener为例,TriggerListener也是相似的。

    首先,构建jobListener

    jobContext.getScheduler().getListenerManager().addJobListener(new MyJobListener(), KeyMatcher.keyEquals(jobKey));
    

    这里我是在executeInternal方法里面构建的,因为listner不会持久化,服务重启将会丢失监听。当然在构建job的时候也可以注册listener,如果没持久化监听的需求的话。

    看一下MyJobListener:

    public class MyJobListener implements JobListener {
        public static final String LISTENER_NAME = "MyJobListener";
    
    //    @Autowired
    //    private JobScheduleLogMapper logMapper;
    
        @Override
        public String getName() {
            return LISTENER_NAME; //must return a name
        }
    
        //任务被调度前
        @Override
        public void jobToBeExecuted(JobExecutionContext context) {
    
            String jobName = context.getJobDetail().getKey().toString();
    //        System.out.println("jobToBeExecuted");
            System.out.println("Job调度前 : " + jobName + " is going to start...");
    
        }
    
        //任务调度被拒了
        @Override
        public void jobExecutionVetoed(JobExecutionContext context) {
            System.out.println("Job调度被拒:jobExecutionVetoed");
            //todo:原因捕获
    
        }
    
        //任务被调度后
        @Override
        public void jobWasExecuted(JobExecutionContext context,
                                   JobExecutionException jobException) {
    //        System.out.println("Job调度器:jobWasExecuted");
    
            String jobName = context.getJobDetail().getKey().toString();
            System.out.println("Job调度后 : " + jobName + " is finished...");
    
            if (jobException!=null&&!jobException.getMessage().equals("")) {
                System.out.println("Exception thrown by: " + jobName
                        + " Exception: " + jobException.getMessage());
            }
            JobScheduleLog log = new JobScheduleLog();
            log.setJobRuntime(String.valueOf(context.getJobRunTime()));
            log.setId(Optional.ofNullable(context.get("id")).map(p->Integer.parseInt(String.valueOf(context.get("id")))).orElse(null));
            JobScheduleLogMapper logMapper = SpringContextHolder.getBean(JobScheduleLogMapper.class);
            logMapper.updateByPrimaryKeySelective(log);
        }
    }
    

    任务调度前,调度后已经任务被拒,我们都可以使用钩子。

    动态构建任务调度

    下一个问题,我们知道新建一个调度job只要继承QuartzJobBean类并实现executeInternal就行,那么如果我有成百上千个任务,难道我要新建几千个类么?如果我想把已有的方法加入定时任务调度,难道我还要去改造原有的方法么?
    必然不是的,这时候我们可以新建一个动态类继承QuartzJobBean,并新建自己的业务表(例如建一个jobCaller表),传入项目方法的全类路径,这样我们就可以executeInternal方法里通过读表拉取需要调度的任务方法,通过jobDataMap拿到参数,通过反射直接invoke目标方法了,这样就省去了大量的构建调度任务的工作了,并且可以在不动原有业务代码的基础上,定向指定任何一个方法加入任务调度了。
    ok,talk is cheap, show me the code:

    public class DynamicQuartzJob extends QuartzJobBean {
        @Autowired
        private JobScheduleLogManager jobManager;
    
        @Override
        protected void executeInternal(JobExecutionContext jobContext) {
            try {
                int i = jobManager.trans2JobLogBefore(jobContext);
                if (i <= 0) return;
                JobDetailImpl jobDetail = (JobDetailImpl) jobContext.getJobDetail();
                String name = jobDetail.getName();
                if (StringUtils.isEmpty(name)) {
                    throw new JobExecutionException("can not find service info, because desription is empty");
                }
                //注册job和trigger的监听器
                JobKey jobKey = jobContext.getJobDetail().getKey();
                TriggerKey triggerKey = jobContext.getTrigger().getKey();
                jobContext.getScheduler().getListenerManager().addJobListener(new MyJobListener(), KeyMatcher.keyEquals(jobKey));
                jobContext.getScheduler().getListenerManager().addTriggerListener(new MyTriggerListener(), KeyMatcher.keyEquals(triggerKey));
    
                String[] serviceInfo = StringUtils.delimitedListToStringArray(name, ".");
                // serviceInfo[0] is JOB_NAME_PREFIX
                String beanName = serviceInfo[1];
                String methodName = serviceInfo[2];
                Object serviceImpl = getApplicationContext(jobContext).getBean(beanName);
                Method method;
                Class<?>[] parameterTypes = new Class[]{String.class};
                Object[] arguments = null;
                method = serviceImpl.getClass().getMethod(methodName, parameterTypes);
                method.invoke(serviceImpl, jobContext.getJobDetail().getJobDataMap().getString("jobData"));
                jobManager.trans2JobLogAfter(jobContext, i);
            } catch (Exception ex) {
     ErrorLog.errorConvertJson(ApplicationContextWare.getAppName(), LogTreadLocal.getTrackingNo(), this.getClass(), "quartz定时任务execute异常", ex);
            }
        }
    

    这里方法签名参数我设定了一个String类型的形参,其实可以在添加任务到jobCaller表的时候带上参数,executeInternal的时候读表拉取方法签名。当然也可以传一个大json,目标方法自己解析。

    最佳实践

    这里我新建了一张job_caller表,用于记录我的jobName(类名.方法名),jobGroup(没有就默认),jobData(jobDatamap),以及cron表达式。

    image-20190203160812939.png

    可以看到我们传入的时间规则是每隔10秒执行一次,调度的是HelloServicesayHello()方法,传入的参数是xgj111111.

    image-20190203160639805.png

    看一下HelloService的sayHello()方法做了什么:

    @Component
    public class HelloService {
    
        public void sayHello(String a) {
            System.out.println(a+"======hello world, i am quartz");
        }
    
        public void callHello(String b) {
            System.out.println(b+"======call");
        }
    }
    

    ok,只是简单的打印,来看看效果:

    image-20190203160412045.png

    每隔10秒(时间打印忘加了~),sayHello都将会被执行,并且监听器能捕获到各个阶段。

    单节点服务重启调度恢复

    由于任务是持久化在表里的,在服务重启后,quartz仍然可以去恢复调度任务,并且能够预先执行misfire的任务,这里就不演示了,很简单的。

    多节点分布式调度漂移

    这个就比较有意思了,在多个节点调度确定的任务时,分布式环境下,某个节点宕机,这个节点调度的作业能否自动漂移到其他节点?

    quartz.properties里,org.quartz.jobStore.isClustered开启了分布式的配置,此属性设置为true,quartz将使用ClusterManager来初始化节点。

    基于上一个调度HelloService#sayHello,我们再新增一个调度用于调用HelloService#callHello,同时新增一个quartz节点。(为何第二个节点能调度callHello?==>基于quartz的负载均衡),如图:

    image-20190203165316261.png image-20190203165207341.png

    启动两个服务,分别监听在81238124端口:

    image-20190203170522365.png image-20190203170536220.png

    8123调度的是callHello

    image-20190203165407608.png

    8124调度的是sayHello

    image-20190203165421036.png

    这时候,我们把8123服务停掉,看看8124的调度情况:

    停止8123服务:

    image-20190203165608268.png

    这时候可以发现,81248123的任务接管过来了:

    image-20190203165635596.png

    于是可以得出结论:在分布式场景下,当quartz集群的某一台服务宕机,其所调度的任务将被其他服务接管,所以quartz是支持任务漂移的。

    那么如果这时候,我再讲8123起来会是什么情况呢?聪明的我和你应该都想到了,它由继续接管callHello的任务调度了。

    quartz的缺陷

    • 强依赖于各节点的系统时间,多节点系统时间不一致将会出现调度紊乱的情况
    • 容易造成数据库死锁(一个任务只能由一个线程来调度,这是由quartz_lock表的行锁来实现的,可以通过设置数据库事务级别来解决,不过也有说设置了也出现deadlock的)

    以上是我的一些基本见解和尝试。

    代码已上传至GitHub:https://github.com/xugejunllt/quartz-framework

    相关文章

      网友评论

          本文标题:SpringBoot整合任务调度框架Quartz的基础搭建

          本文链接:https://www.haomeiwen.com/subject/wkbjeqtx.html