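1 Class dependency structure
[Figure: class dependency structure]
2 Scheduling thread
- At startup the thread stays paused until the rest of the scheduler has finished initializing, then it enters the scheduling loop.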
    synchronized (sigLock) {
        while (paused && !halted.get()) {
            try {
                // wait until togglePause(false) is called...
                sigLock.wait(1000L);
            } catch (InterruptedException ignore) {
            }
            // reset failure counter when paused, so that we don't
            // wait again after unpausing
            acquiresFailed = 0;
        }
        if (halted.get()) {
            break;
        }
    }
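The pause handshake can be tried out in isolation. The following is a minimal sketch, not the Quartz source: the class PauseGate and the methods awaitRunnable() and halt() are hypothetical names, and only sigLock, paused, halted and togglePause come from the excerpt above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for the pause handling of the scheduler thread.
final class PauseGate {
    private final Object sigLock = new Object();
    private final AtomicBoolean halted = new AtomicBoolean(false);
    private boolean paused = true; // the thread starts out paused

    // Called with pause = false once startup has finished.
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll(); // wake the waiting scheduler thread
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted.set(true);
            sigLock.notifyAll();
        }
    }

    // Blocks while paused; returns true when scheduling may start,
    // false if the gate was halted or the caller was interrupted.
    boolean awaitRunnable() {
        synchronized (sigLock) {
            while (paused && !halted.get()) {
                try {
                    sigLock.wait(1000L); // timed wait guards against a missed notify
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return !halted.get();
        }
    }
}
```

The timed wait means that even a lost notification only delays the loop by at most one second.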
- If reading from the database keeps failing, sleep for a while and then retry.
    // wait a bit, if reading from job store is consistently
    // failing (e.g. DB is down or restarting)..
    if (acquiresFailed > 1) {
        try {
            long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
            Thread.sleep(delay);
        } catch (Exception ignore) {
        }
    }
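The excerpt does not show how computeDelayForRepeatedErrors arrives at its value. One plausible shape, sketched here under assumptions, is to take a delay suggested by the job store and clamp it into a fixed range. The class RetryDelay, both method names and both bounds are hypothetical and chosen only for illustration.

```java
// Hypothetical helper: clamps a store-suggested retry delay into a fixed range.
final class RetryDelay {
    static final long MIN_DELAY_MS = 20L;      // assumed lower bound
    static final long MAX_DELAY_MS = 600_000L; // assumed upper bound (10 minutes)

    static long clamp(long suggestedDelayMs) {
        return Math.max(MIN_DELAY_MS, Math.min(MAX_DELAY_MS, suggestedDelayMs));
    }

    // Mirrors the guard in the loop: back off only after more than one consecutive failure.
    static long delayFor(int acquiresFailed, long suggestedDelayMs) {
        return acquiresFailed > 1 ? clamp(suggestedDelayMs) : 0L;
    }
}
```

Clamping keeps a misconfigured store from producing either a busy loop or an effectively endless sleep.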
- Block with Object.wait() until the job execution thread pool has an available thread.
    int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();

    public int blockForAvailableThreads() {
        synchronized(nextRunnableLock) {
            while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }
            return availWorkers.size();
        }
    }
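To see the blocking behaviour without a full thread pool, the idle-worker bookkeeping can be modeled on its own. This is a rough sketch: TinyPool, takeWorker(), releaseWorker() and shutdown() are hypothetical, workers are represented only by integer ids, and the handoffPending flag from the excerpt is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical miniature pool; only the bookkeeping of idle workers is modeled.
final class TinyPool {
    private final Object nextRunnableLock = new Object();
    private final Deque<Integer> availWorkers = new ArrayDeque<>();
    private boolean isShutdown = false;

    TinyPool(int workerCount) {
        for (int id = 0; id < workerCount; id++) {
            availWorkers.add(id);
        }
    }

    // Marks one worker busy; returns its id, or -1 if none is idle.
    int takeWorker() {
        synchronized (nextRunnableLock) {
            Integer id = availWorkers.poll();
            return id == null ? -1 : id;
        }
    }

    // Returns a worker to the idle set and wakes any blocked caller.
    void releaseWorker(int id) {
        synchronized (nextRunnableLock) {
            availWorkers.add(id);
            nextRunnableLock.notifyAll();
        }
    }

    void shutdown() {
        synchronized (nextRunnableLock) {
            isShutdown = true;
            nextRunnableLock.notifyAll();
        }
    }

    // Blocks until at least one worker is idle (or the pool shuts down),
    // then reports how many workers are idle.
    int blockForAvailableThreads() {
        synchronized (nextRunnableLock) {
            while (availWorkers.isEmpty() && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return availWorkers.size();
        }
    }
}
```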
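- Clear the signaled scheduling change. While the scheduling loop is running, the cluster management thread, the misfire handler thread, or the job management API can signal that a job should jump the queue.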
    public void clearSignaledSchedulingChange() {
        synchronized(sigLock) {
            signaled = false;
            signaledNextFireTime = 0;
        }
    }
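- Acquire the batch of triggers that will fire earliest within the next processing cycle.
  now + idleWaitTime is the end of the next processing cycle; the default cycle is 30 s.
  Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()) caps the number of triggers acquired at once at the smaller of the available thread count and the configured batch size; the default batch size is 1.
  See 3.1 acquireNextTrigger() for details.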
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
            now + idleWaitTime,
            Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),
            qsRsrcs.getBatchTimeWindow());
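A small worked example makes the two arguments concrete. The class AcquireWindow is hypothetical and only repeats the arithmetic from the call above. With the defaults mentioned in the text (30 s cycle, batch size 1), ten free threads still yield a batch of one.

```java
// Hypothetical value holder for the arguments passed to acquireNextTriggers.
final class AcquireWindow {
    final long noLaterThan; // upper bound for next_fire_time
    final int maxCount;     // upper bound for the batch size

    AcquireWindow(long now, long idleWaitTimeMs, int availThreadCount, int maxBatchSize) {
        this.noLaterThan = now + idleWaitTimeMs;
        // Never acquire more triggers than there are threads to run them.
        this.maxCount = Math.min(availThreadCount, maxBatchSize);
    }
}
```

For instance, new AcquireWindow(1_000L, 30_000L, 10, 1) gives noLaterThan = 31000 and maxCount = 1, while raising the batch size to 20 gives maxCount = 10.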
- Compute the fire time of the earliest trigger in the acquired batch.
    now = System.currentTimeMillis();
    long triggerTime = triggers.get(0).getNextFireTime().getTime();
    long timeUntilTrigger = triggerTime - now;
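- If the fire time is more than 2 ms away, wait, so that a queue-jumping job can still be picked up.
- If a queue-jumping job with an earlier fire time is signaled, remove the fired-trigger records created earlier and restore the trigger state: STATE_ACQUIRED->STATE_WAITING.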
    while(timeUntilTrigger > 2) {
        synchronized (sigLock) {
            if (halted.get()) {
                break;
            }
            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                try {
                    // we could have blocked a long while
                    // on 'synchronize', so we must recompute
                    now = System.currentTimeMillis();
                    timeUntilTrigger = triggerTime - now;
                    if(timeUntilTrigger >= 1)
                        sigLock.wait(timeUntilTrigger);
                } catch (InterruptedException ignore) {
                }
            }
        }
        if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
            break;
        }
        now = System.currentTimeMillis();
        timeUntilTrigger = triggerTime - now;
    }
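The body of isCandidateNewTimeEarlierWithinReason is not shown here, so the following is only a guess at the kind of decision it makes, written as a pure function. The class SignalCheck, the parameter minGainMs, and the reading of signaledNextFireTime == 0 as "fire time unknown" are assumptions, not confirmed behaviour.

```java
// Hypothetical, side-effect-free version of the "is the signaled job worth it" check.
final class SignalCheck {
    static boolean isEarlierWithinReason(boolean signaled, long signaledNextFireTime,
                                         long oldTime, long now, long minGainMs) {
        if (!signaled) {
            return false; // nothing changed since the batch was acquired
        }
        // Assumption: 0 means the fire time is unknown, so it may be earlier.
        boolean earlier = signaledNextFireTime == 0 || signaledNextFireTime < oldTime;
        // Releasing the batch costs a round trip to the store; skip it when the
        // acquired trigger fires almost immediately anyway.
        if (earlier && oldTime - now < minGainMs) {
            earlier = false;
        }
        return earlier;
    }
}
```

The threshold avoids thrashing: abandoning an acquired batch a few milliseconds before it fires would cost more than it saves.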
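- triggersFired() updates the trigger state in the database to executing and fetches the information needed to run the job; see 3.2 for details.
- Create a job execution shell from the information of the trigger that is about to fire.
    JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
- Run the job execution shell in the job execution thread pool.
    qsRsrcs.getThreadPool().runInThread(shell)
- Wait for a randomized time of at most 30 s, until the next scheduling round or until a queue-jumping job is signaled.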
    long now = System.currentTimeMillis();
    long waitTime = now + getRandomizedIdleWaitTime();
    long timeUntilContinue = waitTime - now;
    synchronized(sigLock) {
        try {
            if(!halted.get()) {
                // QTZ-336 A job might have been completed in the mean time and we might have
                // missed the scheduled changed signal by not waiting for the notify() yet
                // Check that before waiting for too long in case this very job needs to be
                // scheduled very soon
                if (!isScheduleChanged()) {
                    sigLock.wait(timeUntilContinue);
                }
            }
        } catch (InterruptedException ignore) {
        }
    }
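How getRandomizedIdleWaitTime picks its value is not shown above. One simple way to get "a random time of at most 30 s" is to subtract a random jitter from the idle wait time, sketched below. The class IdleWait, the method signature and the 7 s jitter used in the example call are assumptions.

```java
import java.util.Random;

// Hypothetical jitter helper: the result is at most idleWaitTimeMs.
final class IdleWait {
    static long randomizedIdleWaitTime(long idleWaitTimeMs, int variablenessMs, Random random) {
        // nextInt(n) returns a value in [0, n), so the result lies in
        // (idleWaitTimeMs - variablenessMs, idleWaitTimeMs].
        return idleWaitTimeMs - random.nextInt(variablenessMs);
    }
}
```

With randomizedIdleWaitTime(30_000L, 7_000, new Random()) every result falls between 23001 and 30000 ms, which keeps several scheduler instances from polling the store in lockstep.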
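3 Helper functions
3.1 acquireNextTrigger()
Acquires the jobs that are due to be triggered within the next window.
- Build the SQL statement and query the triggers table for matching triggers.
  A trigger matches if its next_fire_time lies between start and end and its state is STATE_WAITING (waiting to be scheduled); at most maxCount triggers are fetched.
  start = current time - misfire threshold (default 1 min)
  end = current time + scheduling cycle (default 30 s)
    List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
- Verify that the corresponding trigger exists.
- Verify that the corresponding JobDetail exists.
- For a job that disallows concurrent execution, add only its earliest trigger.
- Update the state of the matching triggers from waiting to acquired:
  STATE_WAITING->STATE_ACQUIRED
- Insert a fired-trigger record in state STATE_ACQUIRED.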
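The selection rule can be illustrated with an in-memory model instead of SQL. This is a simplified sketch, not the Quartz JDBC code: TriggerSelection, Trig and acquire() are hypothetical, both window bounds are assumed to be inclusive, and the existence checks and the special case for jobs that disallow concurrent execution are omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of the acquisition query.
final class TriggerSelection {
    enum State { WAITING, ACQUIRED }

    static final class Trig {
        final String key;
        final long nextFireTime;
        State state;

        Trig(String key, long nextFireTime, State state) {
            this.key = key;
            this.nextFireTime = nextFireTime;
            this.state = state;
        }
    }

    static List<Trig> acquire(List<Trig> all, long now, long misfireThresholdMs,
                              long idleWaitMs, int maxCount) {
        long start = now - misfireThresholdMs; // older triggers are left to misfire handling
        long end = now + idleWaitMs;           // end of the next scheduling cycle
        List<Trig> picked = new ArrayList<>();
        all.stream()
           .filter(t -> t.state == State.WAITING)
           .filter(t -> t.nextFireTime >= start && t.nextFireTime <= end)
           .sorted(Comparator.comparingLong((Trig t) -> t.nextFireTime))
           .limit(maxCount)
           .forEach(picked::add);
        for (Trig t : picked) {
            t.state = State.ACQUIRED; // WAITING -> ACQUIRED
        }
        return picked;
    }
}
```

Sorting by fire time before applying the limit is what makes the batch "the earliest triggers" rather than an arbitrary subset of the window.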
3.2 triggerFired()
- Fetch the trigger state again and check it.
    // Make sure trigger wasn't deleted, paused, or completed...
    // if trigger was deleted, state will be STATE_DELETED
    String state = getDelegate().selectTriggerState(conn,
            trigger.getKey());
    if (!state.equals(STATE_ACQUIRED)) {
        return null;
    }
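- Check the job state.
- Update the fired-trigger record to STATE_EXECUTING, and update the fire time and the next fire time.
    getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
- Update the trigger's next fire time.
    previousFireTime = nextFireTime;
    nextFireTime = getFireTimeAfter(nextFireTime);
- For a job that disallows concurrent execution, update the states of the job's triggers so that they cannot be scheduled again while it runs:
  STATE_WAITING->STATE_BLOCKED
  STATE_ACQUIRED->STATE_BLOCKED
  STATE_PAUSED->STATE_PAUSED_BLOCKED
- Update the trigger's data and state.
  If the trigger has no next fire time, its state becomes COMPLETE, meaning that scheduling is finished.
  If the job disallows concurrent execution, the state becomes BLOCKED.
  If the job allows concurrent execution, the state becomes WAITING.
  If the trigger's group has been paused, the state becomes STATE_PAUSED.
- Return the trigger information for the scheduler to use.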
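The three blocking transitions listed for jobs that disallow concurrent execution amount to a small state mapping. The sketch below uses a hypothetical enum in place of the STATE_* string constants. Leaving every other state unchanged is an assumption, since the text lists only these three transitions.

```java
// Hypothetical model of the blocking transitions for non-concurrent jobs.
final class BlockTransitions {
    enum TriggerState { WAITING, ACQUIRED, PAUSED, BLOCKED, PAUSED_BLOCKED, COMPLETE }

    static TriggerState blockedStateFor(TriggerState current) {
        switch (current) {
            case WAITING:
            case ACQUIRED:
                return TriggerState.BLOCKED;        // cannot be scheduled until the job finishes
            case PAUSED:
                return TriggerState.PAUSED_BLOCKED; // remembers the pause while blocked
            default:
                return current;                     // assumed: other states stay as they are
        }
    }
}
```

Keeping a separate PAUSED_BLOCKED state lets a paused trigger return to PAUSED, not WAITING, once the running job completes.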