美文网首页
quartz源码2-初始化

quartz源码2-初始化

作者: modou1618 | 来源:发表于2019-01-11 10:46 被阅读0次

一 StdSchedulerFactory.getScheduler

1.1 配置获取

key 说明
org.quartz.scheduler.instanceName 实例名称
org.quartz.scheduler.threadName 线程名称
org.quartz.scheduler.instanceId 集群实例id生成方式
org.quartz.scheduler.rmi.* rmi配置
org.quartz.scheduler.jmx.* jmx配置
org.quartz.jobStore.* 持久化配置
org.quartz.threadPool.* 线程池配置
org.quartz.scheduler.jobFactory.* job工厂配置
other 其他调度配置

1.2 实例id生成器InstanceIdGenerator

方式
SimpleInstanceIdGenerator return InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
HostnameInstanceIdGenerator return InetAddress.getLocalHost().getHostName();
SystemPropertyInstanceIdGenerator prepend前置值+系统配置+postpend后置值

1.3 job工厂JobFactory

1.3.1 SimpleJobFactory

  • 目标类反射实例化job
    jobClass.newInstance();

1.3.2 PropertySettingJobFactory

  • 继承自SimpleJobFactory
  • 获取持久化数据
JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.putAll(scheduler.getContext());
        jobDataMap.putAll(bundle.getJobDetail().getJobDataMap());
        jobDataMap.putAll(bundle.getTrigger().getJobDataMap());
  • 使用持久化数据设置job属性
    setBeanProps(job, jobDataMap);

1.4 线程池ThreadPool

  • count初始化池大小
  • workers 初始化线程池=availWorkers+busyWorkers
  • availWorkers可用的线程池
  • busyWorkers正在执行job的线程池
    private int count = -1;
    private List<WorkerThread> workers;
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

1.5 持久化JobStore

1.5.1 RAMJobStore

  • 内存存储数据

1.5.2 JobStoreCMT

  • 非事务管理db持久化

1.5.3 JobStoreTX

  • 事务管理db持久化,一般使用本类型
  • ClassLoadHelper loadHelper 线程类加载器
  • SchedulerSignaler signaler 通知调度线程,误点或集群管理线程或job管理接口插入调度任务

1.5.4 访问锁Semaphore

1.5.4.1 SimpleSemaphore

  • java对象锁
ThreadLocal<HashSet<String>> lockOwners = new ThreadLocal<HashSet<String>>();
HashSet<String> locks = new HashSet<String>();

1.5.4.3 DBSemaphore

  • sql命令锁
    quartz_locks表作为锁表, select ... for update 加锁

1.5.5 db连接池ConnectionProvider

支持用户配置连接池,也可使用quartz默认的连接池

1.5.6 db连接池代理DBConnectionManager

代理多个连接池,根据池名称区分

1.6 线程执行器ThreadExecutor

JobStore中对应线程执行的线程池

  • ClusterManager集群管理线程执行线程池
  • MisfireHandler误点管理线程执行线程池
  • QuartzSchedulerThread任务调度线程执行线程池

1.7 插件SchedulerPlugin

1.8 JobListener和TriggerListener

  • 名称注入
Method  nameSetter = listener.getClass().getMethod("setName", strArg);
nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
  • 属性注入
    setBeanProps(listener, lp);

1.9 JobRunShellFactory

  • job运行容器工厂,job被调度时创建job执行容器

1.10 QuartzSchedulerResources

  • 调度相关资源统合

1.11 QuartzScheduler

  • 调度逻辑管理
  • listener初始化
  • 调度线程初始化
  • ExecutingJobsManager 执行job数据统计
  • SchedulerSignalerImpl 调度线程通知
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);
        
        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

1.12 StdScheduler

  • 调度对外管理接口, 代理QuartzScheduler

1.13 添加引用,避免gc清除对象

   // prevents the repository from being garbage collected
   qs.addNoGCObject(schedRep);
   // prevents the db manager from being garbage collected
   if (dbMgr != null) {
        qs.addNoGCObject(dbMgr);
   }

二 调度线程初始化

  • private AtomicBoolean halted;用于优雅停止线程
  • 线程启动时,private boolean paused; 表示暂停线程执行,等待其他流程初始化完成。
  • this.resources.getJobStore().schedulerStarted();启动集群管理线程和误点管理线程。
public void start() throws SchedulerException {

        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }

        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        notifySchedulerListenersStarting();

        if (initialStart == null) {
            initialStart = new Date();
            this.resources.getJobStore().schedulerStarted();            
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed();
        }

        schedThread.togglePause(false);

        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");
        
        notifySchedulerListenersStarted();
    }
  • private final Object sigLock = new Object(); 通知调度线程继续执行,可能原因有终止线程,调度任务变更,启动线程。
  • 通知调度线程有变更的调度任务需要插入流程流程中
private boolean signaled;
private long signaledNextFireTime;
  • 线程启动后,一开始会循环等待
// check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

                        // reset failure counter when paused, so that we don't
                        // wait again after unpausing
                        acquiresFailed = 0;
                    }

                    if (halted.get()) {
                        break;
                    }
                }

三 集群管理线程初始化

  • 初始化时检查一次集群节点是否超时,并进行处理

  • 节点超时计算
    节点上次注册时间+max(节点注册周期,当前时间-节点上次注册时间)+7500ms < 当前时间

    集群节点超时检查处理流程.png
  • 启动线程,默认7.5s检查一次集群节点状态

  • 发现集群节点下线,则通知调度线程可能有调度任务插入。

public void run() {
            while (!shutdown) {

                if (!shutdown) {
                    long timeToSleep = getClusterCheckinInterval();
                    long transpiredTime = (System.currentTimeMillis() - lastCheckin);
                    timeToSleep = timeToSleep - transpiredTime;
                    if (timeToSleep <= 0) {
                        timeToSleep = 100L;
                    }

                    if(numFails > 0) {
                        timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                    }
                    
                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }

                if (!shutdown && this.manage()) {
                    signalSchedulingChangeImmediately(0L);
                }

            }//while !shutdown
        }

四 误点管理线程初始化

  • 误点处理流程


    误点管理线程.png
  • 1分钟误点窗口周期调用误点处理函数
public void run() {
            
            while (!shutdown) {

                long sTime = System.currentTimeMillis();

                RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

                if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
                    signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
                }

                if (!shutdown) {
                    long timeToSleep = 50l;  // At least a short pause to help balance threads
                    if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                        timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                        if (timeToSleep <= 0) {
                            timeToSleep = 50l;
                        }

                        if(numFails > 0) {
                            timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                        }
                    }
                    
                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }//while !shutdown
            }
        }

相关文章

网友评论

      本文标题:quartz源码2-初始化

      本文链接:https://www.haomeiwen.com/subject/wpytdqtx.html