美文网首页
quartz源码3-调度处理

quartz源码3-调度处理

作者: modou1618 | 来源:发表于2019-01-11 22:04 被阅读0次

一 类依赖结构

类依赖结构.png

二 调度线程

  • 初始化阶段等待其他流程初始化完成,进入调度处理
synchronized (sigLock) {
    while (paused && !halted.get()) {
        try {
                // wait until togglePause(false) is called...
                sigLock.wait(1000L);
        } catch (InterruptedException ignore) {
        }

// reset failure counter when paused, so that we don't
// wait again after unpausing
    acquiresFailed = 0;
    }

    if (halted.get()) {
        break;
    }
}
  • 数据库访问失败则睡眠后重试
// wait a bit, if reading from job store is consistently
// failing (e.g. DB is down or restarting)..
if (acquiresFailed > 1) {
    try {
        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
        Thread.sleep(delay);
    } catch (Exception ignore) {
    }
}
  • Object.wait()等待直到job执行线程池有可用的线程
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();

public int blockForAvailableThreads() {
    synchronized(nextRunnableLock) {

        while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }

        return availWorkers.size();
    }
}
  • 清空调度插队的信息,调度处理过程中,可以被集群管理线程或误点管理线程或job管理接口通知调度任务插队
public void clearSignaledSchedulingChange() {
    synchronized(sigLock) {
        signaled = false;
        signaledNextFireTime = 0;
    }
}
  • 获取下一个处理周期里最早即将会被触发的一组trigger。
    now + idleWaitTime 下一个处理周期时间点,默认周期30s。
    Math.min(availThreadCount, qsRsrcs.getMaxBatchSize())表示一次最多会触发的trigger数量为可用线程数和配置数量中的较小值,默认配置批量处理数量为1。
    详情见 3.1 acquireNextTrigger()
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now + idleWaitTime, 
                Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), 
                qsRsrcs.getBatchTimeWindow());
  • 计算下次最早待调度job的触发时间
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
  • 距离现在超过2ms,则进入等待状态,等待可能的插队job
  • 如果有更早待调度的插队job,清除之前创建的firetrigger,并恢复tirgger状态STATE_ACQUIRED->STATE_WAITING。
while(timeUntilTrigger > 2) {
        synchronized (sigLock) {
            if (halted.get()) {
                break;
            }
            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                try {
                    // we could have blocked a long while
                    // on 'synchronize', so we must recompute
                    now = System.currentTimeMillis();
                    timeUntilTrigger = triggerTime - now;
                    if(timeUntilTrigger >= 1)
                        sigLock.wait(timeUntilTrigger);
                } catch (InterruptedException ignore) {
                }
            }
        }
        if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
            break;
        }
        now = System.currentTimeMillis();
        timeUntilTrigger = triggerTime - now;
    }
  • triggersFired() 更新sql中trigger状态为执行中,并获取调度所需相关信息,详情见3.2
  • 根据待调度的trigger信息,创建job执行容器
    JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
  • job执行线程池运行job执行容器
    qsRsrcs.getThreadPool().runInThread(shell)
  • 随机睡眠一个30s内的时间,等待下次调度或有插队job
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
    try {
      if(!halted.get()) {
        // QTZ-336 A job might have been completed in the mean time and we might have
        // missed the scheduled changed signal by not waiting for the notify() yet
        // Check that before waiting for too long in case this very job needs to be
        // scheduled very soon
        if (!isScheduleChanged()) {
          sigLock.wait(timeUntilContinue);
        }
      }
    } catch (InterruptedException ignore) {
    }
}

三 一些处理函数

3.1 acquireNextTrigger()

获取下个窗口内的触发job

  • 构建sql语句,从triggers表中查询符合条件的trigger。
    条件为下次触发时间next_fire_time在start和end之间的trigger,且状态为等待调度STATE_WAITING,最多取maxCount个。
    start=当前时间-误点窗口(默认1min)
    end=当前时间+调度周期(默认30s)
 List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
  • 检验对应触发器trigger存在
  • 检验对应jobdetail存在
  • 非并发job,只添加最早的一个trigger
  • 更新符合条件的triggers状态,等待调度->即将调度
    STATE_WAITING->STATE_ACQUIRED
  • 插入STATE_ACQUIRED状态的fire_trigger

3.2 triggerFired()

  • 再次获取trigger状态,并检查
// Make sure trigger wasn't deleted, paused, or completed...
// if trigger was deleted, state will be STATE_DELETED
String state = getDelegate().selectTriggerState(conn,
        trigger.getKey());
if (!state.equals(STATE_ACQUIRED)) {
    return null;
}
  • 检查job状态
  • 更新fire_trigger状态为STATE_EXECUTING, 更新调度时间,下次调度时间
    getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
  • 更新trigger的下次触发时间
previousFireTime = nextFireTime;
nextFireTime = getFireTimeAfter(nextFireTime);
  • 对禁止并发执行的job,更新job下的trigger状态,阻塞再次调度
    STATE_WAITING->STATE_BLOCKED
    STATE_ACQUIRED->STATE_BLOCKED
    STATE_PAUSED->STATE_PAUSED_BLOCKED
  • 更新trigger信息及状态
    若trigger无下次触发时间,则trigger状态会更新为COMPLETE,表示调度完成。
    若为禁止并发job,则更新为block
    若为可并发job,则更新为waiting
    若已配置pause group,则更新为STATE_PAUSED
  • 返回trigger信息,共调度使用

相关文章

  • quartz源码3-调度处理

    一 类依赖结构 二 调度线程 初始化阶段等待其他流程初始化完成,进入调度处理 数据库访问失败则睡眠后重试 Obje...

  • Quartz 分布式解决方案

    本文要点 1.Quartz相关重要概念 2.如何实现分布式调度 3.核心源码的实现 1.Quartz相关重要概念 ...

  • Quartz框架

    Quartz Quartz是一个全功能,开源的任务调度服务。Quartz的核心概念:schedule任务调度,jo...

  • Quartz 调度器

    一、简介 Quartz是一个开源作业调度框架,框架的核心是调度器,调度器负责管理Quartz应用运行时环境,调度器...

  • Quartz 概览

    Quartz Enterprise Job Scheduler 什么是Quartz作业调度库? Quartz是一个...

  • Quartz.NET 作业调度(一):Test

    Quartz.NET 是一个开源的作业调度框架,是 Java 作业调度框架 Quartz 的.NET 版本,对于...

  • springboot整合quartz定时任务

    1. quartz的基本实现原理 ** Quartz 核心元素 ** Quartz任务调度的核心元素为: Sche...

  • 深入解读Quartz任务调度器

    深入解读Quartz任务调度器 1.Quartz简介 1.1.概要 Quartz是OpenSymphony提供的强...

  • SpringBoot 基于Quartz的任务调度

    一. 什么是 SpringBootQuartz ? Quartz 是一个任务调度工具包, 能够很自由的处理添加和维...

  • Spring 调度系统

    Spring 调度系统简介 一说到调度,Java 程序员马上就会想到 Quartz,但 Quartz 学习使用起来...

网友评论

      本文标题:quartz源码3-调度处理

      本文链接:https://www.haomeiwen.com/subject/wixwrqtx.html