大概内容
- scheduler.scheduleJob(jobDetail, trigger)
- scheduler.start()
scheduler.scheduleJob()
Scheduler使用StdScheduler
StdScheduler的方法基本上都代理给QuartzScheduler类来处理。
public class StdScheduler implements Scheduler {
private QuartzScheduler sched;
public StdScheduler(QuartzScheduler sched) {
this.sched = sched;
}
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
throws SchedulerException {
return sched.scheduleJob(jobDetail, trigger);
}
public void start() throws SchedulerException {
sched.start();
}
/**
* 只有这个方法没有委托给QuartzScheduler
* 除了getClass(),其他方法在QuartzScheduler都可以拿到
*/
public SchedulerMetaData getMetaData() {
return new SchedulerMetaData(getSchedulerName(),
getSchedulerInstanceId(), getClass(), false, isStarted(),
isInStandbyMode(), isShutdown(), sched.runningSince(),
sched.numJobsExecuted(), sched.getJobStoreClass(),
sched.supportsPersistence(), sched.isClustered(), sched.getThreadPoolClass(),
sched.getThreadPoolSize(), sched.getVersion());
}
// 其他代码
}
QuartzScheduler
Quartz的小心脏,org.quartz.Scheduler接口的间接实现。
public class QuartzScheduler implements RemotableQuartzScheduler {
// QuartzSchedulerResources对象是通过构造器放进去的
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException {
validateState();
if (jobDetail == null) {
throw new SchedulerException("JobDetail cannot be null");
}
if (trigger == null) {
throw new SchedulerException("Trigger cannot be null");
}
if (jobDetail.getKey() == null) {
throw new SchedulerException("Job's key cannot be null");
}
if (jobDetail.getJobClass() == null) {
throw new SchedulerException("Job's class cannot be null");
}
// TriggerBuilder.build()会生成一个OperableTrigger实例。
OperableTrigger trig = (OperableTrigger)trigger;
if (trigger.getJobKey() == null) {
trig.setJobKey(jobDetail.getKey());
} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
throw new SchedulerException(
"Trigger does not reference given job!");
}
trig.validate();
Calendar cal = null;
if (trigger.getCalendarName() != null) {
cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
}
// TODO: 解析各种类型的Trigger
Date ft = trig.computeFirstFireTime(cal);
if (ft == null) {
throw new SchedulerException(
"Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
}
// 关键代码就是下面这一行
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);
return ft;
}
// 其他代码
}
RAMJobStore
介绍一下RAMJobStore的属性
- HashMap对象:也是通过空间来换取查询时间的策略,把JobDetail和Trigger的信息放进这些HashMap对象中,方便程序可以根据key或者group来匹配相关的JobDetail和Trigger。
- TreeSet<TriggerWrapper> timeTriggers:利用TreeSet排重和有序的特性,timeTriggers.first()方法总能返回最先要处理的Trigger。
class RAMJobStore {
#HashMap<JobKey,JobWrapper> jobsByKey
#HashMap<TriggerKey,TriggerWrapper> triggersByKey
#HashMap<String,HashMap<JobKey,JobWrapper>> jobsByGroup
#HashMap<String,HashMap<TriggerKey,TriggerWrapper>> triggersByGroup
#TreeSet<TriggerWrapper> timeTriggers
#HashMap<String,Calendar> calendarsByName
#Map<JobKey,List<TriggerWrapper>> triggersByJob
#Object lock
#HashSet<String> pausedTriggerGroups
#HashSet<String> pausedJobGroups
#HashSet<JobKey> blockedJobs
#long misfireThreshold
#SchedulerSignaler signaler
-Logger log
-{static}AtomicLong ftrCtr
}
下面是RAMJobStore.storeJob()代码解析,storeTrigger()方法的逻辑类似。
/**
* 内部类JobWrapper是一个包括jobKey和jobDetail的类。
* 克隆一个新的JobDetail来创建一个JobWrapper,然后维护到jobsByKey和jobsByGroup属性中。
* 维护HashMap系列对象的时候,通过lock的synchronized代码块来做线程同步
*/
public void storeJob(JobDetail newJob,boolean replaceExisting) throws ObjectAlreadyExistsException {
JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
boolean repl = false;
synchronized (lock) {
if (jobsByKey.get(jw.key) != null) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
repl = true;
}
if (!repl) {
// get job group
HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
if (grpMap == null) {
grpMap = new HashMap<JobKey, JobWrapper>(100);
jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
}
// add to jobs by group
grpMap.put(newJob.getKey(), jw);
// add to jobs by FQN map
jobsByKey.put(jw.key, jw);
} else {
// update job detail
JobWrapper orig = jobsByKey.get(jw.key);
orig.jobDetail = jw.jobDetail; // already cloned
}
}
}
scheduler.start()
QuartzScheduler
案例用的是RAMJobStore,其中的schedulerStarted()和schedulerResumed()是空方法,没有代码立即。对于JobStoreSupport,这两个方法是有很多逻辑的,后面的篇章再做解析。
public class QuartzScheduler implements RemotableQuartzScheduler {
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
notifySchedulerListenersStarting();
if (initialStart == null) {//初始化标识为null,进行初始化操作
initialStart = new Date();
// RAMJobStore 啥都不做
// JobStoreSupport 判断是否集群,恢复Job等
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();// 如果已经初始化过,则恢复jobStore
}
schedThread.togglePause(false);// 唤醒所有等待的线程
getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}
// 其他代码
}
QuartzSchedulerThread
public class QuartzSchedulerThread extends Thread {
/**
* pause为true,发出让主循环暂停的信号,以便线程在下一个可处理的时刻暂停
* pause为false,唤醒sigLock对象的所有等待队列的线程
*/
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;
if (paused) {
signalSchedulingChange(0);
} else {
sigLock.notifyAll();
}
}
}
// 其他代码
}
Listener事件监听
Listener事件监听是观察者模式的一个应用。
QuartzScheduler的scheduleJob()start()方法都有notifyXXX代码逻辑,这些就是JobDetail、Trigger和Scheduler事件监听的代码逻辑。
在《Scheduler的初始化》篇章里面,初始化一个Scheduler,里面有"根据PropertiesParser创建Listeners"的步骤,Listeners就包括JobListener和TriggerListener的List对象。
SchedulerListener不支持配置在quartz.properties里面,初始化Scheduler的过程中没有这一块的代码逻辑。如果要添加一个观察者,那么可以通过StdScheduler.getListenerManager()获取ListenerManager实例,通过它可以拿到所有观察者的引用。
类图
Quartz Listener类图角色说明
类名 | 角色 |
---|---|
QuartzScheduler | Subject |
SchedulerListener | Observer |
JobListener | Observer |
TriggerListener | Observer |
代码示例
Subject通知Observer,都是遍历Observer列表,触发相应的通知,实现事件监听的效果。
这里特别说明一下,获取Listeners集合的时候,是通过新建一个不可改变的集合对象来实现。如果是为了避免多线程的读写问题,这和CopyOnWriteList写时复制的做法相反,而且这里读的场景大于写的场景。况且,ListenerManagerImpl的add()方法都做了代码块的synchronized。新建一个不可改变的集合来返回,这么做的目的没有想明白。
public void notifySchedulerListenersJobAdded(JobDetail jobDetail) {
// build a list of all scheduler listeners that are to be notified...
List<SchedulerListener> schedListeners = buildSchedulerListenerList();
// notify all scheduler listeners
for(SchedulerListener sl: schedListeners) {
try {
sl.jobAdded(jobDetail);
} catch (Exception e) {
getLog().error(
"Error while notifying SchedulerListener of JobAdded.",
e);
}
}
}
ListenerManagerImpl
public class ListenerManagerImpl implements ListenerManager {
// 其他代码
public List<SchedulerListener> getSchedulerListeners() {
synchronized (schedulerListeners) {
return java.util.Collections.unmodifiableList(new ArrayList<SchedulerListener>(schedulerListeners));
}
}
}
网友评论