美文网首页Quartz 源码解读-初识Quartz
04 Quartz-初识Quartz-QuartzSchedul

04 Quartz-初识Quartz-QuartzSchedul

作者: 花神子 | 来源:发表于2019-05-20 16:16 被阅读0次
    quartz

    我在03 Quartz-初识Quartz-Scheduler初始化 篇中解说了getScheduler() 的instantiate(),简单的介绍了Scheduler的整个初始化过程,接下来一起了解下之后的流程;

    Scheduler使用

    getScheduler

    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }
        //从缓存中查询获取Schedule任务,任务名称从配置中获取,若无指定,则默认指定QuartzScheduler
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
        Scheduler sched = schedRep.lookup(getSchedulerName());
        //判断若存在且已停止运行,则从缓存中移除
        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }
        sched = instantiate();
        return sched;
    }
    

    start()

    之前在03 Quartz-初识Quartz-Scheduler初始化解说Scheduler 其实就是交互API,里面还有一个QuartzScheduler的实例变量,调用start()方法真正调用的是QuartzScheduler的start()方法。

    public void start() throws SchedulerException {
        sched.start(); 
    }
    

    QuartzScheduler的start()方法。

    • 进入start方法,判断是否已关闭或者停止;

    • 进行Scheduler Starting Listener通知 (就是轮询所以的监听器进行逐一通知)

    • this.resources.getJobStore().schedulerStarted(); 因为使用内存模型:其实就是个空实现 什么都没做。

    • startPlugins(); 如有则逐一调用配置插件的start()方法;

    • togglePause:进行信号通知

    • notifySchedulerListenersStarted();

    public void start() throws SchedulerException {
    
        if (shuttingDown|| closed) {  throw ( "... 省略异常处理过程;"); }
    
        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        notifySchedulerListenersStarting();
        //如果不为空则进行恢复逻辑
        if (initialStart == null) {
            initialStart = new Date();
            this.resources.getJobStore().schedulerStarted();            
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed();
        }
    
        schedThread.togglePause(false);
    
        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");
        
        notifySchedulerListenersStarted();
    }
    

    1.1.3 togglePause()

    看下这个方法schedThread.togglePause(false);属于QuartzSchedulerThread 这个方法其实全局就两处进行调用,参数决定是否进行暂停切换。初始化这职位false; 其他只有关闭等逻辑才进行设置true

    • 设置paused = false;

    • 其次就是调用了sigLock.notifyAll(); ? 它是在唤醒谁呢?谁想要唤醒?其实这个我们在03 Quartz-初识Quartz-Scheduler初始化篇幅中解说QuartzScheduler的初始化逻辑的时候就已经介绍了。

    /**
     * <p>
     * Signals the main processing loop to pause at the next possible point.
     * </p>
     */
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
    
            if (paused) {
                signalSchedulingChange(0);
            } else {
                sigLock.notifyAll();
            }
        }
    }
    
    • 我们查看全局当中只有一处使用了sigLock.wait(); 显而易见就是这,就是当前类的run() 方法,不要忘记我们真正的核心是QuartzScheduler, QuartzScheduler维护整个Quartz的所有运行时实例对象。QuartzSchedulerThread是Quartz的主线程对象。

    1.1.4 run()

    The main processing loop of the <code>QuartzSchedulerThread</code>.
    正如上面所说:

    run()方法比较长,只能逐一解说了。

    • a. 首先是声明一个int变量acquiresFailed=0;一般来说为0,只有获取下一个触发器发生异常才会自增。
    int acquiresFailed = 0;//a
    
    • b. 其次就是一个while循环 也就是run的主体逻辑,循环判断接收一个 AtomicBoolean 作为是否暂停的判断;
    • c. 循环体的第一个就是一个同步块(里面也是一个循环体):如果是初始化则主线程就是在同步块进行等待,等其他获得sigLock的线程发起信号通知。如果有其他线程发起激活信号(sigLock.notifyAll())传递就继续向下执行。

    • d. 如果第一次获取失败则进行获取访问此jobStore时反复失败的等待时间(以毫秒为单位).RAM模式为20;然后暂停等待。

    • e. 判断线程池是否有空闲线程
      **qsRsrcs.getThreadPool().blockForAvailableThreads(); **获取可用线程数量;然后清除调度改变的信号。

    //获取可用线程个数
    public int blockForAvailableThreads() {
        synchronized (nextRunnableLock) {
            while ((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }
            //返回可用线程集合的大小
            return availWorkers.size();
        }
    }
    
    • f. 从jobStore中获取下次被触发的触发器(是根据活跃线程数最大批量大小(默认为1)的两者最小值作为本次获取触发容器的大小:所以此处我们可根据自身服务器的特性进行优化设置:当检查某个Trigger应该触发时,默认每次只Acquire一个Trigger,(为什么要有Acquire的过程呢?是为了防止多线程访问的情况下,同一个Trigger被不同的线程多次触发)。尤其是使用JDBC JobStore时,一次Acquire就是一个update语句,尽可能一次性的多获取几个Trigger,一起触发,当定时器数量非常大的时候,这是个非常有效的优化。当定时器数量比较少时,触发不是极为频繁时,这个优化的意义就不大了。)。qsRsrcs.getJobStore().acquireNextTriggers (获取最近需要执行的任务列表(30s内的将要执行的任务)。然后根据执行时间进行排序,然后计算出需要wait()的时间。当调度时间来临,欢迎主线程,将任务交给一个线程池进行执行。)该方法还是比较重要的,里面会进行一些判断逻辑applyMisfire 然后进行设置的策略对应处理。
    idleWaitTime:
    在调度程序处于空闲状态时,调度程序查询的触发器可用之前等待的时间量(以毫秒为单位),默认是30秒;30秒内没有需要执行的任务,则等待一个随机时间。getRandomizedIdleWaitTime产生一个30秒内随机等待时间。
    可通过配置属性org.quartz.scheduler.idleWaitTime设置。
    
    batchTriggerAcquisitionMaxCount:
    允许调度程序节点一次获取(用于触发)的触发器的最大数量,默认是1;
    可通过org.quartz.scheduler.batchTriggerAcquisitionMaxCount改写。
    
    batchTriggerAcquisitionFireAheadTimeWindow:
    时间窗口调节参数
    允许触发器在其预定的火灾时间之前被获取和触发的时间(毫秒)的时间量,默认是0;
    可通过org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow改写
    
    • 取到triggers之后会判断第一条是否到执行时,如果没有会一直循环等待,等待的过程中会根据信号判断是否有必要释放当前触发器重新调度;在QuartzSchedulerThread主线程中,首先会取出最近30秒内将要执行的任务,然后等待executeTime-now时间,然后在等待唤醒时交给线程池处理。
    • 为什么triggers集合的第一个就是最早需要被执行的?因为这个跟JobStore中存放triggers的数据结构有关,它是用TreeSet存放的;
    if (availThreadCount > 0) { // will always be true, due to  semantics of blockForAvailableThreads...
    List<OperableTrigger> triggers = null;
    long now = System.currentTimeMillis();
    
    //清除调度改变的信号
    clearSignaledSchedulingChange();
    try {
        //到JobStore中获取下次被触发的触发器
        triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime,
                Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
        lastAcquireFailed = false;
        if (log.isDebugEnabled())
            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
    } catch (JobPersistenceException jpe) {
        if (!lastAcquireFailed) {
            qs.notifySchedulerListenersError(
                    "An error occurred while scanning for the next triggers to fire.", jpe);
        }
        lastAcquireFailed = true;
        continue;
    } catch (RuntimeException e) {
        if (!lastAcquireFailed) {
            getLog().error("quartzSchedulerThreadLoop: RuntimeException " + e.getMessage(), e);
        }
        lastAcquireFailed = true;
        continue;
    }
    
    if (triggers != null && !triggers.isEmpty()) {
        now = System.currentTimeMillis();
        //这里为什么triggers的第一个对象就是最早需要被执行的? 
        long triggerTime = triggers.get(0).getNextFireTime().getTime();
        long timeUntilTrigger = triggerTime - now;
        //如果第一条下次触发时间大于当前时间则进入等待
        while (timeUntilTrigger > 2) {
            synchronized (sigLock) {
                if (halted.get()) {
                    break;
                }
                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                    try {
                        now = System.currentTimeMillis();
                        timeUntilTrigger = triggerTime - now;
                        if (timeUntilTrigger >= 1)
                            sigLock.wait(timeUntilTrigger);
                    } catch (InterruptedException ignore) {
                    }
                }
            }
            //等待的过程中看看有没有收到调度信号
            if (releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                break;
            }
            now = System.currentTimeMillis();
            timeUntilTrigger = triggerTime - now;
        }
    
        // this happens if releaseIfScheduleChangedSignificantly
        // decided to release triggers
        //这个可能在前面等待的时候被清理掉了
        if (triggers.isEmpty())
            continue;
    
    • g. qsRsrcs.getJobStore().triggersFired(triggers);JobStore中的triggersFired(triggers);方法会把trigger和其对应的JobStore封装到TriggerFiredBundle中,并把 TriggerFiredBundle 对象传给TriggerFiredResult,

    • h. shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); 创建线程对象构造执行对象,JobRunShell实现了Runnable

      • 把Trigger和JobDetail封装一下,生成执行任务shell(shellFactory获取的)
    //TriggerFiredResult-->TriggerFiredBundle-->(job, trigger, 一堆time)
    List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
    
    boolean goAhead = true;
    synchronized (sigLock) {
        goAhead = !halted.get();
    }
    if (goAhead) {
        try {
            // 通知jobStore开始执行了,根据trigger获取其对应的JobDetail, 封装成TriggerFiredResult
            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
            if (res != null)
                bndles = res;
        } catch (SchedulerException se) {
            qs.notifySchedulerListenersError(
                    "An error occurred while firing triggers '" + triggers + "'", se);
            // QTZ-179 : a problem occurred interacting with
            // the triggers from the db
            // we release them and loop again
            for (int i = 0; i < triggers.size(); i++) {
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
            }
            continue;
        }
    
    }
    
    for (int i = 0; i < bndles.size(); i++) {
        TriggerFiredResult result = bndles.get(i);
        TriggerFiredBundle bndle = result.getTriggerFiredBundle();
        Exception exception = result.getException();
    
        if (exception instanceof RuntimeException) {
            getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
            continue;
        }
    
        // it's possible to get 'null' if the triggers was
        // paused,
        // blocked, or other similar occurrences that
        // prevent it being
        // fired at this time... or if the scheduler was
        // shutdown (halted)
        if (bndle == null) {
            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
            continue;
        }
    
        // 下面是开始执行任务
        JobRunShell shell = null;
        try {
            //构造执行对象,JobRunShell实现了Runnable
            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
            //这个里面会用我们自定义的Job来new一个对象,并把相关执行Job是需要的数据传给JobExecutionContextImpl(这是我们自定义job的execute方法参数)
            shell.initialize(qs);
        } catch (SchedulerException se) {
            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
                    CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
            continue;
        }
    
    • i. sRsrcs.getThreadPool().runInThread(shell); 提交线程池;
    // 这里是把任务放入到线程池中
    if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
        // this case should never happen, as it is
        // indicative of the
        // scheduler being shutdown or a bug in the
        // thread pool or
        // a thread pool being used concurrently - which
        // the docs
        // say not to do...
        getLog().error("ThreadPool.runInThread() return false!");
            //放到线程池失败后,通知jobStore完成
        qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
                CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
    }
    
    • j. 执行线程池
      JobRunShell 放到了线程池中,看SimpleThreadPool的runInThread(Runnable runnable)方法逻辑很简单,到可用线程集合中取一个线程执行任务并放到正在使用线程集合中,如果线程成被shutdown了则创建一个额外的线程,如下:
    private List<WorkerThread> workers;
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
    
    public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }
        synchronized (nextRunnableLock) {
            handoffPending = true;
            // Wait until a worker thread is available
            while ((availWorkers.size() < 1) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }
            if (!isShutdown) {
                //到可用线程集合里取出第一个
                WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
                //放到该正在使用线程池中
                busyWorkers.add(wt);
                //运行任务
                wt.run(runnable);
            } else {
                // If the thread pool is going down, execute the Runnable
                // within a new additional worker thread (no thread from the
                // pool).
                //如果线程池关闭了则创建一个额外的线程
                WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio,
                        isMakeThreadsDaemons(), runnable);
                busyWorkers.add(wt);
                workers.add(wt);
                wt.start();
            }
            nextRunnableLock.notifyAll();
            handoffPending = false;
        }
    
        return true;
    }
    

    执行任务的是WorkerThread线程,把任务传给WorkerThread,它会调用任务的run方法,这里即是调用JobRunShell的run方法, JobRunShell 的run方法最终就是调自定义Job的execute方法。

    //添加任务
    public void run(Runnable newRunnable) {
        synchronized (lock) {
            if (runnable != null) {
                throw new IllegalStateException("Already running a Runnable!");
            }
    
            runnable = newRunnable;
            lock.notifyAll();
        }
    }
    /**
     * <p>
     * Loop, executing targets as they are received.
     * </p>
     */
    @Override
    public void run() {
        boolean ran = false;
        while (run.get()) {
            try {
                synchronized (lock) {
                    while (runnable == null && run.get()) {
                        lock.wait(500);
                    }
    
                    if (runnable != null) {
                        ran = true;
                        //调用任务的run方法
                        runnable.run();
                    }
                }
            } 
            .....
        }
    }
    

    相关文章

      网友评论

        本文标题:04 Quartz-初识Quartz-QuartzSchedul

        本文链接:https://www.haomeiwen.com/subject/kagkzqtx.html