美文网首页
quartz源码阅读

quartz源码阅读

作者: 海蟾子_null | 来源:发表于2018-05-01 20:27 被阅读0次

前面的话

这里只对quartz的源码做一个整体的梳理,关于quartz的整体结构,百度Google之,一堆一堆的。

具体阅读

quartz中主要围绕3个东东搞各种逻辑。分别是调度器(Scheduler),触发器(trigger)和任务(job)。调度器去获取触发器,触发器指定任务的调度时间,调度策略,调度状态,优先级,开始时间,结束时间等信息。任务就是具体的业务逻辑实现。

一个栗子进入代码

        SchedulerFactory factory = new StdSchedulerFactory("test_quartz.properties");
        Scheduler scheduler = factory.getScheduler();
        scheduler.start();
        Trigger t = newTrigger().withIdentity("t1","g1").startAt(new Date(1466746025000l)).withSchedule(simpleSchedule().withMisfireHandlingInstructionNextWithRemainingCount().withRepeatCount(0)).build();
        JobDetail job = newJob(TestJob.class).withIdentity("myJob1", "g1").build();
        scheduler.scheduleJob(job,t);
    }
public class TestJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            Thread.sleep(20000);
            System.out.println(context.getTrigger().getKey()+"执行成功!!!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上面两段代码就是一个简单的任务的写法。
主要过程如下:
1、首先通过调度器工厂获取一个调度器。启动调度器。
2、定义触发器。
3、定义任务。
4、通过调度器将触发器和任务关联起来。
首先来看下调度器的初始化。
调度器工厂初始化主要是读取配置信息。通过getScheduler方法才是真正的初始化scheduler,里边主要是通过配置信息组装scheduler。这里不是重点,一笔带过。【注:在调度器组装的时候,顺便启动了任务的执行线程
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);

,只是在线程启动后一直等待,知道调度器调用start方法

while (paused && !halted.get()) {
                      try {
                          // wait until togglePause(false) is called...
                          sigLock.wait(1000L);
                      } catch (InterruptedException ignore) {
                      }

下面试调度器的启动。

调度器启动过程

quartz支持集群模式下的任务调度。任务持久化采用DB的方式。
这里主要涉及集群模式下的任务执行过程。
启动过程代码如下:

 public void start() throws SchedulerException {
        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }
        notifySchedulerListenersStarting();
        if (initialStart == null) {
            initialStart = new Date();
    //调度器第一次启动
            this.resources.getJobStore().schedulerStarted();            
            startPlugins();
        } else {
          
            resources.getJobStore().schedulerResumed();
        }
    //将执行线程唤醒。用于获取触发器,触发任务
        schedThread.togglePause(false);
        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");        
        notifySchedulerListenersStarted();
    }
public void schedulerStarted() throws SchedulerException {

//如果是集群,启动集群的管理线程,定时检查集群的健康性。
        if (isClustered()) {
            clusterManagementThread = new ClusterManager();
            if(initializersLoader != null)
                clusterManagementThread.setContextClassLoader(initializersLoader);
            clusterManagementThread.initialize();
        } else {
            try {
  //非集群,则恢复调度器宕机前的任务
                recoverJobs();
            } catch (SchedulerException se) {
                throw new SchedulerConfigException(
                        "Failure occured during job recovery.", se);
            }
        }
//起线程,用于检查是否有任务错过执行时间,若有,则根据不同的策略修改不同的nextfiretime值,以便于工作线程去选择trigger。
        misfireHandler = new MisfireHandler();
        if(initializersLoader != null)
            misfireHandler.setContextClassLoader(initializersLoader);
        misfireHandler.initialize();
        schedulerRunning = true;
        
        getLog().debug("JobStore background threads started (as scheduler was started).");
    }

下面深入对三大线程做讲解。QuartzSchedulerThread,ClusterManager和MisfireHandler。

QuartzSchedulerThread

这个线程是quartz的主要线程,负责调度的。看下代码:
run方法很长,这里选取主要的代码。

  public void run() {
        int acquiresFailed = 0;
        while (!halted.get()) {
            try {
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                           //scheduler调用start方法前,在此处循环,直到start方法里调用了togglePause方法。
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
。。。
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
                    try {
                      //获取当前可以被触发的trigger
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                     。。。

                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
                        if(goAhead) {
                            try {
                              //触发trigger,主要是修改qrtz_trigger表中trigger的状态从acquired状态变成WATTING或者complete状态。并计算下一次执行时间,等待下一次被选中。同时修改QRTZ_FIRED_TRIGGERS中trigger状态为executing状态。
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
。。。
                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
                            //真正的到了调用job的execute的地方了,该方法执行完成之后,本次调度就真正完成了。
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
 //后面就是该线程等待一段时间,用于其他节点来调度任务。
    }

总结起来,上面代码的逻辑如下:
1、启动时,该线程一直都在等待,知道有调用scheudler的start方法,开始唤醒该线程。
2、查看当前任务的处理线程池里空闲线程的个数,然后去qrtz_triggers表中获取可以处理的trigger,并将trigger的状态改为acquired,同时插入表qrtz_fired_triggers,此时qrtz_fired_triggers表中trigger的状态也为acquired。
3、获取到待执行的trigger,由于取的是时间窗里的trigger,所以,从待执行的trigger列表中取第一个trigger(trigger列表是按照next_fire_time升序排列),与当前时间比较,如果大于2s,则等待。
4、等到第一个trigger的任务到了,则去qrtz_triggers表中再次确认获取到的trigger的状态是否为aquired,若是,则修改qrtz_fired_triggers状态为executing。同时,qrtz_triggers中的状态在本次调度时已经走到尽头,可以等待下一次的调度了。即,计算下一次的调度时间,将并将任务状态改为watting状态。若计算得到的下一次调度时间为null,则表明该任务已经执行完成。将任务改为complete状态。返回待本次调度的trigger。
5、循环trigger,获取任务执行线程,执行任务的execute方法。
6、改调度线程wait一段时间,等待下一次获取trigger,调度。
接下来看下真正调度的线程JobRunShell,同样,很长的run方法,这里只摘取部分代码:

 public void run() {
        qs.addInternalSchedulerListener(this);
        try {
            OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
            JobDetail jobDetail = jec.getJobDetail();
            do {
                JobExecutionException jobExEx = null;
                Job job = jec.getJobInstance();
......
                long startTime = System.currentTimeMillis();
                long endTime = startTime;
                try {
                    job.execute(jec);
                    endTime = System.currentTimeMillis();
                } catch (JobExecutionException jee) {
                    endTime = System.currentTimeMillis();
                    jobExEx = jee;
                    getLog().info("Job " + jobDetail.getKey() +
                            " threw a JobExecutionException: ", jobExEx);
                } catch (Throwable e) {
                    endTime = System.currentTimeMillis();
                    getLog().error("Job " + jobDetail.getKey() +
                            " threw an unhandled Exception: ", e);
                    SchedulerException se = new SchedulerException(
                            "Job threw an unhandled exception.", e);
                    qs.notifySchedulerListenersError("Job ("
                            + jec.getJobDetail().getKey()
                            + " threw an exception.", se);
                    jobExEx = new JobExecutionException(se, false);
                }

                jec.setJobRunTime(endTime - startTime);
......
                CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
                try {
                    instCode = trigger.executionComplete(jec, jobExEx);
                } catch (Exception e) {
                    // If this happens, there's a bug in the trigger...
                    SchedulerException se = new SchedulerException(
                            "Trigger threw an unhandled exception.", e);
                    qs.notifySchedulerListenersError(
                            "Please report this error to the Quartz developers.",
                            se);
                }
                if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                    jec.incrementRefireCount();
                    try {
                        complete(false);
                    } catch (SchedulerException se) {
                        qs.notifySchedulerListenersError("Error executing Job ("
                                + jec.getJobDetail().getKey()
                                + ": couldn't finalize execution.", se);
                    }
                    continue;
                }

                try {
                    complete(true);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error executing Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                    continue;
                }

                qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                break;
            } while (true);

        } finally {
            qs.removeInternalSchedulerListener(this);
        }
    }

上述代码的逻辑很简单,就是获取job并执行job的execute方法。执行完成之后,通过不同的返回码,进行不同的数据库操作。 qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);这句话就是通过不同的返回值做不同的数据库操作。主要是修改qrtz_triggers里的trigger状态及某些场景下删除trigger。然后是删除qrtz_fired_triggers里的当前trigger。
到此,正常的任务调度完成了。当然其中很多步骤里都调用了SchedulerListener,TriggerListener中的一些方法,这些是quartz开放出来的定制接口,方便每步操作时,我们对任务的监控。

状态转换

接下来看misfired的进程MisfireHandler。

Misfirehandler是一个内部类。run接口 代码如下:

  public void run() {
            
            while (!shutdown) {

                long sTime = System.currentTimeMillis();

//获取misfired的job
                RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

// 如果任务处理线程在等待下一次的扫描满足的trigger,则唤醒线程,来处理misfired的任务
                if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
                    signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
                }

                if (!shutdown) {
                    long timeToSleep = 50l;  // At least a short pause to help balance threads
                    if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                        timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                        if (timeToSleep <= 0) {
                            timeToSleep = 50l;
                        }

                        if(numFails > 0) {
                            timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                        }
                    }
                    
                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }//while !shutdown
            }
        }

主要逻辑在manager方法里。

 private RecoverMisfiredJobsResult manage() {
            try {
                getLog().debug("MisfireHandler: scanning for misfires...");

                RecoverMisfiredJobsResult res = doRecoverMisfires();
                numFails = 0;
                return res;
            } catch (Exception e) {
                if(numFails % 4 == 0) {
                    getLog().error(
                        "MisfireHandler: Error handling misfires: "
                                + e.getMessage(), e);
                }
                numFails++;
            }
            return RecoverMisfiredJobsResult.NO_OP;
        }

主要的方法是doRecoverMisfires()。

protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = getNonManagedTXConnection();
        try {
            RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
            
            // Before we make the potentially expensive call to acquire the 
            // trigger lock, peek ahead to see if it is likely we would find
            // misfired triggers requiring recovery.
            int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
                getDelegate().countMisfiredTriggersInState(
                    conn, STATE_WAITING, getMisfireTime()) : 
                Integer.MAX_VALUE;
            
            if (misfireCount == 0) {
                getLog().debug(
                    "Found 0 triggers that missed their scheduled fire-time.");
            } else {
                transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                //修改misfired的next fired time ,等待任务选取线程去调度
                result = recoverMisfiredJobs(conn, false);
            }
            
            commitConnection(conn);
            return result;

int misfireCount = (getDoubleCheckLockMisfireHandler()) ? getDelegate().countMisfiredTriggersInState( conn,STATE_WAITING,getMisfireTime()) :
获取misfired的trigger,执行的查询为select count(TRIGGER_NAME) from QRTZ_TRIGGERS where SCHED_NAME=xxx and not (MISFIRE_INSTR = -1 ) and NEXT_FIRE_TIME < 当前时间 and TRIGGER_STATE='STATE_WAITING'即选取当前调度器的misfired的策略不为-1的,且下一次执行时间小于当前时间的且状态为waiting的trigger。
result = recoverMisfiredJobs(conn, false);真正获取misfired的job的时候了。

protected RecoverMisfiredJobsResult recoverMisfiredJobs(
        Connection conn, boolean recovering)
        throws JobPersistenceException, SQLException {

        // If recovering, we want to handle all of the misfired
        // triggers right away.
        int maxMisfiresToHandleAtATime = 
            (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
        
        List<TriggerKey> misfiredTriggers = new LinkedList<TriggerKey>();
        long earliestNewTime = Long.MAX_VALUE;
        // We must still look for the MISFIRED state in case triggers were left 
        // in this state when upgrading to this version that does not support it. 
        boolean hasMoreMisfiredTriggers =
            getDelegate().hasMisfiredTriggersInState(
                conn, STATE_WAITING, getMisfireTime(), 
                maxMisfiresToHandleAtATime, misfiredTriggers);

        if (hasMoreMisfiredTriggers) {
            getLog().info(
                "Handling the first " + misfiredTriggers.size() +
                " triggers that missed their scheduled fire-time.  " +
                "More misfired triggers remain to be processed.");
        } else if (misfiredTriggers.size() > 0) { 
            getLog().info(
                "Handling " + misfiredTriggers.size() + 
                " trigger(s) that missed their scheduled fire-time.");
        } else {
            getLog().debug(
                "Found 0 triggers that missed their scheduled fire-time.");
            return RecoverMisfiredJobsResult.NO_OP; 
        }

        for (TriggerKey triggerKey: misfiredTriggers) {
            
            OperableTrigger trig = 
                retrieveTrigger(conn, triggerKey);

            if (trig == null) {
                continue;
            }

            doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);

            if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
                earliestNewTime = trig.getNextFireTime().getTime();
        }

        return new RecoverMisfiredJobsResult(
                hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
    }

每次获取misfired的trigger有一定的数量,默认20个,超过20个,则会在下一次去获取。
处理misfired的triggerdoUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);

 private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
        Calendar cal = null;
        if (trig.getCalendarName() != null) {
            cal = retrieveCalendar(conn, trig.getCalendarName());
        }

//触发triggerlistener的misfired方法。
        schedSignaler.notifyTriggerListenersMisfired(trig);

//根据不同的misfired的策略计算next_fired_time。【比较的绕,下次详细介绍】
        trig.updateAfterMisfire(cal);

        if (trig.getNextFireTime() == null) {
//如果下一次执行的时间为空,则认为执行的任务已经完成了。直接修改状态的state_complete
            storeTrigger(conn, trig,
                null, true, STATE_COMPLETE, forceState, recovering);
            schedSignaler.notifySchedulerListenersFinalized(trig);
        } else {
//修改任务的下一次执行时间和任务状态,等待调度线程去调度
            storeTrigger(conn, trig, null, true, newStateIfNotComplete,
                    forceState, recovering);
        }
    }

执行完上面的代码,提交了数据库事物后,任务就可以被正常调度了。到这里,misfired的任务大体的也完成。
然后就是回到manager方法了。唤醒调度线程,至此,misfired的本次扫描全部完成,接下来的事情就交给QuartzSchedulerThread来处理了。
写了好几天,终于写完了两个线程的处理过程。接下来的一篇主要介绍任务节点在down机的时候的处理及不同情况下trigger的next_fire_time的计算。

相关文章

网友评论

      本文标题:quartz源码阅读

      本文链接:https://www.haomeiwen.com/subject/pgefrftx.html