QuartzSchedulerThread
调度器初始化后构造了核心调度线程,quartz整个作业获取执行均由此线程执行
调度过程:
1.等待线程池内的可用线程(此处如果线程池繁忙的话,会造成调度延迟或者失败)
2.查询本次可执行的触发器,通过now+idleWaitTime确定可以获取的Triggers的时间范围,获取的数量由Math.min(availThreadCount,qsRsrcs.getMaxBatchSize())最小值决定(此处可以通过配置优化,默认的batchSize是1)
3.告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
4.循环触发此次作业,作业方法的调用是通过JobRunShell进行的,将构造出的每一个JobRunShell放入线程池等待调度
public void run(){
boolean lastAcquireFailed=false;
while(!halted.get()){
try{
//省略代码…检查调度器是否暂停或停止
//等待可用线程,此方法阻塞直到线程池有可用线程为止
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount>0){
//willalwaysbetrue,duetosemanticsofblockForAvailableThreads...
List<OperableTrigger> triggers=null;
Long now=System.currentTimeMillis();
clearSignaledSchedulingChange();
try{
//查询本次执行的触发器。
//参数(1)下一次间隔时间是now+idleWaitTime(此变量用来控制获取每一批触发器的间隔,设置过小导致频繁查询)
//参数(2)查询数量为 可用线程数或配置的最大批 两者最小
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now+idleWaitTime,Math.min(availThreadCount,qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
}catch(JobPersistenceExceptionjpe){
if(!lastAcquireFailed){
qs.notifySchedulerListenersError(
"Anerroroccurredwhilescanningforthenexttriggerstofire.",
jpe);
}
lastAcquireFailed=true;
continue;
}catch(RuntimeExceptione){
if(!lastAcquireFailed){
getLog().error("quartzSchedulerThreadLoop:RuntimeException"
+e.getMessage(),e);
}
lastAcquireFailed=true;
continue;
}
if(triggers!=null&&!triggers.isEmpty()){
//省略代码…此处检查调度器是否到执行时间,与系统时间判断,只有时间差小于2毫秒才可继续执行
if(goAhead){
try{
//告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res!=null)
bndles=res;
}catch(SchedulerExceptionse){
qs.notifySchedulerListenersError(
"Anerroroccurredwhilefiringtriggers'"
+triggers+"'",se);
//QTZ-179:aproblemoccurredinteractingwiththetriggersfromthedb
//wereleasethemandloopagain
for(inti=0;i<triggers.size();i++){
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
//循环触发此次作业
for(inti=0;i<bndles.size();i++){
TriggerFiredResult result=bndles.get(i);
TriggerFiredBundle bndle=result.getTriggerFiredBundle();
Exception exception=result.getException();
if(exception instanceof RuntimeException){
getLog().error("RuntimeExceptionwhilefiringtrigger"+triggers.get(i),exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
//it's possible to get 'null' if the triggers was paused,
//blocked,or other similar occurrences that prevent it being
//fired at this time...o rif the scheduler was shutdown(halted)
if(bndle==null){
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell=null;
try{
//构造可执行作业,并初始化,真正执行作业是在这个类
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
}catch(SchedulerExceptionse){
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
//放入线程池中 执行作业!!
if(qsRsrcs.getThreadPool().runInThread(shell)==false){
//thiscaseshouldneverhappen,asitisindicativeofthe
//schedulerbeingshutdownorabuginthethreadpoolor
//athreadpoolbeingusedconcurrently-whichthedocs
//saynottodo...
getLog().error("ThreadPool.runInThread()returnfalse!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue;//while(!halted)
}
}else{//if(availThreadCount>0)
//shouldneverhappen,ifthreadPool.blockForAvailableThreads()followscontract
continue;//while(!halted)
}
longnow=System.currentTimeMillis();
longwaitTime=now+getRandomizedIdleWaitTime();
longtimeUntilContinue=waitTime-now;
synchronized(sigLock){
try{
if(!halted.get()){
//QTZ-336Ajobmighthavebeencompletedinthemeantimeandwemighthave
//missedthescheduledchangedsignalbynotwaitingforthenotify()yet
//Checkthatbeforewaitingfortoolongincasethisveryjobneedstobe
//scheduledverysoon
if(!isScheduleChanged()){
sigLock.wait(timeUntilContinue);
}
}
}catch(InterruptedExceptionignore){
}
}
}catch(RuntimeExceptionre){
getLog().error("Runtimeerroroccurredinmaintriggerfiringloop.",re);
}
}//while(!halted)
//dropreferencestoschedulerstufftoaidgarbagecollection...
qs=null;
qsRsrcs=null;
}
网友评论