Appreciating the Architecture of the Quartz Source Code: Hands-On with the Scheduler Source (Part 5)


Author: 向光奔跑_ | Published: 2018-11-28 18:09

    What you will get from this chapter: an understanding of part of the Scheduler source code in the Quartz framework.

    Continuing from the previous section

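    In the previous section we covered the database connection setup inside the instantiate initialization method. This section finishes the walkthrough of the scheduler initialization, so let's keep reading patiently.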

    Source code analysis of the instantiate initialization process

    After the database-related setup comes the plugin setup:

            // Set up any SchedulerPlugins
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
            SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
            for (int i = 0; i < pluginNames.length; i++) {
                Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
                        + pluginNames[i], true);
    
                String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);
    
                if (plugInClass == null) {
                    initException = new SchedulerException(
                            "SchedulerPlugin class not specified for plugin '"
                                    + pluginNames[i] + "'");
                    throw initException;
                }
                SchedulerPlugin plugin = null;
                try {
                    plugin = (SchedulerPlugin)
                            loadHelper.loadClass(plugInClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "SchedulerPlugin class '" + plugInClass
                                    + "' could not be instantiated.", e);
                    throw initException;
                }
                try {
                    setBeanProps(plugin, pp);
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "JobStore SchedulerPlugin '" + plugInClass
                                    + "' props could not be configured.", e);
                    throw initException;
                }
    
                plugins[i] = plugin;
            }
    

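    This code is straightforward: for each plugin configured under the plugin prefix, it reads the plugin class name, instantiates the class, and injects the remaining configured properties into it. We won't go any deeper for now.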
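
    The three steps of the loop (collect a property group, instantiate the class named in it, inject the remaining entries through setters) can be sketched in isolation. This is a minimal sketch, not Quartz's implementation: DemoPlugin, propertyGroup, injectStringProps and loadPlugin are hypothetical names invented for the illustration, the property key in main is only an example, and unlike the real setBeanProps this version handles String setters only.

```java
import java.lang.reflect.Method;
import java.util.Locale;
import java.util.Properties;

/** Hypothetical plugin class, used only for this illustration. */
class DemoPlugin {
    private String greeting;
    public void setGreeting(String greeting) { this.greeting = greeting; }
    public String getGreeting() { return greeting; }
}

class PluginLoadingSketch {

    /** Collects the entries under "prefix." and strips that prefix from their keys. */
    static Properties propertyGroup(Properties all, String prefix) {
        Properties group = new Properties();
        String dotted = prefix + ".";
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(dotted)) {
                group.setProperty(key.substring(dotted.length()), all.getProperty(key));
            }
        }
        return group;
    }

    /** Calls setXxx(String) for each entry; this sketch supports String setters only. */
    static void injectStringProps(Object bean, Properties props) {
        for (String name : props.stringPropertyNames()) {
            String setter = "set" + name.substring(0, 1).toUpperCase(Locale.US) + name.substring(1);
            try {
                Method m = bean.getClass().getMethod(setter, String.class);
                m.invoke(bean, props.getProperty(name));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Property '" + name + "' could not be configured.", e);
            }
        }
    }

    /** Instantiates the class named by the "class" entry, then injects the other entries. */
    static Object loadPlugin(Properties all, String prefix) {
        Properties group = propertyGroup(all, prefix);
        // Remove "class" so it is not treated as a bean property afterwards.
        String className = (String) group.remove("class");
        if (className == null) {
            throw new IllegalStateException("Plugin class not specified for '" + prefix + "'");
        }
        Object plugin;
        try {
            plugin = Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Plugin class '" + className + "' could not be instantiated.", e);
        }
        injectStringProps(plugin, group);
        return plugin;
    }

    public static void main(String[] args) {
        Properties cfg = new Properties();
        cfg.setProperty("org.quartz.plugin.demo.class", DemoPlugin.class.getName());
        cfg.setProperty("org.quartz.plugin.demo.greeting", "hello");
        DemoPlugin plugin = (DemoPlugin) loadPlugin(cfg, "org.quartz.plugin.demo");
        System.out.println(plugin.getGreeting());
    }
}
```

    The point of the design is that the plugin class itself needs no Quartz-specific configuration code: any public setter becomes a configurable property.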

            // Set up any JobListeners
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            Class<?>[] strArg = new Class[] { String.class };
            String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
            JobListener[] jobListeners = new JobListener[jobListenerNames.length];
            for (int i = 0; i < jobListenerNames.length; i++) {
                Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
                        + jobListenerNames[i], true);
    
                String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
    
                if (listenerClass == null) {
                    initException = new SchedulerException(
                            "JobListener class not specified for listener '"
                                    + jobListenerNames[i] + "'");
                    throw initException;
                }
                JobListener listener = null;
                try {
                    listener = (JobListener)
                           loadHelper.loadClass(listenerClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "JobListener class '" + listenerClass
                                    + "' could not be instantiated.", e);
                    throw initException;
                }
                try {
                    Method nameSetter = null;
                    try { 
                        nameSetter = listener.getClass().getMethod("setName", strArg);
                    }
                    catch(NoSuchMethodException ignore) { 
                        /* do nothing */ 
                    }
                    if(nameSetter != null) {
                        nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
                    }
                    setBeanProps(listener, lp);
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "JobListener '" + listenerClass
                                    + "' props could not be configured.", e);
                    throw initException;
                }
                jobListeners[i] = listener;
            }
    

    You can probably guess what this code does: it initializes the Job listeners. Note that more than one Job listener can be configured. One detail stands out:

            try {
                nameSetter = listener.getClass().getMethod("setName", strArg);
            }
            catch(NoSuchMethodException ignore) {
                /* do nothing */
            }
            if(nameSetter != null) {
                nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
            }
    

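    This is not hard to follow either: it looks up the setName method and, if it exists, injects the configured jobListener name. Why the name is not simply injected through setBeanProps(listener, lp) remains an open question.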
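
    One plausible reading, offered as an assumption rather than a confirmed answer: the listener name is a segment of the property key (the jobListenerNames[i] part of the prefix), not an entry inside the group lp, so a property-driven injection would have no "name" value to set. The optional-setter pattern itself can be sketched on its own. NamedListener, AnonymousListener and applyName are hypothetical names made up for this example.

```java
import java.lang.reflect.Method;

/** Hypothetical listener that exposes a public setName(String). */
class NamedListener {
    private String name;
    public void setName(String name) { this.name = name; }
    public String getName() { return name; }
}

/** Hypothetical listener without any setName method. */
class AnonymousListener {
}

class OptionalNameSetterSketch {

    /** Invokes setName(String) if the listener has one; returns whether it was invoked. */
    static boolean applyName(Object listener, String configuredName) {
        Method nameSetter;
        try {
            nameSetter = listener.getClass().getMethod("setName", String.class);
        } catch (NoSuchMethodException ignore) {
            // A name setter is optional, so its absence is not an error.
            return false;
        }
        try {
            nameSetter.invoke(listener, configuredName);
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Listener props could not be configured.", e);
        }
    }

    public static void main(String[] args) {
        NamedListener named = new NamedListener();
        System.out.println(applyName(named, "myJobListener") + " " + named.getName());
        System.out.println(applyName(new AnonymousListener(), "ignored"));
    }
}
```

    Reflection is used here because the setter is optional: a listener class without setName still loads, it just keeps whatever name it reports on its own.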

    The trigger listener initialization works the same way. The code is nearly identical, so we won't dwell on it:

            // Set up any TriggerListeners
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
            TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
            for (int i = 0; i < triggerListenerNames.length; i++) {
                Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
                        + triggerListenerNames[i], true);
    
                String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
    
                if (listenerClass == null) {
                    initException = new SchedulerException(
                            "TriggerListener class not specified for listener '"
                                    + triggerListenerNames[i] + "'");
                    throw initException;
                }
                TriggerListener listener = null;
                try {
                    listener = (TriggerListener)
                           loadHelper.loadClass(listenerClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "TriggerListener class '" + listenerClass
                                    + "' could not be instantiated.", e);
                    throw initException;
                }
                try {
                    Method nameSetter = null;
                    try { 
                        nameSetter = listener.getClass().getMethod("setName", strArg);
                    }
                    catch(NoSuchMethodException ignore) { /* do nothing */ }
                    if(nameSetter != null) {
                        nameSetter.invoke(listener, new Object[] {triggerListenerNames[i] } );
                    }
                    setBeanProps(listener, lp);
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "TriggerListener '" + listenerClass
                                    + "' props could not be configured.", e);
                    throw initException;
                }
                triggerListeners[i] = listener;
            }
    

    The last property to initialize is the thread executor:

            // Get ThreadExecutor Properties
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
            if (threadExecutorClass != null) {
                tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
                try {
                    threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
                    log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);
    
                    setBeanProps(threadExecutor, tProps);
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "ThreadExecutor class '" + threadExecutorClass + "' could not be instantiated.", e);
                    throw initException;
                }
            } else {
                log.info("Using default implementation for ThreadExecutor");
                threadExecutor = new DefaultThreadExecutor();
            }
    

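    This follows the same pattern again: if a threadExecutor class is configured, it is loaded and its properties are injected; otherwise DefaultThreadExecutor is used.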
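
    The "configured class or default" decision can be sketched without Quartz on the classpath. The SketchThreadExecutor interface below is a hypothetical stand-in shaped after the two calls visible in this article (initialize and executing a thread); it is not the real org.quartz.spi.ThreadExecutor, and the assumption that the default simply starts the thread is mine.

```java
/** Hypothetical stand-in for a thread executor SPI. */
interface SketchThreadExecutor {
    void initialize();
    void execute(Thread thread);
}

/** Default used when nothing is configured; assumed to just start the thread. */
class DefaultSketchExecutor implements SketchThreadExecutor {
    public void initialize() { /* nothing to prepare */ }
    public void execute(Thread thread) { thread.start(); }
}

/** Hypothetical custom executor that counts how many threads it started. */
class CountingSketchExecutor implements SketchThreadExecutor {
    int started;
    public void initialize() { started = 0; }
    public void execute(Thread thread) { started++; thread.start(); }
}

class ThreadExecutorSelectionSketch {

    /** Returns the configured implementation, or the default when className is null. */
    static SketchThreadExecutor select(String className) {
        if (className == null) {
            return new DefaultSketchExecutor();
        }
        try {
            return (SketchThreadExecutor)
                    Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException(
                    "ThreadExecutor class '" + className + "' could not be instantiated.", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SketchThreadExecutor executor = select(CountingSketchExecutor.class.getName());
        executor.initialize();
        Thread worker = new Thread(() -> System.out.println("worker running"));
        executor.execute(worker);
        worker.join();
    }
}
```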

    Now we finally reach the climax of the initialization, which is also its last step. Let's go!

            // Fire everything up
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            boolean tpInited = false;
            boolean qsInited = false;
            try {
                    
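                // Factory that wraps Job execution. It produces JobRunShell instances, which mainly handle listener invocation and exception handling; not explored further here
                JobRunShellFactory jrsf = null; // Create correct run-shell factory...
                // Set the UserTransaction location if one is configured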
                if (userTXLocation != null) {
                    UserTransactionHelper.setUserTxLocation(userTXLocation);
                }
                // Wrap job execution in a transaction; defaults to false
                if (wrapJobInTx) {
                    jrsf = new JTAJobRunShellFactory();
                } else {
                    jrsf = new JTAAnnotationAwareJobRunShellFactory();
                }
                // Set the scheduler's unique instance identifier
                if (autoId) {
                    try {
                      schedInstId = DEFAULT_INSTANCE_ID;
                      // If the JobStore is clustered, the instance id generator must produce a unique identifier
                      if (js.isClustered()) {
                          schedInstId = instanceIdGenerator.generateInstanceId();
                      }
                    } catch (Exception e) {
                        getLog().error("Couldn't generate instance Id!", e);
                        throw new IllegalStateException("Cannot run without an instance id.");
                    }
                }
    
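                // Terracotta job store handling (instance id and JMX object name). Our default is RAMJobStore, whose class name starts with org.quartz.simpl, so this branch is skipped and not analyzed in depth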
                if (js.getClass().getName().startsWith("org.terracotta.quartz")) {
                    try {
                        String uuid = (String) js.getClass().getMethod("getUUID").invoke(js);
                        if(schedInstId.equals(DEFAULT_INSTANCE_ID)) {
                            schedInstId = "TERRACOTTA_CLUSTERED,node=" + uuid;
                            if (jmxObjectName == null) {
                                jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
                            }
                        } else if(jmxObjectName == null) {
                            jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId + ",node=" + uuid);
                        }
                    } catch(Exception e) {
                        throw new RuntimeException("Problem obtaining node id from TerracottaJobStore.", e);
                    }
    
                    if(null == cfg.getStringProperty(PROP_SCHED_JMX_EXPORT)) {
                        jmxExport = true;
                    }
                }
                // Our default is RAMJobStore, which is not a JobStoreSupport implementation; JobStoreSupport is mainly for database-backed storage
                if (js instanceof JobStoreSupport) {
                    JobStoreSupport jjs = (JobStoreSupport)js;
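                    // Retry interval after a storage failure
                    jjs.setDbRetryInterval(dbFailureRetry);
                    // Not fully clear to me yet; judging by the names, it lets threads inherit the initializer's class-load context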
                    if(threadsInheritInitalizersClassLoader)
                        jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
                    // Pass in the thread executor that was initialized earlier
                    jjs.setThreadExecutor(threadExecutor);
                }
                // Start building the scheduler resources object
                QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
                rsrcs.setName(schedName);
                rsrcs.setThreadName(threadName);
                rsrcs.setInstanceId(schedInstId);
                rsrcs.setJobRunShellFactory(jrsf);
                rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
                rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
                rsrcs.setBatchTimeWindow(batchTimeWindow);
                rsrcs.setMaxBatchSize(maxBatchSize);
                rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
                rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
                rsrcs.setJMXExport(jmxExport);
                rsrcs.setJMXObjectName(jmxObjectName);
                // Not too clear on this part either; it appears to configure the management REST service when enabled
                if (managementRESTServiceEnabled) {
                    ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
                    managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
                    managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
                    rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
                }
                // RMI-related; skipped as well
                if (rmiExport) {
                    rsrcs.setRMIRegistryHost(rmiHost);
                    rsrcs.setRMIRegistryPort(rmiPort);
                    rsrcs.setRMIServerPort(rmiServerPort);
                    rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
                    rsrcs.setRMIBindName(rmiBindName);
                }
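                // ThreadPool tp is the thread pool; this assigns its instance name and instance id (setInstanceName, setInstanceId)
                SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);
                // Put the thread executor into the scheduler resources
                rsrcs.setThreadExecutor(threadExecutor);
                // Initialize the thread executor; the default implementation is an empty method
                threadExecutor.initialize();
                // Set the thread pool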
                rsrcs.setThreadPool(tp);
                if(tp instanceof SimpleThreadPool) {
                    if(threadsInheritInitalizersClassLoader)
                        ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
                }
                tp.initialize();
                tpInited = true;
        
                rsrcs.setJobStore(js);
        
                // add plugins
                for (int i = 0; i < plugins.length; i++) {
                    rsrcs.addSchedulerPlugin(plugins[i]);
                }
        
                qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
                qsInited = true;
        
                // Key point: create the Scheduler, which is a StdScheduler by default
                Scheduler scheduler = instantiate(rsrcs, qs);
        
                // set job factory if specified
                if(jobFactory != null) {
                    qs.setJobFactory(jobFactory);
                }
        
                // Initialize plugins now that we have a Scheduler instance.
                for (int i = 0; i < plugins.length; i++) {
                    plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
                }
        
                // add listeners
                for (int i = 0; i < jobListeners.length; i++) {
                    qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
                }
                for (int i = 0; i < triggerListeners.length; i++) {
                    qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
                }
        
                // set scheduler context data...
                for(Object key: schedCtxtProps.keySet()) {
                    String val = schedCtxtProps.getProperty((String) key);    
                    scheduler.getContext().put((String)key, val);
                }
        
                // Start initializing the JobStore
                js.setInstanceId(schedInstId);
                js.setInstanceName(schedName);
                js.setThreadPoolSize(tp.getPoolSize());
                js.initialize(loadHelper, qs.getSchedulerSignaler());
                // Essentially injects the scheduler into jrsf
                jrsf.initialize(scheduler);
                // Performs the remote binding; skipped if none is configured
                qs.initialize();
        
                getLog().info(
                        "Quartz scheduler '" + scheduler.getSchedulerName()
                                + "' initialized from " + propSrc);
        
                getLog().info("Quartz scheduler version: " + qs.getVersion());
        
                // prevents the repository from being garbage collected
                qs.addNoGCObject(schedRep);
                // prevents the db manager from being garbage collected
                if (dbMgr != null) {
                    qs.addNoGCObject(dbMgr);
                }
                // Register the new scheduler in the SchedulerRepository
                schedRep.bind(scheduler);
                return scheduler;
            }
            catch(SchedulerException e) {
                shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
                throw e;
            }
            catch(RuntimeException re) {
                shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
                throw re;
            }
            catch(Error re) {
                shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
                throw re;
            }   
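
    The tpInited and qsInited flags together with the three catch blocks form a cleanup-on-failure pattern: remember how far startup got, then shut down only what was actually started before rethrowing. The sketch below illustrates that pattern with plain strings instead of real components. The choice to shut down either the scheduler or the pool (not both) is my assumption about what shutdownFromInstantiateException does, and every name in the sketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class InitCleanupSketch {

    /** Simulated startup that records each step in events; fails after the pool step on request. */
    static void startup(List<String> events, boolean failAfterPool) {
        boolean poolInited = false;
        boolean schedulerInited = false;
        try {
            events.add("pool.initialize");
            poolInited = true;

            if (failAfterPool) {
                throw new IllegalStateException("simulated failure during startup");
            }

            events.add("scheduler.create");
            schedulerInited = true;
        } catch (RuntimeException e) {
            // Release only what was started; the scheduler is assumed to own the pool once created.
            if (schedulerInited) {
                events.add("scheduler.shutdown");
            } else if (poolInited) {
                events.add("pool.shutdown");
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        try {
            startup(events, true);
        } catch (IllegalStateException expected) {
            System.out.println("startup failed, recorded steps: " + events);
        }
    }
}
```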
    

    So what happens here? In short, a lot of wiring. Step by step:

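    1. Create a JobRunShellFactory, the factory for JobRunShell, which mainly handles listener invocation and exception handling
    2. Create QuartzSchedulerResources, the resource holder for QuartzScheduler, and assign all the resources to it
    3. Create the key StdScheduler. It is distinct from QuartzScheduler: StdScheduler holds a QuartzScheduler
    4. Add the new StdScheduler to the SchedulerRepository
    5. Finally, return the StdScheduler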
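
    Steps 2 to 5 can be sketched as a small object graph: a resources holder feeds a core scheduler, a thin facade delegates to that core, and a repository keeps the facade by name. All classes below are hypothetical stand-ins named after their roles, not the Quartz types. Rejecting a duplicate name in bind is an assumption about the repository's behavior, and "NON_CLUSTERED" is only used as a sample instance id.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the resources holder (role of QuartzSchedulerResources). */
class SketchResources {
    String name;
    String instanceId;
}

/** Stand-in for the core scheduler that does the real work (role of QuartzScheduler). */
class SketchCoreScheduler {
    private final SketchResources resources;
    SketchCoreScheduler(SketchResources resources) { this.resources = resources; }
    String getSchedulerName() { return resources.name; }
    String getInstanceId() { return resources.instanceId; }
}

/** Stand-in for the facade handed to callers (role of StdScheduler); it only delegates. */
class SketchFacadeScheduler {
    private final SketchCoreScheduler core;
    SketchFacadeScheduler(SketchCoreScheduler core) { this.core = core; }
    String getSchedulerName() { return core.getSchedulerName(); }
    String getInstanceId() { return core.getInstanceId(); }
}

/** Stand-in for the registry of schedulers by name (role of SchedulerRepository). */
class SketchRepository {
    private final Map<String, SketchFacadeScheduler> schedulers = new HashMap<>();

    void bind(SketchFacadeScheduler scheduler) {
        String name = scheduler.getSchedulerName();
        if (schedulers.containsKey(name)) {
            throw new IllegalStateException("Scheduler with name '" + name + "' already exists.");
        }
        schedulers.put(name, scheduler);
    }

    SketchFacadeScheduler lookup(String name) { return schedulers.get(name); }
}

class WiringSketch {

    /** Builds resources, core and facade, registers the facade, and returns it. */
    static SketchFacadeScheduler instantiate(SketchRepository repo, String name, String instanceId) {
        SketchResources rsrcs = new SketchResources();
        rsrcs.name = name;
        rsrcs.instanceId = instanceId;
        SketchCoreScheduler core = new SketchCoreScheduler(rsrcs);
        SketchFacadeScheduler scheduler = new SketchFacadeScheduler(core);
        repo.bind(scheduler);
        return scheduler;
    }

    public static void main(String[] args) {
        SketchRepository repo = new SketchRepository();
        SketchFacadeScheduler scheduler = instantiate(repo, "MyScheduler", "NON_CLUSTERED");
        System.out.println(scheduler.getSchedulerName() + " / " + scheduler.getInstanceId());
    }
}
```

    Keeping the facade separate from the core means callers only ever see the public scheduler surface, while the core and its resources stay internal to the factory.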

    Closing remarks

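    This section wraps up the whole instantiate initialization. To be honest, many details were left unexplored, so some readers may feel a bit lost. We will dig into them when we analyze the actual execution and invocation paths later on.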


    Article link: https://www.haomeiwen.com/subject/ojczqqtx.html