美文网首页Quartz 源码解读-初识Quartz
03 Quartz-初识Quartz-Scheduler初始化

03 Quartz-初识Quartz-Scheduler初始化

作者: 花神子 | 来源:发表于2019-05-20 16:15 被阅读0次
quartz

我们在01 Quartz-初识Quartz-样例入手 篇中写了获取scheduler的代码如下:

SchedulerFactory schedulerFactory = new StdSchedulerFactory();
scheduler = schedulerFactory.getScheduler();

SchedulerFacotory是一个接口,它有两个实现:

  • StdSchedulerFacotory根据配置文件来创建Scheduler;

  • DirectSchedulerFactory 主要通过编码对Scheduler控制;

考虑代码的侵入性程序、编写能力我们会采用StdSchedulerFacotory类型来创建StdScheduler,使用配置文件quartz.properties里面的配置都对应到这个StdSchedulerFactory中,所以对某个配置不明白已经该配置的默认值可以看StdSchedulerFactory中获取配置的代码。

一 Scheduler初始化

我们看下这个schedulerFactory.getScheduler()方法(默认使用实现StdSchedulerFactory):

1.1 方法1:getScheduler()

public Scheduler getScheduler() throws SchedulerException {
    //第一步:加载配置文件,System的properties覆盖前面的配置
    if (cfg == null) {
        initialize();
    }
    //第二步:先进行缓存查找,找到且不是关闭状态则返回,如不则进行第三步
    SchedulerRepository schedRep = SchedulerRepository.getInstance();
    Scheduler sched = schedRep.lookup(getSchedulerName());
    //判断若存在且已停止运行,则从缓存中移除
    if (sched != null) {
        if (sched.isShutdown()) {
            schedRep.remove(getSchedulerName());
        } else {
            return sched;
        }
    }
    //第三步:如果第二步没有找到,则初始化scheduler 返回一个新的。
    sched = instantiate();
    return sched;
}

1.2 方法2:instantiate()

不知道作者怎么考虑的 一个方法写一千来行也是醉了:

private Scheduler instantiate() throws SchedulerException {
    //作者这个initialize()方法为什么调两遍 instantiate() 方法全局就getScheduler()了
    if (cfg == null) {
        initialize();
    }
    
    if (initException != null) {
        throw initException;
    }
    
    JobStore js = null;
    ThreadPool tp = null;
    QuartzScheduler qs = null;
    DBConnectionManager dbMgr = null;
    String instanceIdGeneratorClass = null;
    Properties tProps = null;
    String userTXLocation = null;
    boolean wrapJobInTx = false;
    boolean autoId = false;
    long idleWaitTime = -1;
    long dbFailureRetry = 15000L; // 15 secs
    String classLoadHelperClass;
    String jobFactoryClass;
    ThreadExecutor threadExecutor;
    SchedulerRepository schedRep = SchedulerRepository.getInstance();
    ...

    // add plugins
    for (int i = 0; i < plugins.length; i++) {
        rsrcs.addSchedulerPlugin(plugins[I]);
    }

    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
    qsInited = true;

    // Create Scheduler ref...
    Scheduler scheduler = instantiate(rsrcs, qs);

    // set job factory if specified
    if(jobFactory != null) {
        qs.setJobFactory(jobFactory);
    }

    // Initialize plugins now that we have a Scheduler instance.
    for (int i = 0; i < plugins.length; i++) {
        plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
    }

    // add listeners
    for (int i = 0; i < jobListeners.length; i++) {
        qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
    }
    for (int i = 0; i < triggerListeners.length; i++) {
        qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
    }

    // set scheduler context data...
    for(Object key: schedCtxtProps.keySet()) {
        String val = schedCtxtProps.getProperty((String) key);    
        scheduler.getContext().put((String)key, val);
    }

    // fire up job store, and runshell factory

    js.setInstanceId(schedInstId);
    js.setInstanceName(schedName);
    js.setThreadPoolSize(tp.getPoolSize());
    js.initialize(loadHelper, qs.getSchedulerSignaler());

    jrsf.initialize(scheduler);
    
    qs.initialize();

    getLog().info(
            "Quartz scheduler '" + scheduler.getSchedulerName()
                    + "' initialized from " + propSrc);

    getLog().info("Quartz scheduler version: " + qs.getVersion());

    // prevents the repository from being garbage collected
    qs.addNoGCObject(schedRep);
    // prevents the db manager from being garbage collected
    if (dbMgr != null) {
        qs.addNoGCObject(dbMgr);
    }

    schedRep.bind(scheduler);
    return scheduler;
    }
    ... 省略异常处理过程
}

1.3 主要就是做了一些对象的初始化操作:

...

  • Set up any DataSources
  • Set up any SchedulerPlugins
  • Set up any JobListeners
  • Set up any TriggerListeners
  • Get ThreadExecutor Properties
  • add plugins
  • Create Scheduler ref...
  • set job factory if specified
  • Initialize plugins now that we have a Scheduler instance.
  • add listeners
  • set scheduler context data
    ...

我将会选取一些比较重要的方法参看一下: 其实基本涵盖了所有的方法。

1.3.1 初始化ClassLoadHelper

Create class load helper ClassLoadHelper主要提供之后通过反射来创建实例。

ClassLoadHelper loadHelper = null;
try {
    loadHelper = (ClassLoadHelper) loadClass(classLoadHelperClass)
            .newInstance();
} catch (Exception e) { ... 省略异常处理过程}
loadHelper.initialize();

1.3.2 初始化JobFactory

JobFactory 用于创建Job实例,我们在02 Quartz-初识Quartz-组件解释 中解说Job并发的时候说明了Job是原型模式不是单例,其实在这里我们可以得到应征

JobFactory jobFactory = null;
if(jobFactoryClass != null) {
    try {
        jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
                .newInstance();
    } catch (Exception e) { ... 省略异常处理过程}

    tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
    try {
        setBeanProps(jobFactory, tProps);
    } catch (Exception e) { ... 省略异常处理过程}
}

下文是的JobFactory代码实现,每次都是创建一个新的Job。

public class SimpleJobFactory implements JobFactory {
    private final Logger log = LoggerFactory.getLogger(getClass());
    
    protected Logger getLog() {
        return log;
    }
    
    public Job newJob(TriggerFiredBundle bundle, Scheduler Scheduler) throws SchedulerException {
        JobDetail jobDetail = bundle.getJobDetail();
        Class<? extends Job> jobClass = jobDetail.getJobClass();
        try {
            if(log.isDebugEnabled()) {
                log.debug(
                    "Producing instance of Job '" + jobDetail.getKey() + 
                    "', class=" + jobClass.getName());
            }
            
            return jobClass.newInstance();
        } catch (Exception e) {... 省略异常处理过程}
    }
}

1.3.3 初始化InstanceIdGenerator

loadHelper.loadClass(instanceIdGeneratorClass);主要是在集群模式下负责为 Scheduler 节点生成集群范围的唯一*实例ID。默认实现其实就是主机地址加上系统时间;

public class SimpleInstanceIdGenerator implements InstanceIdGenerator {
    public String generateInstanceId() throws SchedulerException {
        try {
            return InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
        } catch (Exception e) {
            throw new SchedulerException("Couldn't get host name!", e);
        }
    }
}

1.3.4 初始化线程池

Get ThreadPool Properties

  • 通过配置文件获取插件信息(PROP_THREAD_POOL_CLASS=org.quartz.threadPool.class);

  • loadHelper.loadClass(plugInClass).newInstance(); 通过反射的方式实例化线程池;

  • 默认使用org.quartz.simpl.SimpleThreadPool。

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
if (tpClass == null) {... 省略异常处理过程}

try {
    tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {... 省略异常处理过程}

tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);

try {
    setBeanProps(tp, tProps);
} catch (Exception e) {... 省略异常处理过程}

1.3.5 初始化JobStore

Get JobStore Properties

这里是创建JobStore的地方,负责保存作业和触发器。这里是默认的RAMJobStore

  • 通过配置文件获取插件信息(PROP_JOB_STORE_CLASS=org.quartz.jobStore.class);

  • loadHelper.loadClass(plugInClass).newInstance(); 通过反射的方式实例化JobStore;

  • 默认使用 org.quartz.simpl.RAMJobStore

  • 但是其实RAMJobStore内部实现使用的是HashMap,为了安全,又在Map的操作上加了锁,其实这是比较影响性能的,不知道为什么没有考虑ConcurrentHashMap

String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
                RAMJobStore.class.getName());

if (jsClass == null) {... 省略异常处理过程}

try {
    js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {... 省略异常处理过程}

SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {
    setBeanProps(js, tProps);
} catch (Exception e) {... 省略异常处理过程}

if (js instanceof JobStoreSupport) {
    // Install custom lock handler (Semaphore)
    String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS);
    if (lockHandlerClass != null) {
        try {
            Semaphore lockHandler = (Semaphore)loadHelper.loadClass(lockHandlerClass).newInstance();

            tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true);

            // If this lock handler requires the table prefix, add it to its properties.
            if (lockHandler instanceof TablePrefixAware) {
                tProps.setProperty(
                        PROP_TABLE_PREFIX, ((JobStoreSupport)js).getTablePrefix());
                tProps.setProperty(
                        PROP_SCHED_NAME, schedName);
            }

            try {
                setBeanProps(lockHandler, tProps);
            } catch (Exception e) {... 省略异常处理过程}
    }
}

1.3.6 初始化DataSources

Set up any DataSources

我主要是基于内存Store分析,所以这里就不多介绍,其实就是根据配置的DBSources初始化,quartz默认使用的c3p0

String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
for (int i = 0; i < dsNames.length; i++) {
    PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
            PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));

    String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);

    // custom connectionProvider
    ...    
}

1.3.7 初始化插件

Set up any SchedulerPlugins

下面是我截取的配置文件的插件配置demo:

#===============================================================
#配置插件 格式:PROP_PLUGIN_PREFIX.[插件名].class=插件类全限定名
#===============================================================
org.quartz.plugin.tiggerHistory.class=org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.MultiSerialPlanJob.class=com.mzw.scheduler.quartz.listeners.MultiSerialJobListener
  • 通过配置文件获取插件信息(PROP_PLUGIN_PREFIX=org.quartz.plugin);

  • loadHelper.loadClass(plugInClass).newInstance(); 通过反射的方式实例化插件;

  • 然后实例化成功后放入SchedulerPlugin[] plugins 数组里面。

String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
for (int i = 0; i < pluginNames.length; i++) {
    Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
            + pluginNames[i], true);

    String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);

    if (plugInClass == null) {
        ... 省略异常处理过程
    }
    SchedulerPlugin plugin = null;
    try {
        plugin = (SchedulerPlugin)
                loadHelper.loadClass(plugInClass).newInstance();
    } 
    ... 省略异常处理过程

    plugins[i] = plugin;
}

1.3.8 初始化JobListener

其实可以发现过程和初始化插件及其相似(其实这是一种通过配置文件配置的做法,还有一种是通过代码程序的方式显示添加,通常会采用第二种):

  • 通过配置文件获取插件信息(PROP_JOB_LISTENER_PREFIX=org.quartz.jobListener);

  • loadHelper.loadClass(listenerClass).newInstance(); 通过反射的方式实例化JobListener;

  • 然后实例化成功后放入JobListener[] jobListeners 数组里面。OK! 同样的Set up any TriggerListeners 就不一一分析了

Set up any JobListeners

// Set up any JobListeners
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Class<?>[] strArg = new Class[] { String.class };
String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
JobListener[] jobListeners = new JobListener[jobListenerNames.length];
for (int i = 0; i < jobListenerNames.length; i++) {
    Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
            + jobListenerNames[i], true);

    String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);

    if (listenerClass == null) { ... 省略异常处理过程 }
    JobListener listener = null;
    try {
        listener = (JobListener)
               loadHelper.loadClass(listenerClass).newInstance();
    } catch (Exception e) { ... 省略异常处理过程 }
    try {
        Method nameSetter = null;
        try { 
            nameSetter = listener.getClass().getMethod("setName", strArg);
        }
        catch(NoSuchMethodException ignore) { 
            /* do nothing */ 
        }
        if(nameSetter != null) {
            nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
        }
        setBeanProps(listener, lp);
    } catch (Exception e) {... 省略异常处理过程}
    jobListeners[i] = listener;
}

1.3.9 初始化线程执行器

Get ThreadExecutor Properties

可以发现过程和初始化插件/监听器及其相似(quartz默认也没有配置这样,):

  • 通过配置文件获取插件信息(PROP_THREAD_EXECUTOR_CLASS=org.org.quartz.threadExecutor.class);

  • loadHelper.loadClass(threadExecutorClass).newInstance(); 通过反射的方式实例化ThreadExecutor;

  • 如果没有找到配置信息,则产生一个默认的ThreadExecutor。

// Get ThreadExecutor Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
if (threadExecutorClass != null) {
    tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
    try {
        threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
        log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);

        setBeanProps(threadExecutor, tProps);
    } catch (Exception e) {... 省略异常处理过程;}
} else {
    log.info("Using default implementation for ThreadExecutor");
    threadExecutor = new DefaultThreadExecutor();
}

1.3.10 初始化线程对象工厂

JobRunShellFactory
JobRunShellFactory其实就是用来产生JobRunShell对象,JobRunShell 其实就是一个Runnable对象,因为它实现的Runnalbe。

** Create correct run-shell factory...**

之后就开始使用这些对象:

1.3.11 集群模式

如果是集群模式就创建节点实例Id

if (autoId) {
    try {
      schedInstId = DEFAULT_INSTANCE_ID;
      if (js.isClustered()) {
      schedInstId = instanceIdGenerator.generateInstanceId();
      }
    } catch (Exception e) {... 省略异常处理过程;}
}

1.3.12 组件封装,然后初始化Scheduler

  • Scheduler 其实就是交互API,里面还有一个QuartzScheduler的实例变量

  • 真正Quartz的核心是在QuartzScheduler

  • 绑定scheduler :SchedulerRepository内部维护一个 HashMap<String, Scheduler> schedulers;

QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
    rsrcs.setName(schedName);
    rsrcs.setThreadName(threadName);
    rsrcs.setInstanceId(schedInstId);
    rsrcs.setJobRunShellFactory(jrsf);
    rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
    ...
    rsrcs.setJobStore(js);   // add plugins 
    for (int i = 0; i < plugins.length; i++) {
        rsrcs.addSchedulerPlugin(plugins[i]); 
    }
    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry); 
    qsInited = true;   
    // Create Scheduler ref... 
    Scheduler scheduler = instantiate(rsrcs, qs);

    schedRep.bind(scheduler); 
    return scheduler;

1.3.13 初始化QuartzScheduler

  • 创建Quartz的主线程。

  • QuartzSchedulerThread对象会在QuartzScheduler的构造函数中会将其自启动,进入run的等待中

  • paused 就是等待外界的信号量,

*QuartzSchedulerThread run() 方法就是主程序,在下一个篇幅中解说。

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval) throws SchedulerException {
    this.resources = resources;
    if (resources.getJobStore() instanceof JobListener) {
        addInternalJobListener((JobListener)resources.getJobStore());
    }
    //创建Quartz的主线程。
    this.schedThread = new QuartzSchedulerThread(this, resources);
    ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
    //提交QuartzSchedulerThread对象 给线程池
    schedThreadExecutor.execute(this.schedThread);
    if (idleWaitTime > 0) {
        this.schedThread.setIdleWaitTime(idleWaitTime);
    }

    jobMgr = new ExecutingJobsManager();
    addInternalJobListener(jobMgr);
    errLogger = new ErrorLogger();
    addInternalSchedulerListener(errLogger);

    signaler = new SchedulerSignalerImpl(this, this.schedThread);
    
    getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}

以上就是所以组件的初始化完成。


相关文章

网友评论

    本文标题:03 Quartz-初识Quartz-Scheduler初始化

    本文链接:https://www.haomeiwen.com/subject/tigkzqtx.html