我在03 Quartz-初识Quartz-Scheduler初始化
篇中解说了getScheduler() 的instantiate(),简单的介绍了Scheduler的整个初始化过程,接下来一起了解下之后的流程;
Scheduler使用
getScheduler
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
}
//从缓存中查询获取Schedule任务,任务名称从配置中获取,若无指定,则默认指定QuartzScheduler
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
//判断若存在且已停止运行,则从缓存中移除
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
sched = instantiate();
return sched;
}
start()
之前在03 Quartz-初识Quartz-Scheduler初始化
解说Scheduler 其实就是交互API,里面还有一个QuartzScheduler的实例变量,调用start()方法真正调用的是QuartzScheduler的start()方法。
public void start() throws SchedulerException {
sched.start();
}
QuartzScheduler的start()方法。
-
进入start方法,判断是否已关闭或者停止;
-
进行Scheduler Starting Listener通知 (就是轮询所以的监听器进行逐一通知)
-
this.resources.getJobStore().schedulerStarted(); 因为使用内存模型:其实就是个空实现 什么都没做。
-
startPlugins(); 如有则逐一调用配置插件的start()方法;
-
togglePause:进行信号通知
-
notifySchedulerListenersStarted();
public void start() throws SchedulerException {
if (shuttingDown|| closed) { throw ( "... 省略异常处理过程;"); }
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
notifySchedulerListenersStarting();
//如果不为空则进行恢复逻辑
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}
1.1.3 togglePause()
看下这个方法schedThread.togglePause(false);属于QuartzSchedulerThread 这个方法其实全局就两处进行调用,参数决定是否进行暂停切换。初始化这职位false; 其他只有关闭等逻辑才进行设置true
-
设置paused = false;
-
其次就是调用了sigLock.notifyAll(); ? 它是在唤醒谁呢?谁想要唤醒?其实这个我们在03 Quartz-初识Quartz-Scheduler初始化篇幅中解说QuartzScheduler的初始化逻辑的时候就已经介绍了。
/**
* <p>
* Signals the main processing loop to pause at the next possible point.
* </p>
*/
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;
if (paused) {
signalSchedulingChange(0);
} else {
sigLock.notifyAll();
}
}
}
- 我们查看全局当中只有一处使用了sigLock.wait(); 显而易见就是这,就是当前类的run() 方法,不要忘记我们真正的核心是QuartzScheduler, QuartzScheduler维护整个Quartz的所有运行时实例对象。QuartzSchedulerThread是Quartz的主线程对象。
1.1.4 run()
The main processing loop of the <code>QuartzSchedulerThread</code>.
正如上面所说:
run()方法比较长,只能逐一解说了。
- a. 首先是声明一个int变量acquiresFailed=0;一般来说为0,只有获取下一个触发器发生异常才会自增。
int acquiresFailed = 0;//a
- b. 其次就是一个while循环 也就是run的主体逻辑,循环判断接收一个 AtomicBoolean 作为是否暂停的判断;
-
c. 循环体的第一个就是一个同步块(里面也是一个循环体):如果是初始化则主线程就是在同步块进行等待,等其他获得sigLock的线程发起信号通知。如果有其他线程发起激活信号(sigLock.notifyAll())传递就继续向下执行。
-
d. 如果第一次获取失败则进行获取访问此jobStore时反复失败的等待时间(以毫秒为单位).RAM模式为20;然后暂停等待。
-
e. 判断线程池是否有空闲线程
**qsRsrcs.getThreadPool().blockForAvailableThreads(); **获取可用线程数量;然后清除调度改变的信号。
//获取可用线程个数
public int blockForAvailableThreads() {
synchronized (nextRunnableLock) {
while ((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
//返回可用线程集合的大小
return availWorkers.size();
}
}
- f. 从jobStore中获取下次被触发的触发器(是根据活跃线程数和最大批量大小(默认为1)的两者最小值作为本次获取触发容器的大小:所以此处我们可根据自身服务器的特性进行优化设置:当检查某个Trigger应该触发时,默认每次只Acquire一个Trigger,(为什么要有Acquire的过程呢?是为了防止多线程访问的情况下,同一个Trigger被不同的线程多次触发)。尤其是使用JDBC JobStore时,一次Acquire就是一个update语句,尽可能一次性的多获取几个Trigger,一起触发,当定时器数量非常大的时候,这是个非常有效的优化。当定时器数量比较少时,触发不是极为频繁时,这个优化的意义就不大了。)。qsRsrcs.getJobStore().acquireNextTriggers (获取最近需要执行的任务列表(30s内的将要执行的任务)。然后根据执行时间进行排序,然后计算出需要wait()的时间。当调度时间来临,欢迎主线程,将任务交给一个线程池进行执行。)该方法还是比较重要的,里面会进行一些判断逻辑applyMisfire 然后进行设置的策略对应处理。
idleWaitTime:
在调度程序处于空闲状态时,调度程序查询的触发器可用之前等待的时间量(以毫秒为单位),默认是30秒;30秒内没有需要执行的任务,则等待一个随机时间。getRandomizedIdleWaitTime产生一个30秒内随机等待时间。
可通过配置属性org.quartz.scheduler.idleWaitTime设置。
batchTriggerAcquisitionMaxCount:
允许调度程序节点一次获取(用于触发)的触发器的最大数量,默认是1;
可通过org.quartz.scheduler.batchTriggerAcquisitionMaxCount改写。
batchTriggerAcquisitionFireAheadTimeWindow:
时间窗口调节参数
允许触发器在其预定的火灾时间之前被获取和触发的时间(毫秒)的时间量,默认是0;
可通过org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow改写
- 取到triggers之后会判断第一条是否到执行时,如果没有会一直循环等待,等待的过程中会根据信号判断是否有必要释放当前触发器重新调度;在QuartzSchedulerThread主线程中,首先会取出最近30秒内将要执行的任务,然后等待executeTime-now时间,然后在等待唤醒时交给线程池处理。
- 为什么triggers集合的第一个就是最早需要被执行的?因为这个跟JobStore中存放triggers的数据结构有关,它是用TreeSet存放的;
if (availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
//清除调度改变的信号
clearSignaledSchedulingChange();
try {
//到JobStore中获取下次被触发的触发器
triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime,
Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.", jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if (!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException " + e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
//这里为什么triggers的第一个对象就是最早需要被执行的?
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
//如果第一条下次触发时间大于当前时间则进入等待
while (timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if (timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
//等待的过程中看看有没有收到调度信号
if (releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly
// decided to release triggers
//这个可能在前面等待的时候被清理掉了
if (triggers.isEmpty())
continue;
-
g. qsRsrcs.getJobStore().triggersFired(triggers);JobStore中的triggersFired(triggers);方法会把trigger和其对应的JobStore封装到TriggerFiredBundle中,并把 TriggerFiredBundle 对象传给TriggerFiredResult,
-
h. shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); 创建线程对象构造执行对象,JobRunShell实现了Runnable
- 把Trigger和JobDetail封装一下,生成执行任务shell(shellFactory获取的)
//TriggerFiredResult-->TriggerFiredBundle-->(job, trigger, 一堆time)
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized (sigLock) {
goAhead = !halted.get();
}
if (goAhead) {
try {
// 通知jobStore开始执行了,根据trigger获取其对应的JobDetail, 封装成TriggerFiredResult
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if (res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '" + triggers + "'", se);
// QTZ-179 : a problem occurred interacting with
// the triggers from the db
// we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was
// paused,
// blocked, or other similar occurrences that
// prevent it being
// fired at this time... or if the scheduler was
// shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// 下面是开始执行任务
JobRunShell shell = null;
try {
//构造执行对象,JobRunShell实现了Runnable
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
//这个里面会用我们自定义的Job来new一个对象,并把相关执行Job是需要的数据传给JobExecutionContextImpl(这是我们自定义job的execute方法参数)
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
- i. sRsrcs.getThreadPool().runInThread(shell); 提交线程池;
// 这里是把任务放入到线程池中
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is
// indicative of the
// scheduler being shutdown or a bug in the
// thread pool or
// a thread pool being used concurrently - which
// the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
//放到线程池失败后,通知jobStore完成
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
- j. 执行线程池
JobRunShell 放到了线程池中,看SimpleThreadPool的runInThread(Runnable runnable)方法逻辑很简单,到可用线程集合中取一个线程执行任务并放到正在使用线程集合中,如果线程成被shutdown了则创建一个额外的线程,如下:
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
//到可用线程集合里取出第一个
WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
//放到该正在使用线程池中
busyWorkers.add(wt);
//运行任务
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the
// pool).
//如果线程池关闭了则创建一个额外的线程
WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio,
isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
执行任务的是WorkerThread线程,把任务传给WorkerThread,它会调用任务的run方法,这里即是调用JobRunShell的run方法, JobRunShell 的run方法最终就是调自定义Job的execute方法。
//添加任务
public void run(Runnable newRunnable) {
synchronized (lock) {
if (runnable != null) {
throw new IllegalStateException("Already running a Runnable!");
}
runnable = newRunnable;
lock.notifyAll();
}
}
/**
* <p>
* Loop, executing targets as they are received.
* </p>
*/
@Override
public void run() {
boolean ran = false;
while (run.get()) {
try {
synchronized (lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
//调用任务的run方法
runnable.run();
}
}
}
.....
}
}
网友评论