一 StdSchedulerFactory.getScheduler
1.1 配置获取
key | 说明 |
---|---|
org.quartz.scheduler.instanceName | 实例名称 |
org.quartz.scheduler.threadName | 线程名称 |
org.quartz.scheduler.instanceId | 集群实例id生成方式 |
org.quartz.scheduler.rmi.* | rmi配置 |
org.quartz.scheduler.jmx.* | jmx配置 |
org.quartz.jobStore.* | 持久化配置 |
org.quartz.threadPool.* | 线程池配置 |
org.quartz.scheduler.jobFactory.* | job工厂配置 |
other | 其他调度配置 |
1.2 实例id生成器InstanceIdGenerator
类 | 方式 |
---|---|
SimpleInstanceIdGenerator | return InetAddress.getLocalHost().getHostName() + System.currentTimeMillis(); |
HostnameInstanceIdGenerator | return InetAddress.getLocalHost().getHostName(); |
SystemPropertyInstanceIdGenerator | prepend前置值+系统配置+postpend后置值 |
1.3 job工厂JobFactory
1.3.1 SimpleJobFactory
- 目标类反射实例化job
jobClass.newInstance();
1.3.2 PropertySettingJobFactory
- 继承自SimpleJobFactory
- 获取持久化数据
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.putAll(scheduler.getContext());
jobDataMap.putAll(bundle.getJobDetail().getJobDataMap());
jobDataMap.putAll(bundle.getTrigger().getJobDataMap());
- 使用持久化数据设置job属性
setBeanProps(job, jobDataMap);
1.4 线程池ThreadPool
- count初始化池大小
- workers 初始化线程池=availWorkers+busyWorkers
- availWorkers可用的线程池
- busyWorkers正在执行job的线程池
private int count = -1;
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
1.5 持久化JobStore
1.5.1 RAMJobStore
- 内存存储数据
1.5.2 JobStoreCMT
- 非事务管理db持久化
1.5.3 JobStoreTX
- 事务管理db持久化,一般使用本类型
-
ClassLoadHelper loadHelper
线程类加载器 -
SchedulerSignaler signaler
通知调度线程,误点或集群管理线程或job管理接口插入调度任务
1.5.4 访问锁Semaphore
1.5.4.1 SimpleSemaphore
- java对象锁
ThreadLocal<HashSet<String>> lockOwners = new ThreadLocal<HashSet<String>>();
HashSet<String> locks = new HashSet<String>();
1.5.4.3 DBSemaphore
- sql命令锁
quartz_locks表作为锁表, select ... for update 加锁
1.5.5 db连接池ConnectionProvider
支持用户配置连接池,也可使用quartz默认的连接池
1.5.6 db连接池代理DBConnectionManager
代理多个连接池,根据池名称区分
1.6 线程执行器ThreadExecutor
JobStore中对应线程执行的线程池
- ClusterManager集群管理线程执行线程池
- MisfireHandler误点管理线程执行线程池
- QuartzSchedulerThread任务调度线程执行线程池
1.7 插件SchedulerPlugin
1.8 JobListener和TriggerListener
- 名称注入
Method nameSetter = listener.getClass().getMethod("setName", strArg);
nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
- 属性注入
setBeanProps(listener, lp);
1.9 JobRunShellFactory
- job运行容器工厂,job被调度时创建job执行容器
1.10 QuartzSchedulerResources
- 调度相关资源统合
1.11 QuartzScheduler
- 调度逻辑管理
- listener初始化
- 调度线程初始化
-
ExecutingJobsManager
执行job数据统计 -
SchedulerSignalerImpl
调度线程通知
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
1.12 StdScheduler
- 调度对外管理接口, 代理QuartzScheduler
1.13 添加引用,避免gc清除对象
// prevents the repository from being garbage collected
qs.addNoGCObject(schedRep);
// prevents the db manager from being garbage collected
if (dbMgr != null) {
qs.addNoGCObject(dbMgr);
}
二 调度线程初始化
-
private AtomicBoolean halted;
用于优雅停止线程 - 线程启动时,
private boolean paused;
表示暂停线程执行,等待其他流程初始化完成。 -
this.resources.getJobStore().schedulerStarted();
启动集群管理线程和误点管理线程。
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}
-
private final Object sigLock = new Object();
通知调度线程继续执行,可能原因有终止线程,调度任务变更,启动线程。 - 通知调度线程有变更的调度任务需要插入流程流程中
private boolean signaled;
private long signaledNextFireTime;
- 线程启动后,一开始会循环等待
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
// reset failure counter when paused, so that we don't
// wait again after unpausing
acquiresFailed = 0;
}
if (halted.get()) {
break;
}
}
三 集群管理线程初始化
-
初始化时检查一次集群节点是否超时,并进行处理
-
节点超时计算
集群节点超时检查处理流程.png
节点上次注册时间+max(节点注册周期,当前时间-节点上次注册时间)+7500ms < 当前时间
-
启动线程,默认7.5s检查一次集群节点状态
-
发现集群节点下线,则通知调度线程可能有调度任务插入。
public void run() {
while (!shutdown) {
if (!shutdown) {
long timeToSleep = getClusterCheckinInterval();
long transpiredTime = (System.currentTimeMillis() - lastCheckin);
timeToSleep = timeToSleep - transpiredTime;
if (timeToSleep <= 0) {
timeToSleep = 100L;
}
if(numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
try {
Thread.sleep(timeToSleep);
} catch (Exception ignore) {
}
}
if (!shutdown && this.manage()) {
signalSchedulingChangeImmediately(0L);
}
}//while !shutdown
}
四 误点管理线程初始化
-
误点处理流程
误点管理线程.png - 1分钟误点窗口周期调用误点处理函数
public void run() {
while (!shutdown) {
long sTime = System.currentTimeMillis();
RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
}
if (!shutdown) {
long timeToSleep = 50l; // At least a short pause to help balance threads
if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
if (timeToSleep <= 0) {
timeToSleep = 50l;
}
if(numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
}
try {
Thread.sleep(timeToSleep);
} catch (Exception ignore) {
}
}//while !shutdown
}
}
网友评论