前言
Quartz是用Java实现的定时任务调度框架。本文只分析Quartz的执行原理,不涉及用法。调试时会涉及多线程,可以在断点设置中将Suspend设置为Thread。另外,想了解框架源码时,建议先看文档和资料,对框架的实现先有一个大致的思路,然后顺着这个思路、带着问题去源码里验证,针对一个问题多debug几遍。阅读源码时,建议自上而下地看:先弄清楚一个方法的作用是什么,具体的实现细节后面再看。

下面的例子使用SimpleTrigger和RAMJobStore。
/**
 * Demo entry point: registers HelloJob with a repeating trigger and starts the scheduler.
 *
 * @param args command-line arguments; not read
 */
public static void main(String[] args) {
    // Job definition for HelloJob, keyed as "job1" in "group1".
    JobDetail helloJob = newJob(HelloJob.class).withIdentity("job1", "group1").build();

    // Trigger keyed as "trigger1" in "group1": starts now, repeats every 2 minutes forever.
    Trigger everyTwoMinutes = newTrigger()
            .withIdentity("trigger1", "group1")
            .startNow()
            .withSchedule(simpleSchedule().withIntervalInMinutes(2).repeatForever())
            .build();

    StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
    try {
        Scheduler quartz = schedulerFactory.getScheduler();
        quartz.scheduleJob(helloJob, everyTwoMinutes);
        quartz.start();
    } catch (SchedulerException schedulingFailure) {
        // Demo code: the failure is only dumped to stderr.
        schedulingFailure.printStackTrace();
    }
}
执行原理
Quartz中有两个核心类:QuartzSchedulerThread和SimpleThreadPool。前者是调度线程,主要负责获取要执行的任务,并将任务传递给后者;后者通过内部维护的工作线程接收任务并执行。
QuartzSchedulerThread
该类是Thread的子类,重写了run(),在QuartzScheduler的构造方法中被创建并启动。
/**
 * Creates the scheduler around the given resources, starts its scheduling thread
 * and registers the internal listeners.
 *
 * @param resources       supplies the job store and the thread executor used below
 * @param idleWaitTime    forwarded to the scheduling thread only when greater than zero
 * @param dbRetryInterval deprecated; not read anywhere in this constructor
 * @throws SchedulerException declared by the signature
 */
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
// A job store that also implements JobListener is registered as an internal job listener.
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
// Create the scheduling thread and hand it to the thread executor to be started.
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
// Note: the idle wait time is applied after the thread has been handed to the executor.
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
// Internal job listener for executing jobs.
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
// Internal scheduler listener for errors.
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
// The signaler is given both this scheduler and the scheduling thread.
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
QuartzSchedulerThread的run()执行流程如下:
1.线程启动时,paused默认为true,halted默认是false,进入循环1;
2.QuartzScheduler调用start()时,委托给QuartzSchedulerThread的togglePause(false)。该方法将paused设置为false,并唤醒在sigLock对象上等待的线程,QuartzSchedulerThread跳出循环1;
3.调用RAMJobStore的acquireNextTriggers(),获取未来idleWaitTime(默认30s)内要执行的trigger的List;
4.如果List为空,线程进入等待状态,最多等待30s。如果List不为空,并且第一个trigger的fire时间与当前时间的差大于2ms,线程进入循环2,在sigLock上等待,直到时间差不大于2ms时跳出循环2;
5.调用RAMJobStore的triggersFired()。该方法遍历入参中的trigger:先根据trigger的key从triggersByKey中查出对应的trigger2,然后计算trigger的nextFireTime,再计算trigger2的nextFireTime。最后将trigger返回,并将trigger2重新放入TreeSet中;
6.调用JobRunShell的initialize(),将job、trigger放入JobRunShell的属性JobExecutionContextImpl中。任务的实质就是JobRunShell;
7.调用SimpleThreadPool的runInThread(),将任务传递给线程池中的WorkerThread,由工作线程执行任务。
/**
 * Main loop of the scheduling thread. Until halted it repeatedly:
 * waits while paused, waits for a free worker thread, acquires the next
 * triggers from the job store, sleeps until the first one is due, marks the
 * triggers as fired, wraps each result in a JobRunShell and hands the shell
 * to the thread pool. When no triggers were acquired it idles on sigLock.
 */
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
try {
// Loop 1: the state the thread sits in while the scheduler is paused.
// NOTE(review): the article states paused starts as true and halted as false;
// the initial values are not visible in this excerpt.
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
acquiresFailed = 0;
}
if (halted.get()) {
break;
}
}
// Number of worker threads reported as available by the thread pool.
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
try {
// Ask the job store for triggers due no later than now + idleWaitTime,
// at most min(available threads, max batch size) of them.
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
} catch (JobPersistenceException jpe) {
// Acquisition failed: restart the outer loop (no logging or back-off in this excerpt).
continue;
} catch (RuntimeException e) {
// Same handling as above for unexpected runtime failures.
continue;
}
if (triggers != null && !triggers.isEmpty()) {
// Time remaining until the first acquired trigger is due.
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
// Loop 2: keep waiting while more than 2 ms remain until the fire time.
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
// Only wait on sigLock when at least 1 ms is still left.
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
// Recompute the remaining time; the loop exits once it is no longer > 2 ms.
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
// Tell the job store the acquired triggers have fired; the results it returns
// carry the bundles used below to build the job shells.
// NOTE(review): per the article, RAMJobStore also computes each trigger's next
// fire time here and re-queues it when one exists - not visible in this excerpt.
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null) bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
JobRunShell shell = null;
try {
// Create the job shell for this bundle (the article notes JobRunShell is a Runnable).
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
// Initialise the shell with this scheduler.
// NOTE(review): per the article this builds the JobExecutionContextImpl holding
// the job, scheduler and trigger - confirm against JobRunShell.initialize().
shell.initialize(qs);
} catch (SchedulerException se) {
// Shell could not be prepared: report the job as complete with the
// SET_ALL_JOB_TRIGGERS_ERROR instruction and move on to the next result.
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// Hand the shell to the thread pool; a false return is ignored in this excerpt (empty body).
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
// No triggers were acquired: idle on sigLock for up to getRandomizedIdleWaitTime() ms.
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {}
}
} catch(RuntimeException re) {
// Keep the scheduling loop alive: log the failure and start the next iteration.
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
WorkerThread
该类是SimpleThreadPool的内部类,也是Thread的子类,run()如下:
1.线程启动,默认执行循环1,如果任务为空就执行循环2;
2.有任务时,线程跳出循环2,执行任务;
3.线程恢复原状,方便下次使用。
/**
 * Worker loop: while the run flag is set, waits on lock until a runnable has
 * been assigned, executes it, then clears the assignment and restores the
 * thread so it can be reused.
 */
public void run() {
// Set when a runnable was executed in the current iteration.
boolean ran = false;
// Loop 1: runs for as long as the run flag is set.
// NOTE(review): the article states run is an AtomicBoolean that defaults to true;
// its declaration is not visible in this excerpt.
while (run.get()) {
try {
synchronized(lock) {
// Loop 2: no runnable assigned yet - wait on lock in 500 ms slices.
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
// Execute the assigned task (note: this happens while holding lock).
runnable.run();
}
}
} catch (InterruptedException unblock) {
// Interruption is ignored here; the finally block and loop condition decide what happens next.
} catch (Throwable exceptionInRunnable) {
// Anything thrown by the task is swallowed in this excerpt so the worker keeps running.
} finally {
// Task finished (or failed): clear the assignment so the worker can be reused.
synchronized(lock) {
runnable = null;
}
// repair the thread in case the runnable mucked it up...
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}
if (runOnce) {
// One-shot worker: stop looping and remove it from the busy list.
run.set(false);
clearFromBusyWorkersList(this);
} else if(ran) {
// Reusable worker that just ran a task: reset the flag and mark it available again.
ran = false;
makeAvailable(this);
}
}
}
//if (log.isDebugEnabled())
try {
getLog().debug("WorkerThread is shut down.");
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
}
JobRunShell
该类实现了Runnable,是调度线程传递给工作线程的任务。该类的run()如下:
1.获取job,执行job;
2.重新计算trigger。
/**
 * Executes the job held by the execution context (jec), notifies job and
 * trigger listeners, lets the trigger decide the completion instruction and
 * finally reports completion to the job store. The body loops so the job can
 * be run again when the trigger returns RE_EXECUTE_JOB. This shell is
 * registered as an internal scheduler listener for the duration of the call.
 */
public void run() {
qs.addInternalSchedulerListener(this);
try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();
do {
JobExecutionException jobExEx = null;
// Fetch the job instance from the execution context.
Job job = jec.getJobInstance();
try {
begin();
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't begin execution.", se);
break;
}
long startTime = System.currentTimeMillis();
// endTime starts equal to startTime and is overwritten on every exit path of the try below.
long endTime = startTime;
// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getKey());
// Run the job itself.
job.execute(jec);
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getKey() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
// Any other failure is logged, reported to scheduler listeners and
// wrapped in a JobExecutionException for the steps below.
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getKey() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getKey()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
}
jec.setJobRunTime(endTime - startTime);
// notify all job listeners
if (!notifyJobListenersComplete(jec, jobExEx)) {
break;
}
CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
// update the trigger
try {
instCode = trigger.executionComplete(jec, jobExEx);
} catch (Exception e) {
// If this happens, there's a bug in the trigger...
SchedulerException se = new SchedulerException(
"Trigger threw an unhandled exception.", e);
qs.notifySchedulerListenersError(
"Please report this error to the Quartz developers.",
se);
}
// notify all trigger listeners
if (!notifyTriggerListenersComplete(jec, instCode)) {
break;
}
// update job/trigger or re-execute job
if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
jec.incrementRefireCount();
try {
complete(false);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
// Go round the loop again to re-execute the job.
continue;
}
try {
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
// NOTE(review): this 'continue' restarts the loop, so the job runs again
// after a failed complete(true) - confirm this is intended.
continue;
}
// Report completion (trigger, job detail, instruction) to the job store via the scheduler.
qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
break;
} while (true);
} finally {
// Always unregister this shell, whichever way the loop was left.
qs.removeInternalSchedulerListener(this);
}
}
网友评论