美文网首页
quartz源码3-调度处理

quartz源码3-调度处理

作者: modou1618 | 来源:发表于2019-01-11 22:04 被阅读0次

    一 类依赖结构

    类依赖结构.png

    二 调度线程

    • 初始化阶段等待其他流程初始化完成,进入调度处理
    synchronized (sigLock) {
        while (paused && !halted.get()) {
            try {
                    // wait until togglePause(false) is called...
                    sigLock.wait(1000L);
            } catch (InterruptedException ignore) {
            }
    
    // reset failure counter when paused, so that we don't
    // wait again after unpausing
        acquiresFailed = 0;
        }
    
        if (halted.get()) {
            break;
        }
    }
    
    • 数据库访问失败则睡眠后重试
    // wait a bit, if reading from job store is consistently
    // failing (e.g. DB is down or restarting)..
    if (acquiresFailed > 1) {
        try {
            long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
            Thread.sleep(delay);
        } catch (Exception ignore) {
        }
    }
    
    • Object.wait()等待直到job执行线程池有可用的线程
    int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
    
    public int blockForAvailableThreads() {
        synchronized(nextRunnableLock) {
    
            while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }
    
            return availWorkers.size();
        }
    }
    
    • 清空调度插队的信息,调度处理过程中,可以被集群管理线程或误点管理线程或job管理接口通知调度任务插队
    public void clearSignaledSchedulingChange() {
        synchronized(sigLock) {
            signaled = false;
            signaledNextFireTime = 0;
        }
    }
    
    • 获取下一个处理周期里最早即将会被触发的一组trigger。
      now + idleWaitTime 下一个处理周期时间点,默认周期30s。
      Math.min(availThreadCount, qsRsrcs.getMaxBatchSize())表示一次最多会触发的trigger数量为可用线程数和配置数量中的较小值,默认配置批量处理数量为1。
      详情见 3.1 acquireNextTrigger()
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                    now + idleWaitTime, 
                    Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), 
                    qsRsrcs.getBatchTimeWindow());
    
    • 计算下次最早待调度job的触发时间
    now = System.currentTimeMillis();
    long triggerTime = triggers.get(0).getNextFireTime().getTime();
    long timeUntilTrigger = triggerTime - now;
    
    • 距离现在超过2ms,则进入等待状态,等待可能的插队job
    • 如果有更早待调度的插队job,清除之前创建的firetrigger,并恢复tirgger状态STATE_ACQUIRED->STATE_WAITING。
    while(timeUntilTrigger > 2) {
            synchronized (sigLock) {
                if (halted.get()) {
                    break;
                }
                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                    try {
                        // we could have blocked a long while
                        // on 'synchronize', so we must recompute
                        now = System.currentTimeMillis();
                        timeUntilTrigger = triggerTime - now;
                        if(timeUntilTrigger >= 1)
                            sigLock.wait(timeUntilTrigger);
                    } catch (InterruptedException ignore) {
                    }
                }
            }
            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                break;
            }
            now = System.currentTimeMillis();
            timeUntilTrigger = triggerTime - now;
        }
    
    • triggersFired() 更新sql中trigger状态为执行中,并获取调度所需相关信息,详情见3.2
    • 根据待调度的trigger信息,创建job执行容器
      JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
    • job执行线程池运行job执行容器
      qsRsrcs.getThreadPool().runInThread(shell)
    • 随机睡眠一个30s内的时间,等待下次调度或有插队job
    long now = System.currentTimeMillis();
    long waitTime = now + getRandomizedIdleWaitTime();
    long timeUntilContinue = waitTime - now;
    synchronized(sigLock) {
        try {
          if(!halted.get()) {
            // QTZ-336 A job might have been completed in the mean time and we might have
            // missed the scheduled changed signal by not waiting for the notify() yet
            // Check that before waiting for too long in case this very job needs to be
            // scheduled very soon
            if (!isScheduleChanged()) {
              sigLock.wait(timeUntilContinue);
            }
          }
        } catch (InterruptedException ignore) {
        }
    }
    

    三 一些处理函数

    3.1 acquireNextTrigger()

    获取下个窗口内的触发job

    • 构建sql语句,从triggers表中查询符合条件的trigger。
      条件为下次触发时间next_fire_time在start和end之间的trigger,且状态为等待调度STATE_WAITING,最多取maxCount个。
      start=当前时间-误点窗口(默认1min)
      end=当前时间+调度周期(默认30s)
     List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
    
    • 检验对应触发器trigger存在
    • 检验对应jobdetail存在
    • 非并发job,只添加最早的一个trigger
    • 更新符合条件的triggers状态,等待调度->即将调度
      STATE_WAITING->STATE_ACQUIRED
    • 插入STATE_ACQUIRED状态的fire_trigger

    3.2 triggerFired()

    • 再次获取trigger状态,并检查
    // Make sure trigger wasn't deleted, paused, or completed...
    // if trigger was deleted, state will be STATE_DELETED
    String state = getDelegate().selectTriggerState(conn,
            trigger.getKey());
    if (!state.equals(STATE_ACQUIRED)) {
        return null;
    }
    
    • 检查job状态
    • 更新fire_trigger状态为STATE_EXECUTING, 更新调度时间,下次调度时间
      getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
    • 更新trigger的下次触发时间
    previousFireTime = nextFireTime;
    nextFireTime = getFireTimeAfter(nextFireTime);
    
    • 对禁止并发执行的job,更新job下的trigger状态,阻塞再次调度
      STATE_WAITING->STATE_BLOCKED
      STATE_ACQUIRED->STATE_BLOCKED
      STATE_PAUSED->STATE_PAUSED_BLOCKED
    • 更新trigger信息及状态
      若trigger无下次触发时间,则trigger状态会更新为COMPLETE,表示调度完成。
      若为禁止并发job,则更新为block
      若为可并发job,则更新为waiting
      若已配置pause group,则更新为STATE_PAUSED
    • 返回trigger信息,共调度使用

    相关文章

      网友评论

          本文标题:quartz源码3-调度处理

          本文链接:https://www.haomeiwen.com/subject/wixwrqtx.html