[土木匠] Integrating a Quartz cluster with Spring Boot


Author: 后端老鸟 | Published 2017-12-29 18:04 | 223 views

    [Please credit the source when reposting]: 土木匠   https://www.jianshu.com/p/7c6e63c88dc2

    This article covers the clustered setup. For a single node that does not need persistence, see https://www.jianshu.com/p/fe257adc331d

    1. Dependency jars

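    If you are on a Spring Cloud microservice stack: at the time of writing, spring-boot-starter-quartz has not been published to Maven Central yet; it is only available from Spring's own repository:
    http://repo.spring.io/milestone/org/springframework/boot/spring-boot-starter-quartz/
    To use it, you have to specify the version explicitly: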
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-quartz</artifactId>
         <version>2.0.0.M2</version>
    </dependency>
    
    A closer look at this jar shows that it does nothing more than pull in a few dependencies through its pom file.

    Given that, it is simpler to declare those dependencies directly, which also lets you pick versions that suit your project:

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>2.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz-jobs</artifactId>
                <version>2.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
            </dependency>
    

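    Mine is a Spring Cloud project, so I did not give explicit versions for the Spring-related dependencies. Adjust this to your situation; for a plain Spring Boot project, just specify the version numbers by hand.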

    2. Database configuration

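    Clustered Quartz has to persist jobs and triggers to a database, so Quartz needs a data source. Most projects also use the database for business logic and already configure a data source for Spring, so the same runtime would end up with two data source configurations. Unless you have a special requirement, one configuration per environment is enough. Most of the integration examples I found online configure two. My hunch was that one would do, and after some experimenting I got it working. I will only describe my approach here and skip the alternatives, which you can look up in the official documentation. The example below uses MySQL; other databases are similar.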



    quartz.properties

    org.quartz.scheduler.instanceName=test-schedule
    org.quartz.scheduler.instanceId=AUTO
    org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
    org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    org.quartz.jobStore.tablePrefix=QRTZ_
    org.quartz.jobStore.isClustered=true
    org.quartz.jobStore.useProperties=false
    org.quartz.jobStore.clusterCheckinInterval=20000
    org.quartz.scheduler.skipUpdateCheck=true
    org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
    org.quartz.threadPool.threadCount=10
    org.quartz.threadPool.threadPriority=5
    org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
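
A clustered setup depends on a few of the keys above: `isClustered` must be true, the job store must be JDBC-backed rather than the in-memory `RAMJobStore`, and every node needs its own instance id (which `AUTO` generates). The sketch below checks a `Properties` object for these three points. `QuartzClusterPropsCheck` and `problems` are hypothetical names made up for this illustration; they are not part of Quartz or Spring, and the checks are a rough sanity test rather than a full validation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Quartz or Spring.
class QuartzClusterPropsCheck {

    // Returns human-readable findings; an empty list means nothing suspicious was found.
    static List<String> problems(Properties props) {
        List<String> problems = new ArrayList<>();

        String clustered = props.getProperty("org.quartz.jobStore.isClustered", "false");
        if (!Boolean.parseBoolean(clustered)) {
            problems.add("org.quartz.jobStore.isClustered is not true");
        }

        // RAMJobStore keeps everything in memory, so nodes cannot share state through it.
        String jobStore = props.getProperty("org.quartz.jobStore.class", "");
        if (jobStore.endsWith("RAMJobStore")) {
            problems.add("clustering needs a JDBC job store, got " + jobStore);
        }

        // With a fixed id, each node would have to be configured with a different value by hand.
        String instanceId = props.getProperty("org.quartz.scheduler.instanceId", "");
        if (!"AUTO".equals(instanceId)) {
            problems.add("instanceId is '" + instanceId + "'; every node must end up with a unique id");
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(String.join("\n",
                "org.quartz.scheduler.instanceId=AUTO",
                "org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX",
                "org.quartz.jobStore.isClustered=true")));
        System.out.println(problems(props)); // prints []
    }
}
```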
    

    Relevant part of application-dev.properties

    server.port=8084
    spring.application.name=test-schedule
    server.tomcat.max-http-header-size=8192
    
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test-schedule?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE&useSSL=false
    spring.datasource.username=root
    spring.datasource.password=123456
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    

    QuartzJobFactory

    import org.quartz.spi.TriggerFiredBundle;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
    import org.springframework.scheduling.quartz.AdaptableJobFactory;
    import org.springframework.stereotype.Component;
    
    /**
     * <p>Job factory</p>
     * <PRE>
     * <BR>    Change log
     * <BR>-----------------------------------------------
     * <BR>    Date         Author          Change
     * </PRE>
     *
     * @author zl
     * @version 1.0
     * @date Created in 2017/12/16 15:48
     * @copyright: Copyright (c) founders
     */
    @Component
    public class QuartzJobFactory extends AdaptableJobFactory {
        @Autowired
        private AutowireCapableBeanFactory capableBeanFactory;
    
        @Override
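        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            // Quartz instantiates the job itself, outside the Spring container.
            // Autowiring the new instance lets jobs use @Autowired fields.
            Object jobInstance = super.createJobInstance(bundle);
            capableBeanFactory.autowireBean(jobInstance);
            return jobInstance;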
        }
    }
    

    QuartzConfig

    import org.quartz.Scheduler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.scheduling.quartz.SchedulerFactoryBean;
    
    import javax.sql.DataSource;
    
    /**
     * <p>Quartz configuration</p>
     * <PRE>
     * <BR>    Change log
     * <BR>-----------------------------------------------
     * <BR>    Date         Author          Change
     * </PRE>
     *
     * @author zl
     * @version 1.0
     * @date Created in 2017/12/16 15:33
     * @copyright: Copyright (c) founders
     */
    @Configuration
    public class QuartzConfig {
    
        @Autowired
        DataSource dataSource;
    
        @Bean
        public SchedulerFactoryBean schedulerFactoryBean (QuartzJobFactory quartzJobFactory) throws Exception {
            SchedulerFactoryBean factoryBean=new SchedulerFactoryBean();
            factoryBean.setJobFactory(quartzJobFactory);
            factoryBean.setConfigLocation(new ClassPathResource("quartz.properties"));
            factoryBean.setDataSource(dataSource);
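            // No explicit afterPropertiesSet() call: Spring invokes it when it initializes this bean,
            // and calling it here as well would create the scheduler twice.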
            return factoryBean;
        }
    
        @Bean
        public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean) throws Exception {
            Scheduler scheduler=schedulerFactoryBean.getScheduler();
            scheduler.start();
            return scheduler;
        }
    
    }
    
    

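    That completes the configuration. All that remains is to create the database tables, and then you can get on with writing the actual jobs.
    The org.quartz.impl.jdbcjobstore package inside quartz-2.3.0.jar contains the initialization SQL scripts for the various databases.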
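
Because the scripts live inside the jar, they can be read from the classpath instead of unpacking the jar by hand. Below is a minimal sketch using only the JDK. `QuartzSchemaScript` is a hypothetical helper name, and the file name `tables_mysql_innodb.sql` is an assumption about what the jar contains; check the package listing of your Quartz version before relying on it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Quartz.
class QuartzSchemaScript {

    // Package path inside the Quartz jar, as described in the article.
    static final String SCRIPT_PACKAGE = "org/quartz/impl/jdbcjobstore/";

    static String resourcePath(String scriptName) {
        return SCRIPT_PACKAGE + scriptName;
    }

    // Returns the script text, or null if the resource is not on the classpath.
    static String load(String scriptName) {
        ClassLoader loader = QuartzSchemaScript.class.getClassLoader();
        try (InputStream in = loader.getResourceAsStream(resourcePath(scriptName))) {
            if (in == null) {
                return null;
            }
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                return reader.lines().collect(Collectors.joining("\n"));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed file name; adjust to the script that matches your database.
        String sql = load("tables_mysql_innodb.sql");
        System.out.println(sql == null
                ? "Script not found. Is the Quartz jar on the classpath?"
                : sql);
    }
}
```

Run the printed SQL against the schema that `spring.datasource.url` points to before starting the application.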


    3. Job

    BaseJob

    import org.quartz.Job;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    
    /**
     * <p>Job interface</p>
     * <PRE>
     * <BR>    Change log
     * <BR>-----------------------------------------------
     * <BR>    Date         Author          Change
     * </PRE>
     *
     * @author zl
     * @version 1.0
     * @date Created in 2017/12/16 15:57
     * @copyright: Copyright (c) founders
     */
    
    public interface BaseJob extends Job {
        @Override
        void execute(JobExecutionContext context) throws JobExecutionException;
    }
    

    TestJob

    import lombok.extern.slf4j.Slf4j;
    import org.quartz.DisallowConcurrentExecution;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.springframework.stereotype.Component;
    
    /**
     * <p>Test Job</p>
     * <PRE>
     * <BR>    Change log
     * <BR>-----------------------------------------------
     * <BR>    Date         Author          Change
     * </PRE>
     *
     * @author zl
     * @version 1.0
     * @date Created in 2017/12/16 16:14
     * @copyright: Copyright (c) founders
     */
    @Slf4j
    @Component
    @DisallowConcurrentExecution
    public class TestJob implements BaseJob {
    
        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            log.info("test job----PreviousFireTime={},NextFireTime={},FireTime={}" ,context.getPreviousFireTime(),context.getNextFireTime(),context.getFireTime());
        }
    }
    

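    @DisallowConcurrentExecution forbids concurrent execution of the same job (the same JobDetail). By default, when a run takes longer (say 30s) than the interval between fires (say 10s), Quartz starts a new thread for the next fire right away so that the job keeps to its schedule. With the annotation, the next run does not start until the current one has finished.
    Now start the project and add the job and trigger rows to the three tables qrtz_cron_triggers, qrtz_job_details, and qrtz_triggers. If the rows are correct, the job will run. Next, I will cover job management.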

    4. Job management

    TaskInfoVo

    import lombok.Data;
    import java.util.Date;
    
    /**
     * <p>task</p>
     * <PRE>
     * <BR>    Change log
     * <BR>-----------------------------------------------
     * <BR>    Date         Author          Change
     * </PRE>
     *
     * @author zl
     * @version 1.0
     * @date Created in 2017/12/16 22:00
     * @copyright: Copyright (c) founders
     */
    @Data
    public class TaskInfoVo {
    
        private String jobName;
        private String jobGroup;
        private String jobDescription;
        private String jobStatus;
        private String cronExpression;
        private String createTime;
    
        private Date previousFireTime;
        private Date nextFireTime;
    
    }
    
    

    JobService: this class can be used as-is. Note that it relies on lombok.

    import com.sunlands.zlcx.schedule.exception.BusinessException;
    import com.sunlands.zlcx.schedule.vo.PageResultVO;
    import com.sunlands.zlcx.schedule.vo.TaskInfoVo;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.quartz.*;
    import org.quartz.impl.matchers.GroupMatcher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.List;
    import java.util.stream.Collectors;
    
    /**
     * <p>Job management</p>
     * <PRE>
     * <BR>    Change log
     * <BR>-----------------------------------------------
     * <BR>    Date         Author          Change
     * </PRE>
     *
     * @author zl
     * @version 1.0
     * @date Created in 2017/12/16 21:58
     * @copyright: Copyright (c) founders
     */
    @Service
    @Slf4j
    public class JobService {
    
        @Autowired
        private Scheduler scheduler;
    
        /**
         * Paged query
         *
         * @return
         */
        public PageResultVO<TaskInfoVo> list(int page, int size) {
    
            PageResultVO<TaskInfoVo> resultVO = new PageResultVO<TaskInfoVo>();
            try {
                List<TaskInfoVo> list = new ArrayList<>();
                for (String groupJob : scheduler.getJobGroupNames()) {
                    for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.<JobKey>groupEquals(groupJob))) {
                        List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                        for (Trigger trigger : triggers) {
                            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
    
                            String cronExpression = "", createTime = "";
    
                            if (trigger instanceof CronTrigger) {
                                CronTrigger cronTrigger = (CronTrigger) trigger;
                                cronExpression = cronTrigger.getCronExpression();
                                createTime = cronTrigger.getDescription();
                            }
                            TaskInfoVo info = new TaskInfoVo();
                            info.setJobName(jobKey.getName());
                            info.setJobGroup(jobKey.getGroup());
                            info.setJobDescription(jobDetail.getDescription());
                            info.setJobStatus(triggerState.name());
                            info.setCronExpression(cronExpression);
                            info.setCreateTime(createTime);
                            info.setPreviousFireTime(trigger.getPreviousFireTime());
                            info.setNextFireTime(trigger.getNextFireTime());
                            list.add(info);
                        }
                    }
                }
                resultVO.setTotal(list.size());
                resultVO.setRows(list.stream().skip((page - 1) * size).limit(size).collect(Collectors.toList()));
    
            } catch (SchedulerException e) {
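                // Pass the exception as the last argument without a placeholder so SLF4J logs the stack trace
                log.error("Paged job query failed, page={},size={}", page, size, e);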
            }
    
            return resultVO;
        }
    
        /**
         * Add a job
         *
         * @param jobName        fully qualified name of the Job class (it is passed to Class.forName)
         * @param jobGroup
         * @param cronExpression
         * @param jobDescription
         */
        public void addJob(String jobName, String jobGroup, String cronExpression, String jobDescription) {
            if (StringUtils.isAnyBlank(jobName, jobGroup, cronExpression, jobDescription)) {
                // String.format uses %s placeholders; {} is only understood by SLF4J
                throw new BusinessException(String.format("Invalid arguments, jobName=%s,jobGroup=%s,cronExpression=%s,jobDescription=%s", jobName, jobGroup, cronExpression, jobDescription));
            }
            String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            try {
                log.info("Adding job, jobName={},jobGroup={},cronExpression={},jobDescription={}", jobName, jobGroup, cronExpression, jobDescription);
    
                if (checkExists(jobName, jobGroup)) {
                    log.error("Job already exists, jobName={},jobGroup={}", jobName, jobGroup);
                    throw new BusinessException(String.format("Job already exists, jobName=%s,jobGroup=%s", jobName, jobGroup));
                }
    
                TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
                JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
    
                CronScheduleBuilder schedBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
                // The creation time is stored in the trigger description; list() reads it back from there
                CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withDescription(createTime).withSchedule(schedBuilder).build();
    
                Class<? extends Job> clazz = (Class<? extends Job>) Class.forName(jobName);
                JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(jobKey).withDescription(jobDescription).build();
                scheduler.scheduleJob(jobDetail, trigger);
            } catch (SchedulerException | ClassNotFoundException e) {
                log.error("Failed to add job, jobName={},jobGroup={}", jobName, jobGroup, e);
                throw new BusinessException("Class not found or invalid cron expression");
            }
        }
    
        /**
         * Edit a job
         *
         * @param jobName
         * @param jobGroup
         * @param cronExpression
         * @param jobDescription
         */
        public void edit(String jobName, String jobGroup, String cronExpression, String jobDescription) {
            if (StringUtils.isAnyBlank(jobName, jobGroup, cronExpression, jobDescription)) {
                throw new BusinessException(String.format("Invalid arguments, jobName=%s,jobGroup=%s,cronExpression=%s,jobDescription=%s", jobName, jobGroup, cronExpression, jobDescription));
            }
            String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    
            try {
                log.info("Editing job, jobName={},jobGroup={},cronExpression={},jobDescription={}", jobName, jobGroup, cronExpression, jobDescription);
                if (!checkExists(jobName, jobGroup)) {
                    log.error("Job does not exist, jobName={},jobGroup={}", jobName, jobGroup);
                    throw new BusinessException(String.format("Job does not exist, jobName=%s,jobGroup=%s", jobName, jobGroup));
                }
                TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
                JobKey jobKey = new JobKey(jobName, jobGroup);
                CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
                CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withDescription(createTime).withSchedule(cronScheduleBuilder).build();
    
                // getJobBuilder() returns a builder; without build() the new description would be discarded
                JobDetail jobDetail = scheduler.getJobDetail(jobKey).getJobBuilder().withDescription(jobDescription).build();
                HashSet<Trigger> triggerSet = new HashSet<>();
                triggerSet.add(cronTrigger);
    
                // replace=true overwrites the stored job and trigger with the same keys
                scheduler.scheduleJob(jobDetail, triggerSet, true);
            } catch (SchedulerException e) {
                log.error("Failed to edit job, jobName={},jobGroup={}", jobName, jobGroup, e);
                throw new BusinessException("Class not found or invalid cron expression");
            }
        }
    
        /**
         * Delete a job
         *
         * @param jobName
         * @param jobGroup
         */
        public void delete(String jobName, String jobGroup) {
            try {
                log.info("Deleting job, jobName={},jobGroup={}", jobName, jobGroup);
                TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
                if (checkExists(jobName, jobGroup)) {
                    scheduler.pauseTrigger(triggerKey);
                    scheduler.unscheduleJob(triggerKey);
                }
            } catch (SchedulerException e) {
                log.error("Failed to delete job, jobName={},jobGroup={}", jobName, jobGroup, e);
                throw new BusinessException(e.getMessage());
            }
        }
    
        /**
         * Pause a job
         *
         * @param jobName
         * @param jobGroup
         */
        public void pause(String jobName, String jobGroup) {
            try {
                log.info("Pausing job, jobName={},jobGroup={}", jobName, jobGroup);
                TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
                if (!checkExists(jobName, jobGroup)) {
                    log.error("Job does not exist, jobName={},jobGroup={}", jobName, jobGroup);
                    throw new BusinessException(String.format("Job does not exist, jobName=%s,jobGroup=%s", jobName, jobGroup));
                }
                scheduler.pauseTrigger(triggerKey);
            } catch (SchedulerException e) {
                log.error("Failed to pause job, jobName={},jobGroup={}", jobName, jobGroup, e);
                throw new BusinessException(e.getMessage());
            }
        }
    
        /**
         * Resume a paused job
         *
         * @param jobName
         * @param jobGroup
         */
        public void resume(String jobName, String jobGroup) {
            try {
                log.info("Resuming job, jobName={},jobGroup={}", jobName, jobGroup);
                TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
                if (!checkExists(jobName, jobGroup)) {
                    log.error("Job does not exist, jobName={},jobGroup={}", jobName, jobGroup);
                    throw new BusinessException(String.format("Job does not exist, jobName=%s,jobGroup=%s", jobName, jobGroup));
                }
                scheduler.resumeTrigger(triggerKey);
            } catch (SchedulerException e) {
                log.error("Failed to resume job, jobName={},jobGroup={}", jobName, jobGroup, e);
                throw new BusinessException(e.getMessage());
            }
        }
    
        /**
         * Run a job immediately
         *
         * @param jobName
         * @param jobGroup
         */
        public void trigger(String jobName, String jobGroup) {
            try {
                log.info("Triggering job now, jobName={},jobGroup={}", jobName, jobGroup);
                if (!checkExists(jobName, jobGroup)) {
                    log.error("Job does not exist, jobName={},jobGroup={}", jobName, jobGroup);
                    throw new BusinessException(String.format("Job does not exist, jobName=%s,jobGroup=%s", jobName, jobGroup));
                }
                JobKey jobKey = new JobKey(jobName, jobGroup);
                scheduler.triggerJob(jobKey);
            } catch (SchedulerException e) {
                log.error("Failed to trigger job, jobName={},jobGroup={}", jobName, jobGroup, e);
                throw new BusinessException(e.getMessage());
            }
        }
    
        /**
         * Check whether a trigger with this name and group exists
         *
         * @param jobName
         * @param jobGroup
         * @throws SchedulerException
         */
        private boolean checkExists(String jobName, String jobGroup) throws SchedulerException {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
            return scheduler.checkExists(triggerKey);
        }
    
    
    }
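
One detail of `addJob` is easy to miss: `jobName` is handed to `Class.forName`, so it has to be the fully qualified name of a class that implements `Job`. The unchecked cast in `addJob` does not verify that, so a wrong class name only fails later, when Quartz tries to run the job. A possible alternative is `Class.asSubclass`, which checks the type immediately. The sketch below uses `Runnable` as a stand-in for `org.quartz.Job` so that it runs with the JDK alone; `JobClassResolver` is a hypothetical name, not something from the article's project.

```java
// Hypothetical helper; Runnable stands in for org.quartz.Job in this sketch.
class JobClassResolver {

    // Resolves className and verifies that it is a subtype of expectedType.
    static <T> Class<? extends T> resolve(String className, Class<T> expectedType) {
        try {
            // asSubclass throws ClassCastException right away if the type does not match
            return Class.forName(className).asSubclass(expectedType);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No such class: " + className, e);
        } catch (ClassCastException e) {
            throw new IllegalArgumentException(
                    className + " is not a " + expectedType.getName(), e);
        }
    }

    public static void main(String[] args) {
        // java.lang.Thread implements Runnable, so this succeeds
        System.out.println(resolve("java.lang.Thread", Runnable.class));
        try {
            // java.lang.String does not implement Runnable, so this is rejected
            resolve("java.lang.String", Runnable.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In `JobService` the equivalent call would be `Class.forName(jobName).asSubclass(Job.class)`, with the `ClassCastException` handled alongside the existing exceptions.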
    
    

    I will leave out the controller and the pages; write them yourself as needed, or write a test class to try the service out.
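
When calling `list(page, size)` from a controller or a test, keep in mind that it pages in memory with `skip((page - 1) * size)`, so `page` is 1-based, and a value below 1 makes `skip` throw an `IllegalArgumentException`. Here is a small standalone sketch of the same paging logic with an explicit guard. `PageSketch` and the sample data are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical standalone version of the in-memory paging used by list().
class PageSketch {

    // Returns the 1-based page of the given size; an empty list if the page is past the end.
    static <T> List<T> page(List<T> all, int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        // Compute the offset as a long so large page numbers cannot overflow int
        long offset = (long) (page - 1) * size;
        return all.stream().skip(offset).limit(size).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> jobs = Arrays.asList("jobA", "jobB", "jobC", "jobD", "jobE");
        System.out.println(page(jobs, 1, 2)); // [jobA, jobB]
        System.out.println(page(jobs, 3, 2)); // [jobE]
        System.out.println(page(jobs, 4, 2)); // []
    }
}
```

Since every call still loads all jobs and triggers from the scheduler first, this style of paging is fine for a modest number of jobs but does not reduce the work done per request.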

