美文网首页java 设计Java学习笔记IT在线课程
定时任务管理系统(Quartz和Spring的整合)开源和源码简

定时任务管理系统(Quartz和Spring的整合)开源和源码简

作者: holly_wang_王小飞 | 来源:发表于2017-03-12 21:22 被阅读2675次

    利用学习的时间这里写了个Spring和Quartz结合的一个web项目,纯后端的项目,restful接口
    实现对定时任务的增、删、改、查、停止, 启动、定时规则修改、立即执行等。github地址:holly-quartz-web,这里刚开始是为了学习源码,后来有了一些改动,再后来就想做一些业务上的改造,所以clone了一个quartz-core的项目进行改造,后期打算对其集群方式进行改造等等。github地址:quartz-core,有一起感兴趣的朋友可以一起改造,目前的项目比较简单可以作为学习入门的项目,也可以作为搭建job管理系统的初期项目,慢慢迭代。

    的时候我们讲了下整体run方法以及集群实现的核心思想,进一步解释这条规则就是:一个调度器实例在执行涉及到分布式问题的数据库操作前,首先要获取QUARTZ2_LOCKS表中对应当前调度器的行级锁,获取锁后即可执行其他表中的数据库操作,随着操作事务的提交,行级锁被释放,供其他调度器实例获取.
    集群中的每一个调度器实例都遵循这样一种严格的操作规程,那么对于同一类调度器来说,每个实例对数据库的操作只能是串行的.而不同名的调度器之间却可以并行执行.这节我们看下细节,Quartz的一些设计上的取舍,以及有节点宕机后的job恢复执行(别的服务器节点是怎么接替它的任务的)。
      在JobStoreSupport类中有个内部类ClusterManager,ClusterManager也是个Thread,在run方法中

     @Override
            public void run() {
                while (!shutdown) {
    
                    if (!shutdown) {
                        long timeToSleep = getClusterCheckinInterval();
                        long transpiredTime = (System.currentTimeMillis() - lastCheckin);
                        timeToSleep = timeToSleep - transpiredTime;
                        if (timeToSleep <= 0) {
                            timeToSleep = 100L;
                        }
    
                        if(numFails > 0) {
                            timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                        }
                        
                        try {
                            Thread.sleep(timeToSleep);
                        } catch (Exception ignore) {
                        }
                    }
    
                    if (!shutdown && this.manage()) {
                        signalSchedulingChangeImmediately(0L);
                    }
    
                }//while !shutdown
            }
        }
    

    从方法中可以看出 run期间会sleep一会,实际运行的是manage方法

    private boolean manage() {
                boolean res = false;
                try {
    
                    res = doCheckin();
    
                    numFails = 0;
                    getLog().debug("ClusterManager: Check-in complete.");
                } catch (Exception e) {
                    if(numFails % 4 == 0) {
                        getLog().error(
                            "ClusterManager: Error managing cluster: "
                                    + e.getMessage(), e);
                    }
                    numFails++;
                }
                return res;
            }
    

    实际的方法还是在

    protected boolean doCheckin() throws JobPersistenceException {
            boolean transOwner = false;
            boolean transStateOwner = false;
            boolean recovered = false;
    
            Connection conn = getNonManagedTXConnection();
            try {
                // Other than the first time, always checkin first to make sure there is 
                // work to be done before we acquire the lock (since that is expensive, 
                // and is almost never necessary).  This must be done in a separate
                // transaction to prevent a deadlock under recovery conditions.
                List<SchedulerStateRecord> failedRecords = null;
                if (!firstCheckIn) {
                    failedRecords = clusterCheckIn(conn);
                    commitConnection(conn);
                }
                
                if (firstCheckIn || (failedRecords.size() > 0)) {
                    getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
                    transStateOwner = true;
        
                    // Now that we own the lock, make sure we still have work to do. 
                    // The first time through, we also need to make sure we update/create our state record
                    failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);
        
                    if (failedRecords.size() > 0) {
                        getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                        //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
                        transOwner = true;
        
                        clusterRecover(conn, failedRecords);
                        recovered = true;
                    }
                }
                
                commitConnection(conn);
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
                throw e;
            } finally {
                try {
                    releaseLock(LOCK_TRIGGER_ACCESS, transOwner);
                } finally {
                    try {
                        releaseLock(LOCK_STATE_ACCESS, transStateOwner);
                    } finally {
                        cleanupConnection(conn);
                    }
                }
            }
    
            firstCheckIn = false;
    
            return recovered;
        }   
    
    

    先检查数据库中的,再对自身做一下检查。当发现最后有失败的节点的时候会进行恢复。clusterRecover方法就是进行恢复的方法。

     protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
            throws JobPersistenceException {
    
            if (failedInstances.size() > 0) {
    
                long recoverIds = System.currentTimeMillis();
    
                logWarnIfNonZero(failedInstances.size(),
                        "ClusterManager: detected " + failedInstances.size()
                                + " failed or restarted instances.");
                try {
                    for (SchedulerStateRecord rec : failedInstances) {
                        getLog().info(
                                "ClusterManager: Scanning for instance \""
                                        + rec.getSchedulerInstanceId()
                                        + "\"'s failed in-progress jobs.");
    
                        List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
                                .selectInstancesFiredTriggerRecords(conn,
                                        rec.getSchedulerInstanceId());
    
                        int acquiredCount = 0;
                        int recoveredCount = 0;
                        int otherCount = 0;
    
                        Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();
    
                        for (FiredTriggerRecord ftRec : firedTriggerRecs) {
    
                            TriggerKey tKey = ftRec.getTriggerKey();
                            JobKey jKey = ftRec.getJobKey();
    
                            triggerKeys.add(tKey);
    
                            // release blocked triggers..
                            if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                                getDelegate()
                                        .updateTriggerStatesForJobFromOtherState(
                                                conn, jKey,
                                                STATE_WAITING, STATE_BLOCKED);
                            } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                                getDelegate()
                                        .updateTriggerStatesForJobFromOtherState(
                                                conn, jKey,
                                                STATE_PAUSED, STATE_PAUSED_BLOCKED);
                            }
    
                            // release acquired triggers..
                            if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                                getDelegate().updateTriggerStateFromOtherState(
                                        conn, tKey, STATE_WAITING,
                                        STATE_ACQUIRED);
                                acquiredCount++;
                            } else if (ftRec.isJobRequestsRecovery()) {
                                // handle jobs marked for recovery that were not fully
                                // executed..
                                if (jobExists(conn, jKey)) {
                                    @SuppressWarnings("deprecation")
                                    SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                            "recover_"
                                                    + rec.getSchedulerInstanceId()
                                                    + "_"
                                                    + String.valueOf(recoverIds++),
                                            Scheduler.DEFAULT_RECOVERY_GROUP,
                                            new Date(ftRec.getScheduleTimestamp()));
                                    rcvryTrig.setJobName(jKey.getName());
                                    rcvryTrig.setJobGroup(jKey.getGroup());
                                    rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                                    rcvryTrig.setPriority(ftRec.getPriority());
                                    JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                                    jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                                    jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                                    jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                                    jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                                    rcvryTrig.setJobDataMap(jd);
    
                                    rcvryTrig.computeFirstFireTime(null);
                                    storeTrigger(conn, rcvryTrig, null, false,
                                            STATE_WAITING, false, true);
                                    recoveredCount++;
                                } else {
                                    getLog()
                                            .warn(
                                                    "ClusterManager: failed job '"
                                                            + jKey
                                                            + "' no longer exists, cannot schedule recovery.");
                                    otherCount++;
                                }
                            } else {
                                otherCount++;
                            }
    
                            // free up stateful job's triggers
                            if (ftRec.isJobDisallowsConcurrentExecution()) {
                                getDelegate()
                                        .updateTriggerStatesForJobFromOtherState(
                                                conn, jKey,
                                                STATE_WAITING, STATE_BLOCKED);
                                getDelegate()
                                        .updateTriggerStatesForJobFromOtherState(
                                                conn, jKey,
                                                STATE_PAUSED, STATE_PAUSED_BLOCKED);
                            }
                        }
    
                        getDelegate().deleteFiredTriggers(conn,
                                rec.getSchedulerInstanceId());
    
                        // Check if any of the fired triggers we just deleted were the last fired trigger
                        // records of a COMPLETE trigger.
                        int completeCount = 0;
                        for (TriggerKey triggerKey : triggerKeys) {
    
                            if (getDelegate().selectTriggerState(conn, triggerKey).
                                    equals(STATE_COMPLETE)) {
                                List<FiredTriggerRecord> firedTriggers =
                                        getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                                if (firedTriggers.isEmpty()) {
    
                                    if (removeTrigger(conn, triggerKey)) {
                                        completeCount++;
                                    }
                                }
                            }
                        }
    
                        logWarnIfNonZero(acquiredCount,
                                "ClusterManager: ......Freed " + acquiredCount
                                        + " acquired trigger(s).");
                        logWarnIfNonZero(completeCount,
                                "ClusterManager: ......Deleted " + completeCount
                                        + " complete triggers(s).");
                        logWarnIfNonZero(recoveredCount,
                                "ClusterManager: ......Scheduled " + recoveredCount
                                        + " recoverable job(s) for recovery.");
                        logWarnIfNonZero(otherCount,
                                "ClusterManager: ......Cleaned-up " + otherCount
                                        + " other failed job(s).");
    
                        if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
                            getDelegate().deleteSchedulerState(conn,
                                    rec.getSchedulerInstanceId());
                        }
                    }
                } catch (Throwable e) {
                    throw new JobPersistenceException("Failure recovering jobs: "
                            + e.getMessage(), e);
                }
            }
        }
    
    

    期间集群检查及恢复涉及到核心表示QRTZ_SCHEDULER_STATE表。会有检查的时间和间隔。判断是否故障想必大家很明白了。检查时间长时间没有更新。

    故障检测核心表

    发现有故障的节点 会接管对应的任务,并将该节点的数据删除以免下次再检测到,然后重复接管。

    集群方面基本就是这些内容,细节还得debug代码,好多我自己也不知道。看的出quartz在设计的时候有好多取舍的地方,以数据库为边界的操作,数据库随时有可能发生变化。 如果这时调度器发生了改变,新的trigger添加进来,那么有可能新添加的trigger比当前待执行的trigger更急迫,那么需要放弃当前trigger重新获取,然而,这里存在一个值不值得的问题,如果重新获取新trigger的时间要长于当前时间到新trigger出发的时间,那么即使放弃当前的trigger,仍然会导致xntrigger获取失败,但我们又不知道获取新的trigger需要多长时间,于是,我们做了一个主观的评判,若jobstore为RAM,那么假定获取时间需要7ms,若jobstore是持久化的,假定其需要70ms,当前时间与新trigger的触发时间之差小于这个值的我们认为不值得重新获取。这些都算是取舍吧,还有官方的文档介绍,集群特性对于高cpu使用率的任务效果很好,但是对于大量的短任务,各个节点都会抢占数据库锁,这样就出现大量的线程等待资源.这种情况随着节点的增加会越来越严重. 所以比较适合较长时间执行一次的任务
    Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers.

    The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).

    整个分析先到此处,后续计划 会支持注解方式,即在一个类上用注解的方式即可实现任务的添加。集群方式改造,希望引入zookeeper实现。减轻短小任务对数据库的压力以及引入rpc模式实现远程电泳任务等等,后续持续更新。。。

    相关文章

      网友评论

        本文标题:定时任务管理系统(Quartz和Spring的整合)开源和源码简

        本文链接:https://www.haomeiwen.com/subject/uhzogttx.html