美文网首页
调度执行--quartz核心线程类

调度执行--quartz核心线程类

作者: 丁钰铭 | 来源:发表于2019-02-25 20:12 被阅读0次

QuartzSchedulerThread

调度器初始化后构造了核心调度线程,quartz整个作业获取执行均由此线程执行

调度过程:
1.等待线程池内的可用线程(此处如果线程池繁忙的话,会造成调度延迟或者失败)
2.查询本次可执行的触发器,通过now+idleWaitTime确定可以获取的Triggers的时间范围,获取的数量由Math.min(availThreadCount,qsRsrcs.getMaxBatchSize())最小值决定(此处可以通过配置优化,默认的batchSize是1)
3.告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
4.循环触发此次作业,作业方法的调用是通过JobRunShell进行的,将构造出的每一个JobRunShell放入线程池等待调度

public void run(){
    boolean lastAcquireFailed=false;

    while(!halted.get()){
        try{
                 //省略代码…检查调度器是否暂停或停止
        //等待可用线程,此方法阻塞直到线程池有可用线程为止
        int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
        if(availThreadCount>0){
            //willalwaysbetrue,duetosemanticsofblockForAvailableThreads...
        
            List<OperableTrigger> triggers=null;
            
            Long now=System.currentTimeMillis();
            
            clearSignaledSchedulingChange();
            
            try{
                //查询本次执行的触发器。
                //参数(1)下一次间隔时间是now+idleWaitTime(此变量用来控制获取每一批触发器的间隔,设置过小导致频繁查询)
                //参数(2)查询数量为 可用线程数或配置的最大批 两者最小
                triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now+idleWaitTime,Math.min(availThreadCount,qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
                lastAcquireFailed = false;
            }catch(JobPersistenceExceptionjpe){
                if(!lastAcquireFailed){
                    qs.notifySchedulerListenersError(
                    "Anerroroccurredwhilescanningforthenexttriggerstofire.",
                    jpe);
                }
                lastAcquireFailed=true;
                continue;
                }catch(RuntimeExceptione){
                if(!lastAcquireFailed){
                    getLog().error("quartzSchedulerThreadLoop:RuntimeException"
                    +e.getMessage(),e);
                }
                lastAcquireFailed=true;
                continue;
            }
    
        if(triggers!=null&&!triggers.isEmpty()){
        
    //省略代码…此处检查调度器是否到执行时间,与系统时间判断,只有时间差小于2毫秒才可继续执行
        if(goAhead){
        try{
            //告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
            if(res!=null)
                bndles=res;
        }catch(SchedulerExceptionse){
            qs.notifySchedulerListenersError(
            "Anerroroccurredwhilefiringtriggers'"
            +triggers+"'",se);
            //QTZ-179:aproblemoccurredinteractingwiththetriggersfromthedb
            //wereleasethemandloopagain
            for(inti=0;i<triggers.size();i++){
            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
            }
            continue;
        }
        
        }
    
        //循环触发此次作业
        for(inti=0;i<bndles.size();i++){
            TriggerFiredResult result=bndles.get(i);
            TriggerFiredBundle bndle=result.getTriggerFiredBundle();
            Exception exception=result.getException();
            
            if(exception instanceof RuntimeException){
                getLog().error("RuntimeExceptionwhilefiringtrigger"+triggers.get(i),exception);
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                continue;
            }
        
            //it's possible to get 'null' if the triggers was paused,
            //blocked,or other similar occurrences that prevent it being
            //fired at this time...o rif the scheduler was shutdown(halted)
            if(bndle==null){
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                continue;
            }
        
            JobRunShell shell=null;
            try{
                //构造可执行作业,并初始化,真正执行作业是在这个类
                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                shell.initialize(qs);
            }catch(SchedulerExceptionse){
                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                continue;
            }
            //放入线程池中 执行作业!!
            if(qsRsrcs.getThreadPool().runInThread(shell)==false){
                //thiscaseshouldneverhappen,asitisindicativeofthe
                //schedulerbeingshutdownorabuginthethreadpoolor
                //athreadpoolbeingusedconcurrently-whichthedocs
                //saynottodo...
                getLog().error("ThreadPool.runInThread()returnfalse!");
                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
            }
    
        }
        
        continue;//while(!halted)
        }
    }else{//if(availThreadCount>0)
    //shouldneverhappen,ifthreadPool.blockForAvailableThreads()followscontract
    continue;//while(!halted)
    }
    
    longnow=System.currentTimeMillis();
    longwaitTime=now+getRandomizedIdleWaitTime();
    longtimeUntilContinue=waitTime-now;
    synchronized(sigLock){
    try{
    if(!halted.get()){
    //QTZ-336Ajobmighthavebeencompletedinthemeantimeandwemighthave
    //missedthescheduledchangedsignalbynotwaitingforthenotify()yet
    //Checkthatbeforewaitingfortoolongincasethisveryjobneedstobe
    //scheduledverysoon
    if(!isScheduleChanged()){
    sigLock.wait(timeUntilContinue);
    }
    }
    }catch(InterruptedExceptionignore){
    }
    }
    
    }catch(RuntimeExceptionre){
    getLog().error("Runtimeerroroccurredinmaintriggerfiringloop.",re);
    }
    }//while(!halted)
    
    //dropreferencestoschedulerstufftoaidgarbagecollection...
    qs=null;
    qsRsrcs=null;
}

相关文章

  • 调度执行--quartz核心线程类

    QuartzSchedulerThread 调度器初始化后构造了核心调度线程,quartz整个作业获取执行均由此线...

  • 2019-03-25——Java并发 Executor框架 Th

    ThreadPoolExecutor是Java线程池的核心类,以内部线程池的形式对外提供管理任务执行,线程调度,线...

  • Quartz 定时任务框架详解

    1.Quartz 体系结构 Quartz 设计有四个核心类,分别是Scheduler(调度器)、Job(任务) 、...

  • springboot整合quartz定时任务

    1. quartz的基本实现原理 ** Quartz 核心元素 ** Quartz任务调度的核心元素为: Sche...

  • Quartz框架

    Quartz Quartz是一个全功能,开源的任务调度服务。Quartz的核心概念:schedule任务调度,jo...

  • Spring Boot整合Quartz

    一、Quartz简介   Quartz是一个开源作业调度框架,其核心概念包括: Job:表示一个工作,要执行的具体...

  • Quartz 调度器

    一、简介 Quartz是一个开源作业调度框架,框架的核心是调度器,调度器负责管理Quartz应用运行时环境,调度器...

  • Quartz初始化源码跟踪(1)

    Quartz调用示例: 可以看到,Quartz的核心调度器是通过工厂创建的。SchedulerFactory两种实...

  • Quartz 源码解析(一) —— 基本介绍

    Quartz是什么 Quartz提供了一些Scheduler(调度策略),以便我们管理和执行Job(任务)。 官网...

  • java虚拟机读书笔记之线程调度

    java线程调度 线程调度主要有两种方式,协同式线程调度和抢占式线程调度。1、协同式: 线程的执行时间由线程本身...

网友评论

      本文标题:调度执行--quartz核心线程类

      本文链接:https://www.haomeiwen.com/subject/zhknyqtx.html