我们在01 Quartz-初识Quartz-样例入手
篇中写了获取scheduler的代码如下:
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
scheduler = schedulerFactory.getScheduler();
SchedulerFacotory是一个接口,它有两个实现:
-
StdSchedulerFacotory根据配置文件来创建Scheduler;
-
DirectSchedulerFactory 主要通过编码对Scheduler控制;
考虑代码的侵入性程序、编写能力我们会采用StdSchedulerFacotory类型来创建StdScheduler,使用配置文件quartz.properties里面的配置都对应到这个StdSchedulerFactory中,所以对某个配置不明白已经该配置的默认值可以看StdSchedulerFactory中获取配置的代码。
一 Scheduler初始化
我们看下这个schedulerFactory.getScheduler()
方法(默认使用实现StdSchedulerFactory):
1.1 方法1:getScheduler()
public Scheduler getScheduler() throws SchedulerException {
//第一步:加载配置文件,System的properties覆盖前面的配置
if (cfg == null) {
initialize();
}
//第二步:先进行缓存查找,找到且不是关闭状态则返回,如不则进行第三步
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
//判断若存在且已停止运行,则从缓存中移除
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
//第三步:如果第二步没有找到,则初始化scheduler 返回一个新的。
sched = instantiate();
return sched;
}
1.2 方法2:instantiate()
不知道作者怎么考虑的 一个方法写一千来行也是醉了:
private Scheduler instantiate() throws SchedulerException {
//作者这个initialize()方法为什么调两遍 instantiate() 方法全局就getScheduler()了
if (cfg == null) {
initialize();
}
if (initException != null) {
throw initException;
}
JobStore js = null;
ThreadPool tp = null;
QuartzScheduler qs = null;
DBConnectionManager dbMgr = null;
String instanceIdGeneratorClass = null;
Properties tProps = null;
String userTXLocation = null;
boolean wrapJobInTx = false;
boolean autoId = false;
long idleWaitTime = -1;
long dbFailureRetry = 15000L; // 15 secs
String classLoadHelperClass;
String jobFactoryClass;
ThreadExecutor threadExecutor;
SchedulerRepository schedRep = SchedulerRepository.getInstance();
...
// add plugins
for (int i = 0; i < plugins.length; i++) {
rsrcs.addSchedulerPlugin(plugins[I]);
}
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
qsInited = true;
// Create Scheduler ref...
Scheduler scheduler = instantiate(rsrcs, qs);
// set job factory if specified
if(jobFactory != null) {
qs.setJobFactory(jobFactory);
}
// Initialize plugins now that we have a Scheduler instance.
for (int i = 0; i < plugins.length; i++) {
plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
}
// add listeners
for (int i = 0; i < jobListeners.length; i++) {
qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
}
for (int i = 0; i < triggerListeners.length; i++) {
qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
}
// set scheduler context data...
for(Object key: schedCtxtProps.keySet()) {
String val = schedCtxtProps.getProperty((String) key);
scheduler.getContext().put((String)key, val);
}
// fire up job store, and runshell factory
js.setInstanceId(schedInstId);
js.setInstanceName(schedName);
js.setThreadPoolSize(tp.getPoolSize());
js.initialize(loadHelper, qs.getSchedulerSignaler());
jrsf.initialize(scheduler);
qs.initialize();
getLog().info(
"Quartz scheduler '" + scheduler.getSchedulerName()
+ "' initialized from " + propSrc);
getLog().info("Quartz scheduler version: " + qs.getVersion());
// prevents the repository from being garbage collected
qs.addNoGCObject(schedRep);
// prevents the db manager from being garbage collected
if (dbMgr != null) {
qs.addNoGCObject(dbMgr);
}
schedRep.bind(scheduler);
return scheduler;
}
... 省略异常处理过程
}
1.3 主要就是做了一些对象的初始化操作:
...
- Set up any DataSources
- Set up any SchedulerPlugins
- Set up any JobListeners
- Set up any TriggerListeners
- Get ThreadExecutor Properties
- add plugins
- Create Scheduler ref...
- set job factory if specified
- Initialize plugins now that we have a Scheduler instance.
- add listeners
- set scheduler context data
...
我将会选取一些比较重要的方法参看一下: 其实基本涵盖了所有的方法。
1.3.1 初始化ClassLoadHelper
Create class load helper ClassLoadHelper主要提供之后通过反射来创建实例。
ClassLoadHelper loadHelper = null;
try {
loadHelper = (ClassLoadHelper) loadClass(classLoadHelperClass)
.newInstance();
} catch (Exception e) { ... 省略异常处理过程}
loadHelper.initialize();
1.3.2 初始化JobFactory
JobFactory 用于创建Job实例,我们在02 Quartz-初识Quartz-组件解释
中解说Job并发的时候说明了Job是原型模式不是单例,其实在这里我们可以得到应征
JobFactory jobFactory = null;
if(jobFactoryClass != null) {
try {
jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
.newInstance();
} catch (Exception e) { ... 省略异常处理过程}
tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
try {
setBeanProps(jobFactory, tProps);
} catch (Exception e) { ... 省略异常处理过程}
}
下文是的JobFactory代码实现,每次都是创建一个新的Job。
public class SimpleJobFactory implements JobFactory {
private final Logger log = LoggerFactory.getLogger(getClass());
protected Logger getLog() {
return log;
}
public Job newJob(TriggerFiredBundle bundle, Scheduler Scheduler) throws SchedulerException {
JobDetail jobDetail = bundle.getJobDetail();
Class<? extends Job> jobClass = jobDetail.getJobClass();
try {
if(log.isDebugEnabled()) {
log.debug(
"Producing instance of Job '" + jobDetail.getKey() +
"', class=" + jobClass.getName());
}
return jobClass.newInstance();
} catch (Exception e) {... 省略异常处理过程}
}
}
1.3.3 初始化InstanceIdGenerator
loadHelper.loadClass(instanceIdGeneratorClass);主要是在集群模式下负责为 Scheduler 节点生成集群范围的唯一*实例ID。默认实现其实就是主机地址加上系统时间;
public class SimpleInstanceIdGenerator implements InstanceIdGenerator {
public String generateInstanceId() throws SchedulerException {
try {
return InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
} catch (Exception e) {
throw new SchedulerException("Couldn't get host name!", e);
}
}
}
1.3.4 初始化线程池
Get ThreadPool Properties
-
通过配置文件获取插件信息(PROP_THREAD_POOL_CLASS=org.quartz.threadPool.class);
-
loadHelper.loadClass(plugInClass).newInstance(); 通过反射的方式实例化线程池;
-
默认使用org.quartz.simpl.SimpleThreadPool。
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
if (tpClass == null) {... 省略异常处理过程}
try {
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {... 省略异常处理过程}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
try {
setBeanProps(tp, tProps);
} catch (Exception e) {... 省略异常处理过程}
1.3.5 初始化JobStore
Get JobStore Properties
这里是创建JobStore的地方,负责保存作业和触发器。这里是默认的RAMJobStore
-
通过配置文件获取插件信息(PROP_JOB_STORE_CLASS=org.quartz.jobStore.class);
-
loadHelper.loadClass(plugInClass).newInstance(); 通过反射的方式实例化JobStore;
-
默认使用 org.quartz.simpl.RAMJobStore
-
但是其实RAMJobStore内部实现使用的是HashMap,为了安全,又在Map的操作上加了锁,其实这是比较影响性能的,不知道为什么没有考虑ConcurrentHashMap
String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
RAMJobStore.class.getName());
if (jsClass == null) {... 省略异常处理过程}
try {
js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {... 省略异常处理过程}
SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);
tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {
setBeanProps(js, tProps);
} catch (Exception e) {... 省略异常处理过程}
if (js instanceof JobStoreSupport) {
// Install custom lock handler (Semaphore)
String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS);
if (lockHandlerClass != null) {
try {
Semaphore lockHandler = (Semaphore)loadHelper.loadClass(lockHandlerClass).newInstance();
tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true);
// If this lock handler requires the table prefix, add it to its properties.
if (lockHandler instanceof TablePrefixAware) {
tProps.setProperty(
PROP_TABLE_PREFIX, ((JobStoreSupport)js).getTablePrefix());
tProps.setProperty(
PROP_SCHED_NAME, schedName);
}
try {
setBeanProps(lockHandler, tProps);
} catch (Exception e) {... 省略异常处理过程}
}
}
1.3.6 初始化DataSources
Set up any DataSources
我主要是基于内存Store分析,所以这里就不多介绍,其实就是根据配置的DBSources初始化,quartz默认使用的c3p0
String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
for (int i = 0; i < dsNames.length; i++) {
PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));
String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);
// custom connectionProvider
...
}
1.3.7 初始化插件
Set up any SchedulerPlugins
下面是我截取的配置文件的插件配置demo:
#===============================================================
#配置插件 格式:PROP_PLUGIN_PREFIX.[插件名].class=插件类全限定名
#===============================================================
org.quartz.plugin.tiggerHistory.class=org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.MultiSerialPlanJob.class=com.mzw.scheduler.quartz.listeners.MultiSerialJobListener
-
通过配置文件获取插件信息(PROP_PLUGIN_PREFIX=org.quartz.plugin);
-
loadHelper.loadClass(plugInClass).newInstance(); 通过反射的方式实例化插件;
-
然后实例化成功后放入SchedulerPlugin[] plugins 数组里面。
String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
for (int i = 0; i < pluginNames.length; i++) {
Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
+ pluginNames[i], true);
String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);
if (plugInClass == null) {
... 省略异常处理过程
}
SchedulerPlugin plugin = null;
try {
plugin = (SchedulerPlugin)
loadHelper.loadClass(plugInClass).newInstance();
}
... 省略异常处理过程
plugins[i] = plugin;
}
1.3.8 初始化JobListener
其实可以发现过程和初始化插件及其相似(其实这是一种通过配置文件配置的做法,还有一种是通过代码程序的方式显示添加,通常会采用第二种):
-
通过配置文件获取插件信息(PROP_JOB_LISTENER_PREFIX=org.quartz.jobListener);
-
loadHelper.loadClass(listenerClass).newInstance(); 通过反射的方式实例化JobListener;
-
然后实例化成功后放入JobListener[] jobListeners 数组里面。OK! 同样的Set up any TriggerListeners 就不一一分析了
Set up any JobListeners
// Set up any JobListeners
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Class<?>[] strArg = new Class[] { String.class };
String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
JobListener[] jobListeners = new JobListener[jobListenerNames.length];
for (int i = 0; i < jobListenerNames.length; i++) {
Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
+ jobListenerNames[i], true);
String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
if (listenerClass == null) { ... 省略异常处理过程 }
JobListener listener = null;
try {
listener = (JobListener)
loadHelper.loadClass(listenerClass).newInstance();
} catch (Exception e) { ... 省略异常处理过程 }
try {
Method nameSetter = null;
try {
nameSetter = listener.getClass().getMethod("setName", strArg);
}
catch(NoSuchMethodException ignore) {
/* do nothing */
}
if(nameSetter != null) {
nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
}
setBeanProps(listener, lp);
} catch (Exception e) {... 省略异常处理过程}
jobListeners[i] = listener;
}
1.3.9 初始化线程执行器
Get ThreadExecutor Properties
可以发现过程和初始化插件/监听器及其相似(quartz默认也没有配置这样,):
-
通过配置文件获取插件信息(PROP_THREAD_EXECUTOR_CLASS=org.org.quartz.threadExecutor.class);
-
loadHelper.loadClass(threadExecutorClass).newInstance(); 通过反射的方式实例化ThreadExecutor;
-
如果没有找到配置信息,则产生一个默认的ThreadExecutor。
// Get ThreadExecutor Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
if (threadExecutorClass != null) {
tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
try {
threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);
setBeanProps(threadExecutor, tProps);
} catch (Exception e) {... 省略异常处理过程;}
} else {
log.info("Using default implementation for ThreadExecutor");
threadExecutor = new DefaultThreadExecutor();
}
1.3.10 初始化线程对象工厂
JobRunShellFactory
JobRunShellFactory其实就是用来产生JobRunShell对象,JobRunShell 其实就是一个Runnable对象,因为它实现的Runnalbe。
** Create correct run-shell factory...**
之后就开始使用这些对象:
1.3.11 集群模式
如果是集群模式就创建节点实例Id
if (autoId) {
try {
schedInstId = DEFAULT_INSTANCE_ID;
if (js.isClustered()) {
schedInstId = instanceIdGenerator.generateInstanceId();
}
} catch (Exception e) {... 省略异常处理过程;}
}
1.3.12 组件封装,然后初始化Scheduler
-
Scheduler 其实就是交互API,里面还有一个QuartzScheduler的实例变量
-
真正Quartz的核心是在QuartzScheduler
-
绑定scheduler :SchedulerRepository内部维护一个 HashMap<String, Scheduler> schedulers;
QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
rsrcs.setName(schedName);
rsrcs.setThreadName(threadName);
rsrcs.setInstanceId(schedInstId);
rsrcs.setJobRunShellFactory(jrsf);
rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
...
rsrcs.setJobStore(js); // add plugins
for (int i = 0; i < plugins.length; i++) {
rsrcs.addSchedulerPlugin(plugins[i]);
}
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
qsInited = true;
// Create Scheduler ref...
Scheduler scheduler = instantiate(rsrcs, qs);
schedRep.bind(scheduler);
return scheduler;
1.3.13 初始化QuartzScheduler
-
创建Quartz的主线程。
-
QuartzSchedulerThread对象会在QuartzScheduler的构造函数中会将其自启动,进入run的等待中
-
paused 就是等待外界的信号量,
*QuartzSchedulerThread run() 方法就是主程序,在下一个篇幅中解说。
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval) throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
//创建Quartz的主线程。
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//提交QuartzSchedulerThread对象 给线程池
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
以上就是所以组件的初始化完成。
网友评论