美文网首页
领略Quartz源码架构之美——源码实弹之运行过程(四)

领略Quartz源码架构之美——源码实弹之运行过程(四)

作者: 向光奔跑_ | 来源:发表于2018-12-11 17:05 被阅读0次

    本章阅读收获:可了解Quartz框架中的正式开始运行部分源码

    继上节内容

    上面我们讲到了QuartzSchedulerThread中run方法的第一小部分,下面我们继续来进行分析。

    QuartzSchedulerThread中run方法后续分析

                        if (triggers != null && !triggers.isEmpty()) {
    
                            now = System.currentTimeMillis();
                            //这里为什么triggers的第一个对象就是最早需要被执行的?
                            long triggerTime = triggers.get(0).getNextFireTime().getTime();
                            long timeUntilTrigger = triggerTime - now;
                            //如果第一条下次触发时间大于当前时间则进入等待
                            while(timeUntilTrigger > 2) {
                                synchronized (sigLock) {
                                    if (halted.get()) {
                                        break;
                                    }
                                    if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                        try {
                                            // we could have blocked a long while
                                            // on 'synchronize', so we must recompute
                                            now = System.currentTimeMillis();
                                            timeUntilTrigger = triggerTime - now;
                                            if(timeUntilTrigger >= 1)
                                                sigLock.wait(timeUntilTrigger);
                                        } catch (InterruptedException ignore) {
                                        }
                                    }
                                }
                                //等待的过程中看看有没有收到调度信号
                                if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                    break;
                                }
                                now = System.currentTimeMillis();
                                timeUntilTrigger = triggerTime - now;
                            }
    
                            // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                            if(triggers.isEmpty())
                                continue;
    
                            // set triggers to 'executing'
                            List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
    
                            boolean goAhead = true;
                            synchronized(sigLock) {
                                goAhead = !halted.get();
                            }
                            if(goAhead) {
                                try {
                                    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                    if(res != null)
                                        bndles = res;
                                } catch (SchedulerException se) {
                                    qs.notifySchedulerListenersError(
                                            "An error occurred while firing triggers '"
                                                    + triggers + "'", se);
                                    //QTZ-179 : a problem occurred interacting with the triggers from the db
                                    //we release them and loop again
                                    for (int i = 0; i < triggers.size(); i++) {
                                        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                    }
                                    continue;
                                }
    
                            }
    
                            for (int i = 0; i < bndles.size(); i++) {
                                TriggerFiredResult result =  bndles.get(i);
                                TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                                Exception exception = result.getException();
    
                                if (exception instanceof RuntimeException) {
                                    getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                    continue;
                                }
    
                                // it's possible to get 'null' if the triggers was paused,
                                // blocked, or other similar occurrences that prevent it being
                                // fired at this time...  or if the scheduler was shutdown (halted)
                                if (bndle == null) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                    continue;
                                }
    
                                // 下面是开始执行任务
                                JobRunShell shell = null;
                                try {
                                    //构造执行对象,JobRunShell实现了Runnable
                                    shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                    //这个里面会用我们自定义的Job来new一个对象,并把相关执行Job是需要的数据传给JobExecutionContextImpl(这是我们自定义job的execute方法参数)
                                    shell.initialize(qs);
                                } catch (SchedulerException se) {
                                    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                    continue;
                                }
    
                                // 这里是把任务放入到线程池中
                                if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                    // this case should never happen, as it is indicative of the
                                    // scheduler being shutdown or a bug in the thread pool or
                                    // a thread pool being used concurrently - which the docs
                                    // say not to do...
                                    getLog().error("ThreadPool.runInThread() return false!");
                                    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                }
    
                            }
    
                            continue; // while (!halted)
                        }
                    } else { // if(availThreadCount > 0)
                        // should never happen, if threadPool.blockForAvailableThreads() follows contract
                        continue; // while (!halted)
                    }
    

    首先是判断trigger是否存在,不存在的话就continue,继续外层while循环,保证定时任务一直运行下去。

    接下来分析triggers存在的情况下:

    这里会获取trigger第一个触发器下次触发时间,这里值是指距离现在最近的触发时间,但是为什么trigger数组中第一个就是最快的呢?

    这里不知大家还记得上一节中我们讲到的获取Trigger过程,它是存放在

     protected TreeSet<TriggerWrapper> timeTriggers = new TreeSet<TriggerWrapper>(new TriggerWrapperComparator());
    

    这个类的实现类就是根据Trigger的触发时间的远近来进行排序的,然后

    OperableTrigger trig = (OperableTrigger) tw.trigger.clone();
                    if (result.isEmpty()) {
                        batchEnd = Math.max(tw.trigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
                    }
                    result.add(trig);
    

    通过克隆复制添加进去的,这个方法中返回的就是result。

    接下来我们继续往下讲,我们会把根据距离触发时间的时间间隔,如果间隔大于2(这里为什么是2呢?我猜想主要是因为接下来的判断中有判断间隔大于1的,如果两个值相同就没有意思了,而且保证有足够的时间到下一步判断还能大于1)就继续往下走。

    接下来的方法源码是:

        /**
         * 是否有更早的触发器
         */
        private boolean isCandidateNewTimeEarlierWithinReason(long oldTime, boolean clearSignal) {
            
            synchronized(sigLock) {
    
                //看信号是否改变
                if (!isScheduleChanged())
                    return false;
    
                boolean earlier = false;
    
                //如果信号发生改变了,则比较信号下次触发时间
                if(getSignaledNextFireTime() == 0)
                    earlier = true;
                else if(getSignaledNextFireTime() < oldTime )
                    earlier = true;
    
                //下次触发时间可能比较早,但是如果本来执行时间快到了这个信号也不算
                if(earlier) {
                    // so the new time is considered earlier, but is it enough earlier?
                    long diff = oldTime - System.currentTimeMillis();
                    if(diff < (qsRsrcs.getJobStore().supportsPersistence() ? 70L : 7L))
                        earlier = false;
                }
    
                //把本次的信号清除掉
                if(clearSignal) {
                    clearSignaledSchedulingChange();
                }
    
                return earlier;
            }
        }
    

    这个方法就是判断执行器有没有更早的触发器,如果没有的话则等待触发器触发的时间间隔时间。(至于这里的信号量改变,具体是哪里触发的现在还不太懂)。

    接下来是releaseIfScheduleChangedSignificantly方法:
    ···

    private boolean releaseIfScheduleChangedSignificantly(
            List<OperableTrigger> triggers, long triggerTime) {
        if (isCandidateNewTimeEarlierWithinReason(triggerTime, true)) {
            // above call does a clearSignaledSchedulingChange()
            for (OperableTrigger trigger : triggers) {
                qsRsrcs.getJobStore().releaseAcquiredTrigger(trigger);
            }
            triggers.clear();
            return true;
        }
        return false;
    }
    

    ···
    这里面也调用了之前的isCandidateNewTimeEarlierWithinReason方法,为什么这里的Boolean参数就是true了呢?之前的是false。因为这里需要清空信号。

                //把本次的信号清除掉
                if(clearSignal) {
                    clearSignaledSchedulingChange();
                }
    

    之后就是把trigger这个列表都清空掉。

    之后我们就进入了:

                            if(goAhead) {
                                try {
                                    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                    if(res != null)
                                        bndles = res;
                                } catch (SchedulerException se) {
                                    qs.notifySchedulerListenersError(
                                            "An error occurred while firing triggers '"
                                                    + triggers + "'", se);
                                    //QTZ-179 : a problem occurred interacting with the triggers from the db
                                    //we release them and loop again
                                    for (int i = 0; i < triggers.size(); i++) {
                                        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                    }
                                    continue;
                                }
    
                            }
    

    就通知jobStore开始执行了,根据trigger获取其对应的JobDetail, 封装成TriggerFiredResult

        /**
         * 通知jobStore开始执行了,根据trigger获取其对应的JobDetail, 封装成TriggerFiredResult
         */
        public List<TriggerFiredResult> triggersFired(List<OperableTrigger> firedTriggers) {
    
            synchronized (lock) {
                List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
    
                for (OperableTrigger trigger : firedTriggers) {
                    TriggerWrapper tw = triggersByKey.get(trigger.getKey());
                    // was the trigger deleted since being acquired?
                    if (tw == null || tw.trigger == null) {
                        continue;
                    }
                    // was the trigger completed, paused, blocked, etc. since being acquired?
                    if (tw.state != TriggerWrapper.STATE_ACQUIRED) {
                        continue;
                    }
    
                    Calendar cal = null;
                    if (tw.trigger.getCalendarName() != null) {
                        cal = retrieveCalendar(tw.trigger.getCalendarName());
                        if(cal == null)
                            continue;
                    }
                    Date prevFireTime = trigger.getPreviousFireTime();
                    // in case trigger was replaced between acquiring and firing
                    timeTriggers.remove(tw);
                    // call triggered on our copy, and the scheduler's copy
                    tw.trigger.triggered(cal);
                    trigger.triggered(cal);
                    //tw.state = TriggerWrapper.STATE_EXECUTING;
                    tw.state = TriggerWrapper.STATE_WAITING;
    
                    TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(
                            tw.jobKey), trigger, cal,
                            false, new Date(), trigger.getPreviousFireTime(), prevFireTime,
                            trigger.getNextFireTime());
    
                    JobDetail job = bndle.getJobDetail();
    
                    if (job.isConcurrentExectionDisallowed()) {
                        ArrayList<TriggerWrapper> trigs = getTriggerWrappersForJob(job.getKey());
                        for (TriggerWrapper ttw : trigs) {
                            if (ttw.state == TriggerWrapper.STATE_WAITING) {
                                ttw.state = TriggerWrapper.STATE_BLOCKED;
                            }
                            if (ttw.state == TriggerWrapper.STATE_PAUSED) {
                                ttw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
                            }
                            timeTriggers.remove(ttw);
                        }
                        blockedJobs.add(job.getKey());
                    } else if (tw.trigger.getNextFireTime() != null) {
                        synchronized (lock) {
                            timeTriggers.add(tw);
                        }
                    }
    
                    results.add(new TriggerFiredResult(bndle));
                }
                return results;
            }
        }
    

    获取到相应启动的TriggerFiredResult任务后,便是循环执行:

                            for (int i = 0; i < bndles.size(); i++) {
                                TriggerFiredResult result =  bndles.get(i);
                                TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                                Exception exception = result.getException();
    
                                if (exception instanceof RuntimeException) {
                                    getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                    continue;
                                }
    
                                // it's possible to get 'null' if the triggers was paused,
                                // blocked, or other similar occurrences that prevent it being
                                // fired at this time...  or if the scheduler was shutdown (halted)
                                if (bndle == null) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                    continue;
                                }
    
                                // 下面是开始执行任务
                                JobRunShell shell = null;
                                try {
                                    //构造执行对象,JobRunShell实现了Runnable
                                    shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                    //这个里面会用我们自定义的Job来new一个对象,并把相关执行Job是需要的数据传给JobExecutionContextImpl(这是我们自定义job的execute方法参数)
                                    shell.initialize(qs);
                                } catch (SchedulerException se) {
                                    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                    continue;
                                }
    
                                // 这里是把任务放入到线程池中
                                if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                    // this case should never happen, as it is indicative of the
                                    // scheduler being shutdown or a bug in the thread pool or
                                    // a thread pool being used concurrently - which the docs
                                    // say not to do...
                                    getLog().error("ThreadPool.runInThread() return false!");
                                    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                }
    
                            }
    

    我们会把任务封装成JobRunShell线程任务,然后放到线程池中跑动。

    qsRsrcs.getThreadPool().runInThread(shell) 的源码是:

        public boolean runInThread(Runnable runnable) {
            if (runnable == null) {
                return false;
            }
    
            synchronized (nextRunnableLock) {
    
                handoffPending = true;
    
                // Wait until a worker thread is available
                while ((availWorkers.size() < 1) && !isShutdown) {
                    try {
                        nextRunnableLock.wait(500);
                    } catch (InterruptedException ignore) {
                    }
                }
    
                if (!isShutdown) {
                    WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
                    busyWorkers.add(wt);
                    wt.run(runnable);
                } else {
                    // If the thread pool is going down, execute the Runnable
                    // within a new additional worker thread (no thread from the pool).
                    WorkerThread wt = new WorkerThread(this, threadGroup,
                            "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                    busyWorkers.add(wt);
                    workers.add(wt);
                    wt.start();
                }
                nextRunnableLock.notifyAll();
                handoffPending = false;
            }
    
            return true;
        }
    

    这里显示查询有没有空闲的线程,如果没有就等待。有的话,生成一个WorkerThread线程,去跑动
    wt.run(runnable);这里的runnable就是JobRunShell。
    那么至于JobRunShell里的run方法时怎样的呢?

        public void run() {
            qs.addInternalSchedulerListener(this);
    
            try {
                OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
                JobDetail jobDetail = jec.getJobDetail();
    
                do {
    
                    JobExecutionException jobExEx = null;
                    Job job = jec.getJobInstance();
    
                    try {
                        begin();
                    } catch (SchedulerException se) {
                        qs.notifySchedulerListenersError("Error executing Job ("
                                + jec.getJobDetail().getKey()
                                + ": couldn't begin execution.", se);
                        break;
                    }
    
                    // notify job & trigger listeners...
                    try {
                        if (!notifyListenersBeginning(jec)) {
                            break;
                        }
                    } catch(VetoedException ve) {
                        try {
                            CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
                            qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
                            
                            // QTZ-205
                            // Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
                            if (jec.getTrigger().getNextFireTime() == null) {
                                qs.notifySchedulerListenersFinalized(jec.getTrigger());
                            }
    
                            complete(true);
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError("Error during veto of Job ("
                                    + jec.getJobDetail().getKey()
                                    + ": couldn't finalize execution.", se);
                        }
                        break;
                    }
    
                    long startTime = System.currentTimeMillis();
                    long endTime = startTime;
    
                    // execute the job
                    try {
                        log.debug("Calling execute on job " + jobDetail.getKey());
                        job.execute(jec);
                        endTime = System.currentTimeMillis();
                    } catch (JobExecutionException jee) {
                        endTime = System.currentTimeMillis();
                        jobExEx = jee;
                        getLog().info("Job " + jobDetail.getKey() +
                                " threw a JobExecutionException: ", jobExEx);
                    } catch (Throwable e) {
                        endTime = System.currentTimeMillis();
                        getLog().error("Job " + jobDetail.getKey() +
                                " threw an unhandled Exception: ", e);
                        SchedulerException se = new SchedulerException(
                                "Job threw an unhandled exception.", e);
                        qs.notifySchedulerListenersError("Job ("
                                + jec.getJobDetail().getKey()
                                + " threw an exception.", se);
                        jobExEx = new JobExecutionException(se, false);
                    }
    
                    jec.setJobRunTime(endTime - startTime);
    
                    // notify all job listeners
                    if (!notifyJobListenersComplete(jec, jobExEx)) {
                        break;
                    }
    
                    CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
    
                    // update the trigger
                    try {
                        instCode = trigger.executionComplete(jec, jobExEx);
                    } catch (Exception e) {
                        // If this happens, there's a bug in the trigger...
                        SchedulerException se = new SchedulerException(
                                "Trigger threw an unhandled exception.", e);
                        qs.notifySchedulerListenersError(
                                "Please report this error to the Quartz developers.",
                                se);
                    }
    
                    // notify all trigger listeners
                    if (!notifyTriggerListenersComplete(jec, instCode)) {
                        break;
                    }
    
                    // update job/trigger or re-execute job
                    if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                        jec.incrementRefireCount();
                        try {
                            complete(false);
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError("Error executing Job ("
                                    + jec.getJobDetail().getKey()
                                    + ": couldn't finalize execution.", se);
                        }
                        continue;
                    }
    
                    try {
                        complete(true);
                    } catch (SchedulerException se) {
                        qs.notifySchedulerListenersError("Error executing Job ("
                                + jec.getJobDetail().getKey()
                                + ": couldn't finalize execution.", se);
                        continue;
                    }
    
                    qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                    break;
                } while (true);
    
            } finally {
                qs.removeInternalSchedulerListener(this);
            }
        }
    

    我们这边就抓住关键点,就是Job job = jec.getJobInstance();
    还有 job.execute(jec);去执行任务,这样子整个调度执行任务就串联起来啦。

    结束语

    本章结束了整个Quartz调度器整个调度任务,当然有很多细节,比如调度器状态、触发器状态、任务状态等等还没有理解透彻,但是我认为对于我现在来说不是特别的重要,如果有需要我再进去深入修改就好了。最主要的是知道了Quartz是怎么玩的。而且最近工作中也没有特别用Quartz的地方,倒是数据库路由这块需要用到,所以我又要开启阅读数据库路由的源码啦~~~~ 大家有兴趣的可以持续关注我哦~~~~

    相关文章

      网友评论

          本文标题:领略Quartz源码架构之美——源码实弹之运行过程(四)

          本文链接:https://www.haomeiwen.com/subject/yklshqtx.html