美文网首页
调度执行--quartz核心线程类

调度执行--quartz核心线程类

作者: 丁钰铭 | 来源:发表于2019-02-25 20:12 被阅读0次

    QuartzSchedulerThread

    调度器初始化后构造了核心调度线程,quartz整个作业获取执行均由此线程执行

    调度过程:
    1.等待线程池内的可用线程(此处如果线程池繁忙的话,会造成调度延迟或者失败)
    2.查询本次可执行的触发器,通过now+idleWaitTime确定可以获取的Triggers的时间范围,获取的数量由Math.min(availThreadCount,qsRsrcs.getMaxBatchSize())最小值决定(此处可以通过配置优化,默认的batchSize是1)
    3.告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
    4.循环触发此次作业,作业方法的调用是通过JobRunShell进行的,将构造出的每一个JobRunShell放入线程池等待调度

    public void run(){
        boolean lastAcquireFailed=false;
    
        while(!halted.get()){
            try{
                     //省略代码…检查调度器是否暂停或停止
            //等待可用线程,此方法阻塞直到线程池有可用线程为止
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount>0){
                //willalwaysbetrue,duetosemanticsofblockForAvailableThreads...
            
                List<OperableTrigger> triggers=null;
                
                Long now=System.currentTimeMillis();
                
                clearSignaledSchedulingChange();
                
                try{
                    //查询本次执行的触发器。
                    //参数(1)下一次间隔时间是now+idleWaitTime(此变量用来控制获取每一批触发器的间隔,设置过小导致频繁查询)
                    //参数(2)查询数量为 可用线程数或配置的最大批 两者最小
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                    now+idleWaitTime,Math.min(availThreadCount,qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
                    lastAcquireFailed = false;
                }catch(JobPersistenceExceptionjpe){
                    if(!lastAcquireFailed){
                        qs.notifySchedulerListenersError(
                        "Anerroroccurredwhilescanningforthenexttriggerstofire.",
                        jpe);
                    }
                    lastAcquireFailed=true;
                    continue;
                    }catch(RuntimeExceptione){
                    if(!lastAcquireFailed){
                        getLog().error("quartzSchedulerThreadLoop:RuntimeException"
                        +e.getMessage(),e);
                    }
                    lastAcquireFailed=true;
                    continue;
                }
        
            if(triggers!=null&&!triggers.isEmpty()){
            
        //省略代码…此处检查调度器是否到执行时间,与系统时间判断,只有时间差小于2毫秒才可继续执行
            if(goAhead){
            try{
                //告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                if(res!=null)
                    bndles=res;
            }catch(SchedulerExceptionse){
                qs.notifySchedulerListenersError(
                "Anerroroccurredwhilefiringtriggers'"
                +triggers+"'",se);
                //QTZ-179:aproblemoccurredinteractingwiththetriggersfromthedb
                //wereleasethemandloopagain
                for(inti=0;i<triggers.size();i++){
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                }
                continue;
            }
            
            }
        
            //循环触发此次作业
            for(inti=0;i<bndles.size();i++){
                TriggerFiredResult result=bndles.get(i);
                TriggerFiredBundle bndle=result.getTriggerFiredBundle();
                Exception exception=result.getException();
                
                if(exception instanceof RuntimeException){
                    getLog().error("RuntimeExceptionwhilefiringtrigger"+triggers.get(i),exception);
                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                    continue;
                }
            
                //it's possible to get 'null' if the triggers was paused,
                //blocked,or other similar occurrences that prevent it being
                //fired at this time...o rif the scheduler was shutdown(halted)
                if(bndle==null){
                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                    continue;
                }
            
                JobRunShell shell=null;
                try{
                    //构造可执行作业,并初始化,真正执行作业是在这个类
                    shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                    shell.initialize(qs);
                }catch(SchedulerExceptionse){
                    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                    continue;
                }
                //放入线程池中 执行作业!!
                if(qsRsrcs.getThreadPool().runInThread(shell)==false){
                    //thiscaseshouldneverhappen,asitisindicativeofthe
                    //schedulerbeingshutdownorabuginthethreadpoolor
                    //athreadpoolbeingusedconcurrently-whichthedocs
                    //saynottodo...
                    getLog().error("ThreadPool.runInThread()returnfalse!");
                    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                }
        
            }
            
            continue;//while(!halted)
            }
        }else{//if(availThreadCount>0)
        //shouldneverhappen,ifthreadPool.blockForAvailableThreads()followscontract
        continue;//while(!halted)
        }
        
        longnow=System.currentTimeMillis();
        longwaitTime=now+getRandomizedIdleWaitTime();
        longtimeUntilContinue=waitTime-now;
        synchronized(sigLock){
        try{
        if(!halted.get()){
        //QTZ-336Ajobmighthavebeencompletedinthemeantimeandwemighthave
        //missedthescheduledchangedsignalbynotwaitingforthenotify()yet
        //Checkthatbeforewaitingfortoolongincasethisveryjobneedstobe
        //scheduledverysoon
        if(!isScheduleChanged()){
        sigLock.wait(timeUntilContinue);
        }
        }
        }catch(InterruptedExceptionignore){
        }
        }
        
        }catch(RuntimeExceptionre){
        getLog().error("Runtimeerroroccurredinmaintriggerfiringloop.",re);
        }
        }//while(!halted)
        
        //dropreferencestoschedulerstufftoaidgarbagecollection...
        qs=null;
        qsRsrcs=null;
    }
    

    相关文章

      网友评论

          本文标题:调度执行--quartz核心线程类

          本文链接:https://www.haomeiwen.com/subject/zhknyqtx.html