5. Quartz Usage Summary

Author: ltjxwxz | Published: 2017-12-27 14:24 | Views: 0

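    This post collects what I learned about Quartz while using it in a project:
    1. Classes provided by Spring
    (1) SchedulerFactoryBean
    (2) Job-related classes: the job's execution logic has to be written by hand, and since the project already uses Spring, it makes sense to use Spring's job-related classes. There are two: MethodInvokingJobDetailFactoryBean and QuartzJobBean. MethodInvokingJobDetailFactoryBean cannot be stored in a database (it throws java.io.NotSerializableException), so I dropped it and used QuartzJobBean.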

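    2. Concurrency control
    The official documentation provides one mechanism for concurrency control: @DisallowConcurrentExecution.
    The restriction applies per JobDetail: two executions of the same JobDetail never run at the same time, but different JobDetails built from the same Job class can still run concurrently. For example, if one Job class is used to build JobDetail1, JobDetail2 and JobDetail3, those three JobDetails still execute concurrently with each other.
    The job logic runs on the thread pool implementation configured by org.quartz.threadPool.class, with the number of threads configured by org.quartz.threadPool.threadCount.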
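
The per-JobDetail scope described above can be modeled without Quartz. The sketch below is a JDK-only illustration, not Quartz code: `PerJobKeyGate` and its methods are hypothetical names, and the string keys stand in for job keys. Real Quartz delays a blocked firing rather than rejecting it; the model only shows which firings are allowed to overlap.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** JDK-only model (not Quartz code) of the scope of @DisallowConcurrentExecution. */
final class PerJobKeyGate {
    // Keys of the job definitions that are currently executing, e.g. "group.name".
    private final Set<String> running = ConcurrentHashMap.newKeySet();

    /** Returns true if the job definition identified by jobKey may start now. */
    boolean tryStart(String jobKey) {
        // add() returns false when the key is already present, i.e. already running.
        return running.add(jobKey);
    }

    /** Marks the job definition as finished so that its next firing may start. */
    void finish(String jobKey) {
        running.remove(jobKey);
    }
}
```

With this model, a second `tryStart("project.JobDetail1")` is refused while the first is still running, whereas `tryStart("project.JobDetail2")` succeeds even though both definitions would share one Job class.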

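    3. Data persistence
    Quartz provides two job store types: RAMJobStore and JDBC JobStore.
    RAMJobStore keeps everything in memory, so scheduled jobs are lost when the application restarts.
    JDBC JobStore persists to a database, so jobs are still there after a restart.
    Download the quartz-2.2.3-distribution.tar.gz package from the official site; quartz\quartz-2.2.3\docs\dbTables contains table-creation scripts for various databases. Create the tables, then configure the job store class, the driver delegate class and the data source in quartz.properties. Finally, specify the location of quartz.properties in the application configuration.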
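
A missing key in quartz.properties usually only shows up when the scheduler starts. As a minimal sketch, the following JDK-only helper lists which of the JDBC job store keys mentioned above are absent. `JdbcJobStoreConfigCheck` and `missingKeys` are hypothetical names, not part of Quartz, and the check assumes a directly configured data source (driver and URL) like the one used in this article rather than a JNDI one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper (not part of Quartz) that checks for JDBC job store keys. */
final class JdbcJobStoreConfigCheck {

    /** Returns the required keys that are missing or blank, in check order. */
    static List<String> missingKeys(Properties props) {
        List<String> required = new ArrayList<>();
        required.add("org.quartz.jobStore.class");
        required.add("org.quartz.jobStore.driverDelegateClass");
        required.add("org.quartz.jobStore.dataSource");

        // The data source keys depend on the data source name configured above.
        String ds = props.getProperty("org.quartz.jobStore.dataSource", "").trim();
        if (!ds.isEmpty()) {
            required.add("org.quartz.dataSource." + ds + ".driver");
            required.add("org.quartz.dataSource." + ds + ".URL");
        }

        List<String> missing = new ArrayList<>();
        for (String key : required) {
            if (props.getProperty(key, "").trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

The helper could be called on the loaded Properties object before it is handed to the scheduler factory, failing fast when the returned list is not empty.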

    4. Managing jobs dynamically
    (1) Add:

    scheduler.scheduleJob(jobDetail, trigger);
    

    (2) Delete:

    scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
    scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
    scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));
    

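    5. Execution state
    The execution state is stored in the trigger_state column of the qrtz_triggers table. The full set of states in the source code is: WAITING, ACQUIRED, EXECUTING, COMPLETE, BLOCKED, ERROR, PAUSED, PAUSED_BLOCKED. The JobStore configured in the properties file is JobStoreTX, but the code that changes the state lives in the JobStoreSupport class, which calls the configured Delegate to build the SQL statements that perform the state change.
    The source code shows that ACQUIRED means the trigger has been acquired by the scheduler; this state is set before the job's own logic runs.
    A state-transition diagram compiled by other users:

    [Image: trigger state-transition diagram (image.png)]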
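
When reading the trigger_state column directly, it can help to collapse the internal values into the coarser states that application code usually cares about. The sketch below is JDK-only. `TriggerStateMapper` and `CoarseState` are hypothetical names, and the mapping is an assumption modeled on my understanding of what Scheduler.getTriggerState reports for a JDBC job store; verify it against JobStoreSupport before relying on it.

```java
/** JDK-only sketch: collapse trigger_state column values into coarse states. */
final class TriggerStateMapper {

    /** Coarse states, named after the constants of org.quartz.Trigger.TriggerState. */
    enum CoarseState { NONE, NORMAL, PAUSED, COMPLETE, ERROR, BLOCKED }

    /** Maps a raw trigger_state value (or null when no row exists) to a coarse state. */
    static CoarseState fromColumn(String triggerState) {
        if (triggerState == null) {
            return CoarseState.NONE; // no row found for the trigger
        }
        switch (triggerState) {
            case "COMPLETE":
                return CoarseState.COMPLETE;
            case "PAUSED":
            case "PAUSED_BLOCKED":
                return CoarseState.PAUSED;
            case "ERROR":
                return CoarseState.ERROR;
            case "BLOCKED":
                return CoarseState.BLOCKED;
            default:
                // WAITING, ACQUIRED and EXECUTING all count as a normally scheduled trigger
                return CoarseState.NORMAL;
        }
    }
}
```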

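    6. Using Spring-managed services in a custom Job class
    Even though the Job extends Spring's QuartzJobBean, my own services cannot be injected into it directly. The reason is that, by default, Quartz instantiates Job objects itself rather than obtaining them from Spring's ApplicationContext, so they are not Spring-managed beans and @Autowired has no effect. The workaround I found is to put the beans into a map when building the SchedulerFactoryBean and to read them back from the scheduler context inside the Job.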

    @Bean(name = "schedulerFactory")
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setQuartzProperties(quartzProperties());
        // Pass in the services, DAOs, etc. that the job classes need; @Autowired cannot inject them into a job
        Map<String, Object> springBeanMap = new HashMap<String, Object>();
        springBeanMap.put("testngService", testngService);
        springBeanMap.put("quartzService", quartzService);
        springBeanMap.put("triggerDao", triggerDao);
        factory.setSchedulerContextAsMap(springBeanMap);
        factory.setWaitForJobsToCompleteOnShutdown(true);
        return factory;
    }
    
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();
        try {
            testngService = (TestngService)context.getScheduler().getContext().get("testngService");
        } catch (SchedulerException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
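
The unchecked cast in the lookup above fails with a bare NullPointerException or ClassCastException when a key is misspelled or holds the wrong bean. A small typed helper gives a clearer error. This is a JDK-only sketch: `ContextLookup` and `require` are hypothetical names. It takes a plain Map<String, Object>, which, to my understanding, is also what Quartz's SchedulerContext implements, so the result of getContext() could be passed in directly.

```java
import java.util.Map;

/** Hypothetical helper for reading beans out of the scheduler context map. */
final class ContextLookup {

    /** Returns the value stored under key, or fails with a descriptive message. */
    static <T> T require(Map<String, Object> context, String key, Class<T> type) {
        Object value = context.get(key);
        if (value == null) {
            throw new IllegalStateException("No entry '" + key + "' in the scheduler context");
        }
        if (!type.isInstance(value)) {
            throw new IllegalStateException("Entry '" + key + "' is a "
                    + value.getClass().getName() + ", expected " + type.getName());
        }
        return type.cast(value);
    }
}
```

Inside executeInternal this would read roughly as `ContextLookup.require(context.getScheduler().getContext(), "testngService", TestngService.class)`, still wrapped in the SchedulerException handling shown above.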
    

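    7. Listeners
    Quartz provides TriggerListeners, JobListeners and SchedulerListeners. Their usage is covered in detail in quartz\quartz-2.2.3\examples.
    Note: in my tests, a listener registered dynamically at runtime works after that first registration but is gone after a restart. Listeners are registered with the scheduler at runtime and are not stored in the JobStore, so they have to be registered again on every startup.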

    8. Summary: in going from zero to using Quartz in a project, what helped most were the official example code, the source code and the documentation. After working through those, I also learned from some good blog posts, listed here:
    Documentation in Chinese: https://www.w3cschool.cn/quartz_doc/quartz_doc-lwuv2d2a.html
    Add/delete/update/query: http://snailxr.iteye.com/blog/2076903#comments
    Concurrency: http://blog.csdn.net/will_awoke/article/details/38921273
       https://www.cnblogs.com/Rozdy/p/4220186.html
       http://www.blogjava.net/stevenjohn/archive/2015/07/26/426425.html
    Clustering: http://www.importnew.com/22896.html
       http://soulshard.iteye.com/blog/337886
       https://tech.meituan.com/mt-crm-quartz.html
    Core concepts: http://blog.csdn.net/guolong1983811/article/details/51501346
         http://blog.csdn.net/beliefer/article/details/51578546
         https://www.cnblogs.com/pzy4447/p/5201674.html
    Troubleshooting: http://blog.csdn.net/jackylovesjava/article/details/50044271

    9. Some code
    9.1 Complete quartz.properties configuration

    # Default Properties file for use by StdSchedulerFactory
    # to create a Quartz Scheduler Instance, if a different
    # properties file is not explicitly specified.
    #
    org.quartz.scheduler.instanceName: DefaultQuartzScheduler
    org.quartz.scheduler.rmi.export: false
    org.quartz.scheduler.rmi.proxy: false
    org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
    
    #Thread pool configuration
    org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
    org.quartz.threadPool.threadCount: 10
    org.quartz.threadPool.threadPriority: 5
    org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
    
    #Misfire threshold setting
    #org.quartz.jobStore.misfireThreshold: 60000
    
    #Persist Quartz data to an Oracle database
    org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
    org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    org.quartz.jobStore.useProperties: false
    org.quartz.jobStore.dataSource: myDS
    org.quartz.jobStore.tablePrefix: QRTZ_
    org.quartz.jobStore.isClustered: false
    
    #Database connection parameters
    org.quartz.dataSource.myDS.driver: oracle.jdbc.driver.OracleDriver
    org.quartz.dataSource.myDS.URL: jdbc:oracle:thin:@10.10.52.14:1521:wxkfdb
    org.quartz.dataSource.myDS.user: autotesting
    org.quartz.dataSource.myDS.password: test
    org.quartz.dataSource.myDS.maxConnections: 5
    
    

    9.2 Configuration class for integrating Quartz with Spring Boot

    @Configuration
    public class QuartzCofig {
    
        @Autowired
        private TestngService testngService;
    
        @Autowired
        private TriggerDao triggerDao;
    
        @Autowired
        private QuartzService quartzService;
    
        @Bean(name = "schedulerFactory")
        public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
            SchedulerFactoryBean factory = new SchedulerFactoryBean();
            factory.setQuartzProperties(quartzProperties());
            // Pass in the services, DAOs, etc. that the job classes need; @Autowired cannot inject them into a job
            Map<String, Object> springBeanMap = new HashMap<String, Object>();
            springBeanMap.put("testngService", testngService);
            springBeanMap.put("quartzService", quartzService);
            springBeanMap.put("triggerDao", triggerDao);
            factory.setSchedulerContextAsMap(springBeanMap);
            factory.setWaitForJobsToCompleteOnShutdown(true);
            return factory;
        }
    
        @Bean
        public Properties quartzProperties() throws IOException {
            PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
            // Location of the quartz.properties file
            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
            // Load the properties from quartz.properties before the object is requested
            propertiesFactoryBean.afterPropertiesSet();
            return propertiesFactoryBean.getObject();
        }
    
        /*
         * Obtain the Scheduler instance from the SchedulerFactoryBean.
         * The bean name must not be "scheduler"; otherwise the Scheduler injected into QuartzService is not the one defined here.
         */
        @Bean(name="myScheduler")
        public Scheduler scheduler() throws IOException {
            System.out.println("schedulerFactoryBean().getScheduler():" + schedulerFactoryBean().getScheduler());
            return schedulerFactoryBean().getScheduler();
        }
    }
    

    9.3 Custom Job logic

    @Configuration
    @Component
    @PersistJobDataAfterExecution
    @DisallowConcurrentExecution
    public class ScheduleJob extends QuartzJobBean {
    
        private Logger logger = LoggerFactory.getLogger(ScheduleJob.class);
    
        @Override
        protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
            JobDataMap dataMap = context.getJobDetail().getJobDataMap();
    
            TestSuite testSuite = (TestSuite)dataMap.get("testSuite");
            Project project = (Project) dataMap.get("project");
    
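            TestngService testngService;
            try {
                // Look up the Spring-managed service from the scheduler context
                testngService = (TestngService)context.getScheduler().getContext().get("testngService");
            } catch (SchedulerException e) {
                logger.error("Failed to read the scheduler context", e);
                // Abort this run instead of hitting a NullPointerException below
                throw new JobExecutionException(e);
            }
            logger.info("---" + context.getJobDetail().getKey() + " is about to run---");
            testngService.run(testSuite, project);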
        }
    }
    

    9.4 Service for adding and deleting jobs

    @Service("quartzService")
    public class QuartzService {
    
        @Resource(name = "myScheduler")
        private Scheduler scheduler;
    
        @Autowired
        private TriggerDao triggerDao;
    
        @Autowired
        private ProcessDao processDao;
    
        @Autowired
        private ApplicationContext applicationContext;
    
        private Logger logger = LoggerFactory.getLogger(QuartzService.class);
    
        /**
         * Add or modify a job.
         * @param testSuite the test suite to run; its runtime is used as the cron expression
         * @param project the project the test suite belongs to
         */
        public void addJob(TestSuite testSuite, Project project) {
            String runTime = testSuite.getRuntime();
            if (StringUtils.isEmpty(runTime)) {
                return;
            }
            // The test suite name and the project name form both the job key and the trigger key
            String testSuiteName = testSuite.getName();
            String projectName = project.getName();
    
            // Pass values to the job
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put("testSuite", testSuite);
            jobDataMap.put("project", project);
    
            JobDetail jobDetail = JobBuilder.newJob(ScheduleJob.class)
                    .withIdentity(testSuiteName, projectName)
                    .usingJobData(jobDataMap)
                    .build();
            // Misfire handling (currently disabled): fire immediately once the previous run has finished
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(runTime);
    //                        .withMisfireHandlingInstructionFireAndProceed();
            CronTrigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(testSuiteName, projectName)
                    .withSchedule(scheduleBuilder)
                    .build();
            try {
                scheduler.scheduleJob(jobDetail, trigger);
            } catch (SchedulerException e) {
                logger.error("Failed to schedule job " + jobDetail.getKey(), e);
                // Do not record a trigger that was never scheduled
                return;
            }
            Trigger triggerInsert = new Trigger(project.getName() + "." + testSuite.getName(),
                    TriggerStateConstant.WAITING, null, project.getProjectid());
            triggerDao.insertOne(triggerInsert);
        }
    
        /**
         * Delete a job
         * @param testSuite
         * @param project
         */
        public void deleteJob(TestSuite testSuite, Project project) {
            try {
                scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
                scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
                scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));
                // Delete the record from the sttrigger table
                triggerDao.deleteByTriggerId(project.getName() + "." + testSuite.getName());
            } catch (SchedulerException e) {
                e.printStackTrace();
                logger.error(e.getMessage());
            }
        }
    
        /**
         * Check whether any trigger has fired under the given project.
         * @param projectId
         * @return true if at least one trigger record exists for the project
         */
        public boolean hasTriggerFired(String projectId) {
            List<Trigger> triggerList = triggerDao.findByProjectId(projectId);
            return !CollectionUtils.isEmpty(triggerList);
        }
    
        /**
         * Get a process that is available for execution
         * @param projectId
         * @return
         */
        public Process getAvaliableProcess(String projectId) {
            // All processes under this project
            List<String> processIdList = processDao.findIdByProjectId(projectId);
            // Process ids of the test suites that are currently executing
            List<String> executingProcssIdList = triggerDao.findExecutingProcess(projectId);
    
            List<String> differentList = new ArrayList<>();
            if(!CollectionUtils.isEmpty(executingProcssIdList)) {
                Set<String> processIdSet = new HashSet<>();
                processIdSet.addAll(processIdList);
                Set<String> executingProcssIdSet = new HashSet<String>();
                executingProcssIdSet.addAll(executingProcssIdList);
                // Take the set difference
                Set<String> differentSet = CommonUtil.getDifferentSet(processIdSet, executingProcssIdSet);
                differentList.addAll(differentSet);
            } else {
                differentList = processIdList;
            }
            // Pick one Process at random
            if(differentList.size() != 0) {
                String randomProcessId = CommonUtil.getRandom(differentList);
                return processDao.findByProcessId(randomProcessId);
            } else {
                return null;
            }
        }
    }
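
getAvaliableProcess relies on CommonUtil.getDifferentSet and CommonUtil.getRandom, whose source is not shown in this post. As a minimal sketch of what such helpers might look like, the JDK-only class below computes a set difference and picks a random element. `ProcessPicker`, `difference` and `pickRandom` are hypothetical stand-ins, not the project's actual implementation.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Hypothetical stand-ins for CommonUtil.getDifferentSet and CommonUtil.getRandom. */
final class ProcessPicker {

    /** Returns the elements of all that are not in busy, without modifying either argument. */
    static Set<String> difference(Set<String> all, Set<String> busy) {
        Set<String> result = new LinkedHashSet<>(all); // copy, so the caller's set stays intact
        result.removeAll(busy);
        return result;
    }

    /** Returns a random element of candidates, or null when the list is empty. */
    static String pickRandom(List<String> candidates, Random random) {
        if (candidates.isEmpty()) {
            return null;
        }
        return candidates.get(random.nextInt(candidates.size()));
    }
}
```

Copying into a new set before calling removeAll matters here: removing directly from the caller's set would silently shrink the list of all process ids.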
    

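    9.5 Singleton class for resource scheduling
    (1) Why a singleton: every Job execution must get the same ProcessResource object, and therefore the same map. With several maps, the jobs would no longer be sharing one set of resources.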

    public class ProcessResource {
    
        private ProcessDao processDao = (ProcessDao) SpringUtil.getBean("processDao");
        private Map<String, LinkedList<Process>> map = new HashMap<String, LinkedList<Process>>();
        // volatile is required for double-checked locking to be safe
        private static volatile ProcessResource instance = null;
        private Object lock = new Object();
    
        private Logger logger = LoggerFactory.getLogger(ProcessResource.class);
    
    
        private ProcessResource() {
        }
    
        public static ProcessResource getInstance() {
            if (instance == null) {
                synchronized (ProcessResource.class) {
                    if (instance == null) {
                        // Initialize fully before publishing, so other threads never see a half-built instance
                        ProcessResource created = new ProcessResource();
                        created.init();
                        instance = created;
                    }
                }
            }
            return instance;
        }
    
    
        public void init() {
            // Load the processes of every project into the map
            List<String> projectIdList = processDao.findProjectId();
            for(String projectId : projectIdList) {
                LinkedList<Process> list = processDao.findByProjectId(projectId);
                map.put(projectId, list);
            }
        }
    
        public Process getProcess(String projectId) {
            synchronized (lock) {
                LinkedList<Process> list = instance.getMap().get(projectId);
                // Hand out the first process if the list has one; otherwise log and return null
                if(!CollectionUtils.isEmpty(list)) {
                    return list.removeFirst();
                } else {
                    logger.info("No available Process left in ProcessResource....");
                    return null;
                }
            }
        }
    
        /**
         * Release a resource
         * @param process
         * @param projectId
         */
        public void releaseProcess(Process process, String projectId) {
            // Use the same lock as getProcess, because LinkedList is not thread-safe
            synchronized (lock) {
                LinkedList<Process> list = instance.getMap().get(projectId);
                list.addLast(process);
            }
        }
    
        public void setMap(Map<String, LinkedList<Process>> map) {
            this.map = map;
        }
    
        public Map<String, LinkedList<Process>> getMap() {
            return map;
        }
    }
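
Double-checked locking is easy to get subtly wrong. A common alternative is the initialization-on-demand holder idiom, which leaves lazy, thread-safe construction to the JVM's class loading. The following JDK-only sketch is one way the same pool could be structured, not the project's code: `ResourcePool`, `acquire` and `release` are hypothetical names, String ids stand in for Process objects, and loading the initial data from ProcessDao is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** JDK-only sketch of a singleton resource pool; String ids stand in for Process objects. */
final class ResourcePool {
    // Project id -> idle resources of that project.
    private final Map<String, Deque<String>> idle = new ConcurrentHashMap<>();

    private ResourcePool() { }

    // The JVM initializes Holder lazily and exactly once, so no explicit locking is needed.
    private static final class Holder {
        static final ResourcePool INSTANCE = new ResourcePool();
    }

    static ResourcePool getInstance() {
        return Holder.INSTANCE;
    }

    /** Puts a resource (back) into the pool of the given project. */
    void release(String projectId, String resource) {
        Deque<String> queue = idle.computeIfAbsent(projectId, id -> new ArrayDeque<>());
        synchronized (queue) { // ArrayDeque itself is not thread-safe
            queue.addLast(resource);
        }
    }

    /** Takes an idle resource of the project, or returns null when none is available. */
    String acquire(String projectId) {
        Deque<String> queue = idle.get(projectId);
        if (queue == null) {
            return null;
        }
        synchronized (queue) {
            return queue.pollFirst(); // pollFirst returns null on an empty deque
        }
    }
}
```

Locking per project queue rather than on one global lock means jobs of different projects do not block each other when taking or returning resources.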
    


          Article link: https://www.haomeiwen.com/subject/nmmdgxtx.html