美文网首页SpringSpringBoot
springboot整合quartz定时任务

springboot整合quartz定时任务

作者: 东方舵手 | 来源:发表于2018-08-23 19:37 被阅读507次

    1. quartz的基本实现原理

    ** Quartz 核心元素 **

    Quartz任务调度的核心元素为:

    Scheduler——任务调度器

    Trigger——触发器

    Job——任务

    其中trigger和job是任务调度的元数据,scheduler是实际执行调度的控制器。

    Trigger是用于定义调度时间的元素,即按照什么时间规则去执行任务。Quartz中主要提供了四种类型的trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和NthIncludedDayTrigger。这四种trigger可以满足企业应用中的绝大部分需求。

    Job用于表示被调度的任务。主要有两种类型的job:无状态的(stateless)和有状态的(stateful)。对于同一个trigger来说,有状态的job不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。Job主要有两种属性:volatility和durability,其中volatility表示任务是否被持久化到数据库存储,而durability表示在没有trigger关联的时候任务是否被保留。两者都是在值为true的时候任务被持久化或保留。一个job可以被多个trigger关联,但是一个trigger只能关联一个job。

    Scheduler由scheduler工厂创建:DirectSchedulerFactory或者StdSchedulerFactory。第二种工厂StdSchedulerFactory使用较多,因为DirectSchedulerFactory使用起来不够方便,需要作许多详细的手工编码设置。Scheduler主要有三种:RemoteMBeanScheduler,RemoteScheduler和StdScheduler。

    Quartz 线程

    在Quartz中,有两类线程,Scheduler调度线程和任务执行线程,其中任务执行线程通常使用一个线程池维护一组线程。

    Scheduler调度线程主要有两个:执行常规调度的线程,和执行misfiredtrigger的线程。

    常规调度线程轮询存储的所有trigger,如果有需要触发的trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该trigger关联的任务。

    Misfire线程是扫描所有的trigger,查看是否有misfiredtrigger,如果有的话根据misfire的策略分别处理(fire now OR wait for the next fire)

    Quartz Job数据存储

    Quartz中的trigger和job需要存储下来才能被使用。Quartz中有两种存储方式:RAMJobStore,JobStoreSupport,其中RAMJobStore是将trigger和job存储在内存中,而JobStoreSupport是基于jdbc将trigger和job存储到数据库中。RAMJobStore的存取速度非常快,但是由于其在系统被停止后所有的数据都会丢失,所以在集群应用中,必须使用JobStoreSupport。

    Quartz 集群架构

    一个Quartz集群中的每个节点是一个独立的Quartz应用,它又管理着其他的节点。这就意味着你必须对每个节点分别启动或停止。Quartz集群中,独立的Quartz节点并不与另一其的节点或是管理节点通信,而是通过相同的数据库表来感知到另一Quartz应用的,如图2.1所示。

    ** Quartz集群相关数据库表**

    因为Quartz集群依赖于数据库,所以必须首先创建Quartz数据库表,Quartz发布包中包括了所有被支持的数据库平台的SQL脚本。这些SQL脚本存放于<quartz_home>/docs/dbTables 目录下。

    1、QRTZ_JOB_DETAILS:存储的是job的详细信息,包括:[DESCRIPTION]描述,[IS_DURABLE]是否持久化,[JOB_DATA]持久化对象等基本信息。

    2、QRTZ_TRIGGERS:触发器信息,包含:job的名,组外键,[DESCRIPTION]触发器的描述等基本信息,还有[START_TIME]开始执行时间,[END_TIME]结束执行时间,[PREV_FIRE_TIME]上次执行时间,[NEXT_FIRE_TIME]下次执行时间,[TRIGGER_TYPE]触发器类型:simple和cron,[TRIGGER_STATE]执行状态:WAITING,PAUSED,ACQUIRED分别为:等待,暂停,运行中。

    3、QRTZ_CRON_TRIGGERS:保存cron表达式。

    4、QRTZ_SCHEDULER_STATE:存储集群中note实例信息,quartz会定时读取该表的信息判断集群中每个实例的当前状态,INSTANCE_NAME:之前配置文件中org.quartz.scheduler.instanceId配置的名字,就会写入该字段,如果设置为AUTO,quartz会根据物理机名和当前时间产生一个名字。 [LAST_CHECKIN_TIME]上次检查时间,[CHECKIN_INTERVAL]检查间隔时间。

    5、QRTZ_PAUSED_TRIGGER_GRPS:暂停的任务组信息。

    6、QRTZ_LOCKS,悲观锁发生的记录信息。

    7、QRTZ_FIRED_TRIGGERS,正在运行的触发器信息。

    8、QRTZ_SIMPLE_TRIGGERS,简单的触发器详细信息。

    9、QRTZ_BLOB_TRIGGERS,触发器存为二进制大对象类型(用于Quartz用户自己触发数据库定制自己的触发器,然而JobStore不明白怎么存放实例的时候)。

    10、QRTZ_CALENDARS,以 Blob 类型存储 Quartz 的 Calendar 信息

    数据库相关表介绍:http://www.cnblogs.com/zhenyuyaodidiao/p/4755649.html

    参数配置介绍:http://blog.csdn.net/zixiao217/article/details/53091812

    2. pom.xml引入依赖

    <!--集成quartz-->
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>${quartz.version}</version>
        <!-- quartz默认使用c3p0连接池,如果项目使用的不是则需要排除依赖包 -->
        <exclusions>
            <exclusion>
                <artifactId>c3p0</artifactId>
                <groupId>c3p0</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
    </dependency>
    

    3. 创建配置文件类

    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.quartz.SchedulerFactoryBean;
    
    import javax.sql.DataSource;
    import java.util.Properties;
    
    /**
     * 定时任务配置
     *
     */
    @Configuration
    public class QuartzConfig {
    
        @Bean
        public SchedulerFactoryBean scheduler(@Qualifier("druidDataSource") DataSource dataSource) {
    
            //quartz参数
            Properties prop = new Properties();
            //配置实例
            //prop.put("org.quartz.scheduler.instanceName", "MyScheduler");//实例名称
            prop.put("org.quartz.scheduler.instanceId", "AUTO");
            //线程池配置
            prop.put("org.quartz.threadPool.threadCount", "5");
            //JobStore配置
            prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
            prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
    
            SchedulerFactoryBean factory = new SchedulerFactoryBean();
            factory.setDataSource(dataSource);
            factory.setQuartzProperties(prop);
            factory.setSchedulerName("MyScheduler");//数据库中存储的名字
            //QuartzScheduler 延时启动,应用启动5秒后 QuartzScheduler 再启动
            factory.setStartupDelay(5);
    
            //factory.setApplicationContextSchedulerContextKey("applicationContextKey");
            //可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
            factory.setOverwriteExistingJobs(true);
            //设置自动启动,默认为true
            factory.setAutoStartup(true);
    
            return factory;
        }
    }
    

    4. 创建自定义Job任务类,继承QuartzJobBean

    import com.alibaba.fastjson.JSON;
    import com.qfedu.rongzaiboot.entity.ScheduleJob;
    import com.qfedu.rongzaiboot.entity.ScheduleJobLog;
    import com.qfedu.rongzaiboot.service.ScheduleJobLogService;
    import com.qfedu.rongzaiboot.utils.SpringContextUtils;
    import org.apache.commons.lang.StringUtils;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.quartz.QuartzJobBean;
    import org.springframework.util.ReflectionUtils;
    
    import java.lang.reflect.Method;
    import java.util.Date;
    
    public class QuartzJob extends QuartzJobBean {
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @Override
        protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
            System.out.println("执行quartz任务。。。。。");
    
            String json = context.getMergedJobDataMap().getString("JOB_PARAM_KEY");
            //将获取的对象序列化的json 转化为实体类对象
            ScheduleJob scheduleJob = JSON.parseObject(json, ScheduleJob.class);
    
            Long jobId = scheduleJob.getJobId();
            String beanName = scheduleJob.getBeanName();
            String methodName = scheduleJob.getMethodName();
            String params = scheduleJob.getParams();
    
            //quartz没有被spring管理 所以通过其它方式获取service
            ScheduleJobLogService scheduleJobLogService = (ScheduleJobLogService) SpringContextUtils.getBean("scheduleJobLogServiceImpl");
            //保存任务记录日志
            ScheduleJobLog scheduleJobLog = new ScheduleJobLog();
            scheduleJobLog.setJobId(jobId);
            scheduleJobLog.setBeanName(beanName);
            scheduleJobLog.setMethodName(methodName);
            scheduleJobLog.setParams(params);
            scheduleJobLog.setCreateTime(new Date());
    
            long startTime = System.currentTimeMillis();
    
            try {
                Object targetClass = SpringContextUtils.getBean(beanName);
                Method method = null;
                //通过反射获取方法
                if (StringUtils.isNotBlank(params)) {
                    method = targetClass.getClass().getDeclaredMethod(methodName, String.class);
                } else {
                    method = targetClass.getClass().getDeclaredMethod(methodName);
                }
    
                ReflectionUtils.makeAccessible(method);//使方法具有public权限
                //根据反射执行方法
                if (StringUtils.isNotBlank(params)) {
                    method.invoke(targetClass, params);
                } else {
                    method.invoke(targetClass);
                }
    
                long endTime = System.currentTimeMillis() - startTime;
    
                scheduleJobLog.setStatus((byte) 0);//保存日志里的操作状态 0:成功
                scheduleJobLog.setTimes(endTime);//耗时多长时间
    
                logger.info("任务执行成功,任务ID:" + jobId + ",总耗时:" + endTime + "毫秒");
    
            } catch (Exception e) {
                long endTime = System.currentTimeMillis() - startTime;
                scheduleJobLog.setError(StringUtils.substring(e.toString(),2000));//错误消息
                scheduleJobLog.setStatus((byte)1);//失败
                scheduleJobLog.setTimes(endTime);//耗时
    
                e.printStackTrace();
                logger.error("任务执行失败,任务ID:"+jobId);
            } finally {
                //最后调用service保存定时任务日志记录
                scheduleJobLogService.save(scheduleJobLog);
            }
    
        }
    
    }
    

    5. 创建Scheduler工具类,quartz的操作核心,包括操作quartz在数据库中的表

    import com.alibaba.fastjson.JSON;
    import com.qfedu.rongzaiboot.entity.ScheduleJob;
    import com.qfedu.rongzaiboot.quartz.QuartzJob;
    import org.quartz.*;
    
    public class SchedulerUtils {
    
        /**
         * 创建任务
         */
        public static void createJob(Scheduler scheduler, ScheduleJob scheduleJob) {
    
            try {
                Long jobId = scheduleJob.getJobId();
                //创建Job对象
                JobDetail job = JobBuilder.newJob(QuartzJob.class).withIdentity("JOB_" + jobId).build();
                //获取cron表达式 并创建对象
                CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
                        .withMisfireHandlingInstructionDoNothing();
                //创建触发器
                CronTrigger trigger = TriggerBuilder.newTrigger()
                        .withIdentity("TRIGGET_" + jobId)
                        .withSchedule(cronScheduleBuilder) //将cron表达式配置到触发器
                        .build();
    
                //将对象josn序列化存储到Job的getJobDataMap()方法中,为后续根据获取属性执行对应的类的任务
                job.getJobDataMap().put("JOB_PARAM_KEY", JSON.toJSONString(scheduleJob));
                //存数据
                scheduler.scheduleJob(job, trigger);
                scheduler.pauseJob(JobKey.jobKey("JOB_" + jobId));//使任务处于等待状态,创建后不会执行
            } catch (SchedulerException e) {
                throw new RRException("创建任务失败", e);
            }
        }
    
        /**
         * 更新任务
         */
        public static void updateJob(Scheduler scheduler, ScheduleJob scheduleJob) {
            //获取新的cron表达式
            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();
    
            Long jobId = scheduleJob.getJobId();
    
            try {
                //拿到原有的trigger
                TriggerKey triggerKey = TriggerKey.triggerKey("TRIGGER_" + jobId);
                CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                //为原有的trigger赋予新的cron表达式
                trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
                        .withSchedule(cronScheduleBuilder).build();
                //执行原有的trigger更新
                scheduler.rescheduleJob(triggerKey, trigger);
            } catch (SchedulerException e) {
                e.printStackTrace();
                throw new RRException("更新定时任务失败", e);
            }
        }
    
        /**
         * 删除任务
         */
        public static void deleteJob(Scheduler scheduler, Long jobId) {
            try {
                scheduler.deleteJob(JobKey.jobKey("JOB_" + jobId));
            } catch (SchedulerException e) {
                e.printStackTrace();
                throw new RRException("删除定时任务失败", e);
            }
        }
    
        /**
         * 恢复任务
         */
        public static void resumeJob(Scheduler scheduler, Long jobId) {
            try {
                scheduler.resumeJob(JobKey.jobKey("JOB_" + jobId));
            } catch (SchedulerException e) {
                e.printStackTrace();
                throw new RRException("恢复定时任务失败", e);
            }
        }
    
        /**
         * 立即执行定时任务
         */
        public static void run(Scheduler scheduler, Long jobId) {
            try {
                //只执行一次并且不会改变任务的状态
                scheduler.triggerJob(JobKey.jobKey("JOB_" + jobId));
            } catch (SchedulerException e) {
                e.printStackTrace();
                throw new RRException("立即执行定时任务失败", e);
            }
        }
    
        /**
         * 暂停任务
         *
         * @param scheduler
         * @param jobId
         */
        public static void pauseJob(Scheduler scheduler, Long jobId) {
            try {
                scheduler.pauseJob(JobKey.jobKey("JOB_" + jobId));
            } catch (SchedulerException e) {
                e.printStackTrace();
                throw new RRException("暂停定时任务失败", e);
            }
        }
    }
    

    6. 保存任务的自定义类实体类

    public class ScheduleJob implements Serializable {
        private static final long serialVersionUID = 1L;
    
        private Long jobId;
    
        private String beanName; //执行的类名
    
        private String methodName; //方法名
    
        private String params; //参数
    
        private String cronExpression; //cron表达式
    
        private Byte status; //任务状态 0,运行 1,暂停
    
        private String remark; //备注
    
        private Date createTime; //创建时间
    }
    

    7. 创建quartz任务

    controller

        /**
         * 保存定时任务
         */
        @MyLog("保存定时任务")
        @RequestMapping("/save")
        @RequiresPermissions("schedule:job:save")
        public R save(@RequestBody ScheduleJob scheduleJob){
            scheduleJobService.save(scheduleJob);
            return R.ok();
        }
    

    service,impl

        /**
         * 保存定时任务
         */
        void save(ScheduleJob scheduleJob);
    //------------------
    @Override
        @Transactional(propagation = Propagation.REQUIRED)
        public void save(ScheduleJob scheduleJob) {
            //保存实体类的信息
            scheduleJob.setCreateTime(new Date());
            scheduleJob.setStatus(Constant.ScheduleStatus.PAUSE.getValue());
            scheduleJobMapper.insertSelective(scheduleJob);
    
            //创建定时任务 并保存到对应的quatrz表中
            SchedulerUtils.createJob(scheduler, scheduleJob);
        }
    

    dao,mapper

    修改任务

    controller

       /**
         * 定时任务信息
         */
        @GetMapping("/info/{jobId}")
        @RequiresPermissions(value={"schedule:job:info"})
        public R info(@PathVariable("jobId") Long jobId){
            ScheduleJob scheduleJob = scheduleJobService.queryObject(jobId);
            return R.ok().put("scheduleJob", scheduleJob);
        }
    -------------------------------------------
        @MyLog("修改定时任务")
        @PostMapping("/update")
        @RequiresPermissions(value={"schedule:job:update"})
        public R update(@RequestBody ScheduleJob scheduleJob){
    
            scheduleJobService.update(scheduleJob);
            return R.ok();
        }
    

    service,impl

        /**
         * 查询
         */
        ScheduleJob queryObject(Long jobId);
    
        /**
         * 更新定时任务
         */
        void update(ScheduleJob scheduleJob);
    --------------------------------------------------------
        @Override
        public ScheduleJob queryObject(Long menuId) {
            return scheduleJobMapper.selectByPrimaryKey(menuId);
        }
    
        @Override
        @Transactional(propagation = Propagation.REQUIRED)
        public void update(ScheduleJob scheduleJob) {
    
            SchedulerUtils.updateJob(scheduler, scheduleJob);
    
            scheduleJobMapper.updateByPrimaryKeySelective(scheduleJob);
        }
    

    dao,mapper

      <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Long" >
        select
        <include refid="Base_Column_List" />
        from schedule_job
        where job_id = #{jobId,jdbcType=BIGINT}
      </select>
    
      <update id="updateByPrimaryKeySelective" parameterType="com.qfedu.rongzaiboot.entity.ScheduleJob" >
        update schedule_job
        <set >
          <if test="beanName != null" >
            bean_name = #{beanName,jdbcType=VARCHAR},
          </if>
          <if test="methodName != null" >
            method_name = #{methodName,jdbcType=VARCHAR},
          </if>
          <if test="params != null" >
            params = #{params,jdbcType=VARCHAR},
          </if>
          <if test="cronExpression != null" >
            cron_expression = #{cronExpression,jdbcType=VARCHAR},
          </if>
          <if test="status != null" >
            status = #{status,jdbcType=TINYINT},
          </if>
          <if test="remark != null" >
            remark = #{remark,jdbcType=VARCHAR},
          </if>
          <if test="createTime != null" >
            create_time = #{createTime,jdbcType=TIMESTAMP},
          </if>
        </set>
        where job_id = #{jobId,jdbcType=BIGINT}
      </update>
    

    删除任务

    controller

        /**
         * 删除定时任务
         */
        @MyLog("删除定时任务")
        @PostMapping("/del")
        @RequiresPermissions("schedule:job:delete")
        public R delete(@RequestBody Long[] jobIds){
            scheduleJobService.deleteBatch(jobIds);
    
            return R.ok();
        }
    

    service,impl

        /**
         * 批量删除
         */
        @Override
        @Transactional(propagation = Propagation.REQUIRED)
        public void deleteBatch(Long[] jobIds) {
    
            for(Long jobId : jobIds){
                SchedulerUtils.deleteJob(scheduler, jobId);
            }
    
            //删除数据
            scheduleJobMapper.deleteBatch(jobIds);
        }
    

    dao,mapper

        <!-- 批量删除任务记录 -->
        <delete id="deleteBatch">
            delete from schedule_job where job_id in
            <foreach collection="array" item="jobId" open="(" separator="," close=")">
                #{jobId}
            </foreach>
        </delete>
    

    暂停任务

    controller

        /**
         * 暂停定时任务
         */
        @MyLog("暂停定时任务")
        @PostMapping("/pause")
        @RequiresPermissions("schedule:job:pause")
        public R pause(@RequestBody Long[] jobIds){
            scheduleJobService.pause(jobIds);
            return R.ok();
        }
    

    service,impl

        /**
         * 批量暂停任务
         */
        @Override
        @Transactional(propagation = Propagation.REQUIRED)
        public void pause(Long[] jobIds) {
    
            for(Long jobId : jobIds){
                SchedulerUtils.pauseJob(scheduler, jobId);
            }
    
            Map<String, Object> map = new HashMap<>();
            map.put("list", jobIds);
            map.put("status", Constant.ScheduleStatus.PAUSE.getValue());
            scheduleJobMapper.updateBatch(map);
        }
    

    dao,mapper

        <!-- 批量更新状态 -->
        <update id="updateBatch">
            update schedule_job set status = #{status} where job_id in
            <foreach collection="list" item="jobId" open="(" separator="," close=")">
                #{jobId}
            </foreach>
        </update>
    

    恢复任务

    controller

        /**
         * 恢复定时任务
         */
        @MyLog("恢复定时任务")
        @PostMapping("/resume")
        @RequiresPermissions("schedule:job:resume")
        public R resume(@RequestBody Long[] jobIds){
            scheduleJobService.resume(jobIds);
            return R.ok();
        }
    

    service,impl

        /**
         * 恢复定时任务
         */
        @Override
        @Transactional(propagation = Propagation.REQUIRED)
        public void resume(Long[] jobIds) {
    
            for(Long jobId : jobIds){
                SchedulerUtils.resumeJob(scheduler, jobId);
            }
    
            Map<String, Object> map = new HashMap<>();
            map.put("list", jobIds);
            map.put("status", Constant.ScheduleStatus.NORMAL.getValue());
            scheduleJobMapper.updateBatch(map);
        }
    

    dao,mapper

        <!-- 批量更新状态 -->
        <update id="updateBatch">
            update schedule_job set status = #{status} where job_id in
            <foreach collection="list" item="jobId" open="(" separator="," close=")">
                #{jobId}
            </foreach>
        </update>
    

    立即执行任务(项目里被设置只会执行一次)

    controller

        /**
         * 立即执行定时任务
         */
        @MyLog("立即执行定时任务")
        @PostMapping("/run")
        @RequiresPermissions("schedule:job:run")
        public R run(@RequestBody Long[] jobIds){
            scheduleJobService.run(jobIds);
            return R.ok();
        }
    

    service,impl

        /**
         * 立即执行定时任务
         */
        @Override
        @Transactional(propagation = Propagation.REQUIRED)
        public void run(Long[] jobIds) {
            for(Long jobId : jobIds){
                SchedulerUtils.run(scheduler, jobId);
            }
        }
    

    dao,mapper

    8. 创建quartz的相关的数据库中的表

    1. 在官网下载quartz


    2. 解压进入到目录 docs > daTables 目录 里面有数据库表sql脚本选择自己对应的数据库脚本,直接执行就可以生成quartz相关的表



    相关文章

      网友评论

        本文标题:springboot整合quartz定时任务

        本文链接:https://www.haomeiwen.com/subject/bzxjiftx.html