美文网首页SpringBoot
Quartz定时任务串行调度 fixedDelay

Quartz定时任务串行调度 fixedDelay

作者: idelo | 来源:发表于2018-07-23 18:13 被阅读0次

    项目需要搞分布式,出于一些原因定时器的代码也需要部署两份,但是定时器是不需要跑两遍,所以考虑了分布式的定时任务框架Quartz。主要解决2个问题:

    1. 多台服务运行,保证只有一台服务的定时器在跑。这台服务不挂,另一台上的定时器永远不启动。
    2. 保证定时器串行调度。一个定时任务没有执行完,绝对不会触发第二次。(类似于Spring的定时器的fixedDelayString参数)

    于是我就开始写demo,第一个很容易就得到了解决这里就不细说了,坑在了第二个,由于我用的是比较新的版本所以找了很多资料都不合适,结果没办法,跑去调试源码。(容易吗我,写个demo要去调源码)

    网上很多人都说只要concurrent 设置为false

        @Bean("testJobDetail")
        public JobDetailFactoryBean testJobDetail() {
            JobDetailFactoryBean bean = new JobDetailFactoryBean();
            bean.setDurability(true);
            bean.setRequestsRecovery(true);
            bean.setJobClass(MyDetailQuartzJobBean.class);
            Map<String, String> map = new HashMap<>();
            //配置定时任务类
            map.put("targetObject", "testScheduleTask");
            map.put("targetMethod", "execute");
            //是否允许任务并发执行。当值为false时,表示必须等到前一个线程处理完毕后才再启一个新的线程
    //        map.put("concurrent", "false");
            bean.setJobDataAsMap(map);
            return bean;
        }
    

    或者,使QuartzJobBean类实现org.quartz.StatefulJob接口即可

    public class BackCoupon implements StatefulJob {
        @Override
        public void execute(JobExecutionContext context)
                  throws JobExecutionException {
        }
    }
    

    我都试过了完全没效果StatefulJob接口已经被废弃了不推荐使用,没办法了,撸起袖子调试源码吧。我们知道quartz是可以把jobdetail存到数据表里的


    图片.png

    我发现了一个很可疑的字段IS_NONCONCURRENT,入口点就在这里了。段点打在类的org.quartz.impl.jdbcjobstore.JobStoreSupport类storeJob方法,第610行,这里触发了jobdetail的一个更新操作,点进去发现了orcale的具体实现

     public int updateJobDetail(Connection conn, JobDetail job) throws IOException, SQLException {
            ByteArrayOutputStream baos = this.serializeJobData(job.getJobDataMap());
            byte[] data = baos.toByteArray();
            PreparedStatement ps = null;
            PreparedStatement ps2 = null;
            ResultSet rs = null;
    
            int var13;
            try {
                ps = conn.prepareStatement(this.rtp("UPDATE {0}JOB_DETAILS SET DESCRIPTION = ?, JOB_CLASS_NAME = ?, IS_DURABLE = ?, IS_NONCONCURRENT = ?, IS_UPDATE_DATA = ?, REQUESTS_RECOVERY = ?, JOB_DATA = EMPTY_BLOB()  WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"));
                ps.setString(1, job.getDescription());
                ps.setString(2, job.getJobClass().getName());
                this.setBoolean(ps, 3, job.isDurable());
                this.setBoolean(ps, 4, job.isConcurrentExectionDisallowed());
                this.setBoolean(ps, 5, job.isPersistJobDataAfterExecution());
                this.setBoolean(ps, 6, job.requestsRecovery());
                ps.setString(7, job.getKey().getName());
                ps.setString(8, job.getKey().getGroup());
                ps.executeUpdate();
                ps.close();
                ps = conn.prepareStatement(this.rtp("SELECT JOB_DATA FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ? FOR UPDATE"));
                ps.setString(1, job.getKey().getName());
                ps.setString(2, job.getKey().getGroup());
                rs = ps.executeQuery();
                int res = 0;
                if (rs.next()) {
                    Blob dbBlob = this.writeDataToBlob(rs, 1, data);
                    ps2 = conn.prepareStatement(this.rtp("UPDATE {0}JOB_DETAILS SET JOB_DATA = ?  WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"));
                    ps2.setBlob(1, dbBlob);
                    ps2.setString(2, job.getKey().getName());
                    ps2.setString(3, job.getKey().getGroup());
                    res = ps2.executeUpdate();
                }
    
                var13 = res;
            } finally {
                closeResultSet(rs);
                closeStatement(ps);
                closeStatement(ps2);
            }
    
            return var13;
        }
    

    这个时候发现IS_NONCONCURRENT 字段是通过JOB的isConcurrentExectionDisallowed属性确定的。而且下方有个循环,这个循环就是把之前JobDetailFactoryBean 配置的map里的值取出来。看一下SQL,根本就不能配置IS_NONCONCURRENT 嘛,那么IS_NONCONCURRENT 怎么办呢?
    点进去JobDetail的实现类查看相关的getter and setter发现如下东西:

      public boolean isConcurrentExectionDisallowed() {
            return ClassUtils.isAnnotationPresent(this.jobClass, DisallowConcurrentExecution.class);
        }
    

    是否有出现标签?哦,原来是通过扫描标签啊,DisallowConcurrentExecution这个标签是个空标签。找到定义的job,在类名上加上标签。问题就得到了解决。

    比如我这里绑定的job是MyDetailQuartzJobBean方法,用的是网上给的绑定方法map里面定义targetObject,targetMethod。然后通过反射调用自己写的定时任务,那么就在类名上加上注解即可。

    package com.quartz.demo.service;
    
    import org.quartz.DisallowConcurrentExecution;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.scheduling.quartz.QuartzJobBean;
    
    import java.lang.reflect.Method;
    
    
    @DisallowConcurrentExecution
    public class MyDetailQuartzJobBean extends QuartzJobBean {
        // 计划任务所在类
        private String targetObject;
    
        // 具体需要执行的计划任务
        private String targetMethod;
        private ApplicationContext ctx;
    
        @Override
        protected void executeInternal(JobExecutionContext context)
                throws JobExecutionException {
            try {
                Object otargetObject = ctx.getBean(targetObject);
                Method m = null;
                try {
                    m = otargetObject.getClass().getMethod(targetMethod);
                    m.invoke(otargetObject);
                } catch (SecurityException e) {
                    e.printStackTrace();
                } catch (NoSuchMethodException e) {
                    e.printStackTrace();
                }
            } catch (Exception e) {
                throw new JobExecutionException(e);
            }
        }
    
        public void setApplicationContext(ApplicationContext applicationContext) {
            this.ctx = applicationContext;
        }
    
        public void setTargetObject(String targetObject) {
            this.targetObject = targetObject;
        }
    
        public void setTargetMethod(String targetMethod) {
            this.targetMethod = targetMethod;
        }
    }
    
    

    或者绑定JOB的时候就直接绑定到实际的定时任务类,不要通过反射(网友们说可能会出现序列化问题,我并没有尝试过)

    环境:2.0.3的spring-boot-starter-quartz。orcale数据库。

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.3.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-quartz</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.github.noraui</groupId>
                <artifactId>ojdbc7</artifactId>
                <version>12.1.0.2</version>
            </dependency>
    
            <dependency>
                <groupId>com.mchange</groupId>
                <artifactId>c3p0</artifactId>
                <version>0.9.2.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    

    参考:

    quartz任务串行并行

    Scheduled with fixed delay in quartz scheduler?

    相关文章

      网友评论

        本文标题:Quartz定时任务串行调度 fixedDelay

        本文链接:https://www.haomeiwen.com/subject/yowjmftx.html