Scheduled Task Management System (Quartz and Spring Integration): Open Source and a Brief Look at the Source


Author: holly_wang_王小飞 | Published: 2017-03-09 22:22 | Views: 631

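    In my study time I wrote a web project that combines Spring and Quartz. It is a pure back-end project that exposes RESTful interfaces
    for managing scheduled tasks: create, delete, update, query, stop, start, change the schedule rule, run immediately, and so on. GitHub: holly-quartz-web. I started it in order to study the source code and made some changes along the way. Later I wanted to adapt it to business needs, so I cloned a quartz-core project to modify, and I plan to rework its clustering mode as well. GitHub: quartz-core. Anyone interested is welcome to join in. The project is still fairly simple, so it can serve as an introductory learning project or as the starting point for a job management system that is iterated on gradually.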

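    In part (2) of this series, Scheduled Task Management System: Quartz and Spring Integration, Open Source and Source Code Overview, the discussion of the createScheduler method included this line of code: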

    Scheduler newScheduler = schedulerFactory.getScheduler();
    

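    That line was not explored in depth at the time, so let's dig into it today.
    This method actually covers the creation of the entire scheduler and involves the core class QuartzSchedulerThread.
    As we know, the schedulerFactory here is the default implementation class StdSchedulerFactory. Let's look at its getScheduler method: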

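        public Scheduler getScheduler() throws SchedulerException {
            // Run initialize() first if no configuration has been loaded yet.
            if (cfg == null) {
                initialize();
            }

            // The repository is a singleton registry of schedulers keyed by name.
            SchedulerRepository schedRep = SchedulerRepository.getInstance();

            Scheduler sched = schedRep.lookup(getSchedulerName());

            if (sched != null) {
                if (sched.isShutdown()) {
                    // A shut-down scheduler cannot be reused: drop it and build a new one below.
                    schedRep.remove(getSchedulerName());
                } else {
                    // A live scheduler with this name already exists, so reuse it.
                    return sched;
                }
            }

            // Nothing usable was registered: build a scheduler from the configuration.
            sched = instantiate();

            return sched;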
        }
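
The lookup-or-create flow of getScheduler can be sketched without Quartz on the classpath. This is only a minimal illustration under stated assumptions: `SchedulerLookupSketch`, `FakeScheduler` and `REPOSITORY` are hypothetical names invented here, not Quartz classes, and the sketch merges creation and registration into one step, whereas in the quoted source that work is delegated to instantiate().

```java
import java.util.HashMap;
import java.util.Map;

public class SchedulerLookupSketch {

    /** Hypothetical stand-in for a scheduler; it only tracks a name and a shutdown flag. */
    static final class FakeScheduler {
        final String name;
        private boolean shutdown;

        FakeScheduler(String name) {
            this.name = name;
        }

        void shutdown() {
            shutdown = true;
        }

        boolean isShutdown() {
            return shutdown;
        }
    }

    /** Hypothetical stand-in for the name-to-scheduler registry. */
    private static final Map<String, FakeScheduler> REPOSITORY = new HashMap<>();

    /** Returns the live scheduler registered under the name, or creates and registers a new one. */
    static synchronized FakeScheduler getScheduler(String name) {
        FakeScheduler existing = REPOSITORY.get(name);
        if (existing != null) {
            if (existing.isShutdown()) {
                // A shut-down instance is discarded so that a fresh one can take its place.
                REPOSITORY.remove(name);
            } else {
                return existing;
            }
        }
        // Assumption of this sketch: creating and registering happen together here.
        FakeScheduler created = new FakeScheduler(name);
        REPOSITORY.put(name, created);
        return created;
    }

    public static void main(String[] args) {
        FakeScheduler first = getScheduler("QuartzScheduler");
        FakeScheduler second = getScheduler("QuartzScheduler");
        System.out.println(first == second); // prints true: the live instance is reused

        first.shutdown();
        FakeScheduler third = getScheduler("QuartzScheduler");
        System.out.println(first == third);  // prints false: a shut-down instance is replaced
    }
}
```

The point of the pattern is that repeated calls with the same scheduler name return the same live object, and a new scheduler is built only when none exists or the registered one has been shut down.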
    

    This is where the important method instantiate makes its appearance:

        private Scheduler instantiate() throws SchedulerException {
            if (cfg == null) {
                initialize();
            }
    
            if (initException != null) {
                throw initException;
            }
    
            JobStore js = null;
            ThreadPool tp = null;
            QuartzScheduler qs = null;
            DBConnectionManager dbMgr = null;
            String instanceIdGeneratorClass = null;
            Properties tProps = null;
            String userTXLocation = null;
            boolean wrapJobInTx = false;
            boolean autoId = false;
            long idleWaitTime = -1;
            long dbFailureRetry = 15000L; // 15 secs
            String classLoadHelperClass;
            String jobFactoryClass;
            ThreadExecutor threadExecutor;
    
    
            SchedulerRepository schedRep = SchedulerRepository.getInstance();
    
            // Get Scheduler Properties
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String schedName = cfg.getStringProperty(PROP_SCHED_INSTANCE_NAME,
                    "QuartzScheduler");
    
            String threadName = cfg.getStringProperty(PROP_SCHED_THREAD_NAME,
                    schedName + "_QuartzSchedulerThread");
    
            String schedInstId = cfg.getStringProperty(PROP_SCHED_INSTANCE_ID,
                    DEFAULT_INSTANCE_ID);
    
            if (schedInstId.equals(AUTO_GENERATE_INSTANCE_ID)) {
                autoId = true;
                instanceIdGeneratorClass = cfg.getStringProperty(
                        PROP_SCHED_INSTANCE_ID_GENERATOR_CLASS,
                        "org.quartz.simpl.SimpleInstanceIdGenerator");
            }
            else if (schedInstId.equals(SYSTEM_PROPERTY_AS_INSTANCE_ID)) {
                autoId = true;
                instanceIdGeneratorClass = 
                        "org.quartz.simpl.SystemPropertyInstanceIdGenerator";
            }
    
            userTXLocation = cfg.getStringProperty(PROP_SCHED_USER_TX_URL,
                    userTXLocation);
            if (userTXLocation != null && userTXLocation.trim().length() == 0) {
                userTXLocation = null;
            }
    
            classLoadHelperClass = cfg.getStringProperty(
                    PROP_SCHED_CLASS_LOAD_HELPER_CLASS,
                    "org.quartz.simpl.CascadingClassLoadHelper");
            wrapJobInTx = cfg.getBooleanProperty(PROP_SCHED_WRAP_JOB_IN_USER_TX,
                    wrapJobInTx);
    
            jobFactoryClass = cfg.getStringProperty(
                    PROP_SCHED_JOB_FACTORY_CLASS, null);
    
            idleWaitTime = cfg.getLongProperty(PROP_SCHED_IDLE_WAIT_TIME,
                    idleWaitTime);
            if(idleWaitTime > -1 && idleWaitTime < 1000) {
                throw new SchedulerException("org.quartz.scheduler.idleWaitTime of less than 1000ms is not legal.");
            }
            
            dbFailureRetry = cfg.getLongProperty(PROP_SCHED_DB_FAILURE_RETRY_INTERVAL, dbFailureRetry);
            if (dbFailureRetry < 0) {
                throw new SchedulerException(PROP_SCHED_DB_FAILURE_RETRY_INTERVAL + " of less than 0 ms is not legal.");
            }
    
            boolean makeSchedulerThreadDaemon =
                cfg.getBooleanProperty(PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON);
    
            boolean threadsInheritInitalizersClassLoader =
                cfg.getBooleanProperty(PROP_SCHED_SCHEDULER_THREADS_INHERIT_CONTEXT_CLASS_LOADER_OF_INITIALIZING_THREAD);
    
            long batchTimeWindow = cfg.getLongProperty(PROP_SCHED_BATCH_TIME_WINDOW, 0L);
            int maxBatchSize = cfg.getIntProperty(PROP_SCHED_MAX_BATCH_SIZE, 1);
    
            boolean interruptJobsOnShutdown = cfg.getBooleanProperty(PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN, false);
            boolean interruptJobsOnShutdownWithWait = cfg.getBooleanProperty(PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT, false);
    
            boolean jmxExport = cfg.getBooleanProperty(PROP_SCHED_JMX_EXPORT);
            String jmxObjectName = cfg.getStringProperty(PROP_SCHED_JMX_OBJECT_NAME);
            
            boolean jmxProxy = cfg.getBooleanProperty(PROP_SCHED_JMX_PROXY);
            String jmxProxyClass = cfg.getStringProperty(PROP_SCHED_JMX_PROXY_CLASS);
    
            boolean rmiExport = cfg.getBooleanProperty(PROP_SCHED_RMI_EXPORT, false);
            boolean rmiProxy = cfg.getBooleanProperty(PROP_SCHED_RMI_PROXY, false);
            String rmiHost = cfg.getStringProperty(PROP_SCHED_RMI_HOST, "localhost");
            int rmiPort = cfg.getIntProperty(PROP_SCHED_RMI_PORT, 1099);
            int rmiServerPort = cfg.getIntProperty(PROP_SCHED_RMI_SERVER_PORT, -1);
            String rmiCreateRegistry = cfg.getStringProperty(
                    PROP_SCHED_RMI_CREATE_REGISTRY,
                    QuartzSchedulerResources.CREATE_REGISTRY_NEVER);
            String rmiBindName = cfg.getStringProperty(PROP_SCHED_RMI_BIND_NAME);
    
            if (jmxProxy && rmiProxy) {
                throw new SchedulerConfigException("Cannot proxy both RMI and JMX.");
            }
            
            boolean managementRESTServiceEnabled = cfg.getBooleanProperty(MANAGEMENT_REST_SERVICE_ENABLED, false);
            String managementRESTServiceHostAndPort = cfg.getStringProperty(MANAGEMENT_REST_SERVICE_HOST_PORT, "0.0.0.0:9889");
    
            Properties schedCtxtProps = cfg.getPropertyGroup(PROP_SCHED_CONTEXT_PREFIX, true);
    
            // If Proxying to remote scheduler, short-circuit here...
            // ~~~~~~~~~~~~~~~~~~
            if (rmiProxy) {
    
                if (autoId) {
                    schedInstId = DEFAULT_INSTANCE_ID;
                }
    
                String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier(
                        schedName, schedInstId) : rmiBindName;
    
                RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);
    
                schedRep.bind(remoteScheduler);
    
                return remoteScheduler;
            }
    
    
            // Create class load helper
            ClassLoadHelper loadHelper = null;
            try {
                loadHelper = (ClassLoadHelper) loadClass(classLoadHelperClass)
                        .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate class load helper class: "
                                + e.getMessage(), e);
            }
            loadHelper.initialize();
    
            // If Proxying to remote JMX scheduler, short-circuit here...
            // ~~~~~~~~~~~~~~~~~~
            if (jmxProxy) {
                if (autoId) {
                    schedInstId = DEFAULT_INSTANCE_ID;
                }
    
                if (jmxProxyClass == null) {
                    throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");
                }
    
                RemoteMBeanScheduler jmxScheduler = null;
                try {
                    jmxScheduler = (RemoteMBeanScheduler)loadHelper.loadClass(jmxProxyClass)
                            .newInstance();
                } catch (Exception e) {
                    throw new SchedulerConfigException(
                            "Unable to instantiate RemoteMBeanScheduler class.", e);
                }
    
                if (jmxObjectName == null) {
                    jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
                }
    
                jmxScheduler.setSchedulerObjectName(jmxObjectName);
    
                tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);
                try {
                    setBeanProps(jmxScheduler, tProps);
                } catch (Exception e) {
                    initException = new SchedulerException("RemoteMBeanScheduler class '"
                            + jmxProxyClass + "' props could not be configured.", e);
                    throw initException;
                }
    
                jmxScheduler.initialize();
    
                schedRep.bind(jmxScheduler);
    
                return jmxScheduler;
            }
    
            
            JobFactory jobFactory = null;
            if(jobFactoryClass != null) {
                try {
                    jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
                            .newInstance();
                } catch (Exception e) {
                    throw new SchedulerConfigException(
                            "Unable to instantiate JobFactory class: "
                                    + e.getMessage(), e);
                }
    
                tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
                try {
                    setBeanProps(jobFactory, tProps);
                } catch (Exception e) {
                    initException = new SchedulerException("JobFactory class '"
                            + jobFactoryClass + "' props could not be configured.", e);
                    throw initException;
                }
            }
    
            InstanceIdGenerator instanceIdGenerator = null;
            if(instanceIdGeneratorClass != null) {
                try {
                    instanceIdGenerator = (InstanceIdGenerator) loadHelper.loadClass(instanceIdGeneratorClass)
                        .newInstance();
                } catch (Exception e) {
                    throw new SchedulerConfigException(
                            "Unable to instantiate InstanceIdGenerator class: "
                            + e.getMessage(), e);
                }
    
                tProps = cfg.getPropertyGroup(PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX, true);
                try {
                    setBeanProps(instanceIdGenerator, tProps);
                } catch (Exception e) {
                    initException = new SchedulerException("InstanceIdGenerator class '"
                            + instanceIdGeneratorClass + "' props could not be configured.", e);
                    throw initException;
                }
            }
    
            // Get ThreadPool Properties
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
    
            if (tpClass == null) {
                initException = new SchedulerException(
                        "ThreadPool class not specified. ");
                throw initException;
            }
    
            try {
                tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException("ThreadPool class '"
                        + tpClass + "' could not be instantiated.", e);
                throw initException;
            }
            tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
            try {
                setBeanProps(tp, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("ThreadPool class '"
                        + tpClass + "' props could not be configured.", e);
                throw initException;
            }
    
            // Get JobStore Properties
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
                    RAMJobStore.class.getName());
    
            if (jsClass == null) {
                initException = new SchedulerException(
                        "JobStore class not specified. ");
                throw initException;
            }
    
            try {
                js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException("JobStore class '" + jsClass
                        + "' could not be instantiated.", e);
                throw initException;
            }
    
            SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);
    
            tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
            try {
                setBeanProps(js, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("JobStore class '" + jsClass
                        + "' props could not be configured.", e);
                throw initException;
            }
    
            if (js instanceof JobStoreSupport) {
                // Install custom lock handler (Semaphore)
                String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS);
                if (lockHandlerClass != null) {
                    try {
                        Semaphore lockHandler = (Semaphore)loadHelper.loadClass(lockHandlerClass).newInstance();
    
                        tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true);
    
                        // If this lock handler requires the table prefix, add it to its properties.
                        if (lockHandler instanceof TablePrefixAware) {
                            tProps.setProperty(
                                    PROP_TABLE_PREFIX, ((JobStoreSupport)js).getTablePrefix());
                            tProps.setProperty(
                                    PROP_SCHED_NAME, schedName);
                        }
    
                        try {
                            setBeanProps(lockHandler, tProps);
                        } catch (Exception e) {
                            initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
                                    + "' props could not be configured.", e);
                            throw initException;
                        }
    
                        ((JobStoreSupport)js).setLockHandler(lockHandler);
                        getLog().info("Using custom data access locking (synchronization): " + lockHandlerClass);
                    } catch (Exception e) {
                        initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
                                + "' could not be instantiated.", e);
                        throw initException;
                    }
                }
            }
    
            // Set up any DataSources
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
            for (int i = 0; i < dsNames.length; i++) {
                PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
                        PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));
    
                String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);
    
                // custom connectionProvider...
                if(cpClass != null) {
                    ConnectionProvider cp = null;
                    try {
                        cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
                    } catch (Exception e) {
                        initException = new SchedulerException("ConnectionProvider class '" + cpClass
                                + "' could not be instantiated.", e);
                        throw initException;
                    }
    
                    try {
                        // remove the class name, so it isn't attempted to be set
                        pp.getUnderlyingProperties().remove(
                                PROP_CONNECTION_PROVIDER_CLASS);
    
                        if (cp instanceof PoolingConnectionProvider) {
                            populateProviderWithExtraProps((PoolingConnectionProvider)cp, pp.getUnderlyingProperties());
                        } else {
                            setBeanProps(cp, pp.getUnderlyingProperties());
                        }
                        cp.initialize();
                    } catch (Exception e) {
                        initException = new SchedulerException("ConnectionProvider class '" + cpClass
                                + "' props could not be configured.", e);
                        throw initException;
                    }
    
                    dbMgr = DBConnectionManager.getInstance();
                    dbMgr.addConnectionProvider(dsNames[i], cp);
                } else {
                    String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);
    
                    if (dsJndi != null) {
                        boolean dsAlwaysLookup = pp.getBooleanProperty(
                                PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
                        String dsJndiInitial = pp.getStringProperty(
                                PROP_DATASOURCE_JNDI_INITIAL);
                        String dsJndiProvider = pp.getStringProperty(
                                PROP_DATASOURCE_JNDI_PROVDER);
                        String dsJndiPrincipal = pp.getStringProperty(
                                PROP_DATASOURCE_JNDI_PRINCIPAL);
                        String dsJndiCredentials = pp.getStringProperty(
                                PROP_DATASOURCE_JNDI_CREDENTIALS);
                        Properties props = null;
                        if (null != dsJndiInitial || null != dsJndiProvider
                                || null != dsJndiPrincipal || null != dsJndiCredentials) {
                            props = new Properties();
                            if (dsJndiInitial != null) {
                                props.put(PROP_DATASOURCE_JNDI_INITIAL,
                                        dsJndiInitial);
                            }
                            if (dsJndiProvider != null) {
                                props.put(PROP_DATASOURCE_JNDI_PROVDER,
                                        dsJndiProvider);
                            }
                            if (dsJndiPrincipal != null) {
                                props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,
                                        dsJndiPrincipal);
                            }
                            if (dsJndiCredentials != null) {
                                props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,
                                        dsJndiCredentials);
                            }
                        }
                        JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,
                                props, dsAlwaysLookup);
                        dbMgr = DBConnectionManager.getInstance();
                        dbMgr.addConnectionProvider(dsNames[i], cp);
                    } else {
                        String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
                        String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);
    
                        if (dsDriver == null) {
                            initException = new SchedulerException(
                                    "Driver not specified for DataSource: "
                                            + dsNames[i]);
                            throw initException;
                        }
                        if (dsURL == null) {
                            initException = new SchedulerException(
                                    "DB URL not specified for DataSource: "
                                            + dsNames[i]);
                            throw initException;
                        }
                        try {
                            PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());
                            dbMgr = DBConnectionManager.getInstance();
                            dbMgr.addConnectionProvider(dsNames[i], cp);
    
                            // Populate the underlying C3P0 data source pool properties
                            populateProviderWithExtraProps(cp, pp.getUnderlyingProperties());
                        } catch (Exception sqle) {
                            initException = new SchedulerException(
                                    "Could not initialize DataSource: " + dsNames[i],
                                    sqle);
                            throw initException;
                        }
                    }
    
                }
    
            }
    
            // Set up any SchedulerPlugins
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
            SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
            for (int i = 0; i < pluginNames.length; i++) {
                Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
                        + pluginNames[i], true);
    
                String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);
    
                if (plugInClass == null) {
                    initException = new SchedulerException(
                            "SchedulerPlugin class not specified for plugin '"
                                    + pluginNames[i] + "'");
                    throw initException;
                }
                SchedulerPlugin plugin = null;
                try {
                    plugin = (SchedulerPlugin)
                            loadHelper.loadClass(plugInClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "SchedulerPlugin class '" + plugInClass
                                    + "' could not be instantiated.", e);
                    throw initException;
                }
                try {
                    setBeanProps(plugin, pp);
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "JobStore SchedulerPlugin '" + plugInClass
                                    + "' props could not be configured.", e);
                    throw initException;
                }
    
                plugins[i] = plugin;
            }
    
            // Set up any JobListeners
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            Class<?>[] strArg = new Class[] { String.class };
            String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
            JobListener[] jobListeners = new JobListener[jobListenerNames.length];
            for (int i = 0; i < jobListenerNames.length; i++) {
                Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
                        + jobListenerNames[i], true);
    
                String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
    
                if (listenerClass == null) {
                    initException = new SchedulerException(
                            "JobListener class not specified for listener '"
                                    + jobListenerNames[i] + "'");
                    throw initException;
                }
                JobListener listener = null;
                try {
                    listener = (JobListener)
                           loadHelper.loadClass(listenerClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "JobListener class '" + listenerClass
                                    + "' could not be instantiated.", e);
                    throw initException;
                }
                try {
                    Method nameSetter = null;
                    try { 
                        nameSetter = listener.getClass().getMethod("setName", strArg);
                    }
                    catch(NoSuchMethodException ignore) { 
                        /* do nothing */ 
                    }
                    if(nameSetter != null) {
                        nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
                    }
                    setBeanProps(listener, lp);
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "JobListener '" + listenerClass
                                    + "' props could not be configured.", e);
                    throw initException;
                }
                jobListeners[i] = listener;
            }
    
            // Set up any TriggerListeners
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
            TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
            for (int i = 0; i < triggerListenerNames.length; i++) {
                Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
                        + triggerListenerNames[i], true);
    
                String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
    
                if (listenerClass == null) {
                    initException = new SchedulerException(
                            "TriggerListener class not specified for listener '"
                                    + triggerListenerNames[i] + "'");
                    throw initException;
                }
                TriggerListener listener = null;
                try {
                    listener = (TriggerListener)
                           loadHelper.loadClass(listenerClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "TriggerListener class '" + listenerClass
                                    + "' could not be instantiated.", e);
                    throw initException;
                }
                try {
                    Method nameSetter = null;
                    try { 
                        nameSetter = listener.getClass().getMethod("setName", strArg);
                    }
                    catch(NoSuchMethodException ignore) { /* do nothing */ }
                    if(nameSetter != null) {
                        nameSetter.invoke(listener, new Object[] {triggerListenerNames[i] } );
                    }
                    setBeanProps(listener, lp);
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "TriggerListener '" + listenerClass
                                    + "' props could not be configured.", e);
                    throw initException;
                }
                triggerListeners[i] = listener;
            }
    
            boolean tpInited = false;
            boolean qsInited = false;
    
    
            // Get ThreadExecutor Properties
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    
            String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
            if (threadExecutorClass != null) {
                tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
                try {
                    threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
                    log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);
    
                    setBeanProps(threadExecutor, tProps);
                } catch (Exception e) {
                    initException = new SchedulerException(
                            "ThreadExecutor class '" + threadExecutorClass + "' could not be instantiated.", e);
                    throw initException;
                }
            } else {
                log.info("Using default implementation for ThreadExecutor");
                threadExecutor = new DefaultThreadExecutor();
            }
    
    
    
            // Fire everything up
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            try {
                    
        
                JobRunShellFactory jrsf = null; // Create correct run-shell factory...
        
                if (userTXLocation != null) {
                    UserTransactionHelper.setUserTxLocation(userTXLocation);
                }
        
                if (wrapJobInTx) {
                    jrsf = new JTAJobRunShellFactory();
                } else {
                    jrsf = new JTAAnnotationAwareJobRunShellFactory();
                }
        
                if (autoId) {
                    try {
                      schedInstId = DEFAULT_INSTANCE_ID;
                      if (js.isClustered()) {
                          schedInstId = instanceIdGenerator.generateInstanceId();
                      }
                    } catch (Exception e) {
                        getLog().error("Couldn't generate instance Id!", e);
                        throw new IllegalStateException("Cannot run without an instance id.");
                    }
                }
    
                if (js.getClass().getName().startsWith("org.terracotta.quartz")) {
                    try {
                        String uuid = (String) js.getClass().getMethod("getUUID").invoke(js);
                        if(schedInstId.equals(DEFAULT_INSTANCE_ID)) {
                            schedInstId = "TERRACOTTA_CLUSTERED,node=" + uuid;
                            if (jmxObjectName == null) {
                                jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
                            }
                        } else if(jmxObjectName == null) {
                            jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId + ",node=" + uuid);
                        }
                    } catch(Exception e) {
                        throw new RuntimeException("Problem obtaining node id from TerracottaJobStore.", e);
                    }
    
                    if(null == cfg.getStringProperty(PROP_SCHED_JMX_EXPORT)) {
                        jmxExport = true;
                    }
                }
                
                if (js instanceof JobStoreSupport) {
                    JobStoreSupport jjs = (JobStoreSupport)js;
                    jjs.setDbRetryInterval(dbFailureRetry);
                    if(threadsInheritInitalizersClassLoader)
                        jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
                    
                    jjs.setThreadExecutor(threadExecutor);
                }
        
                QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
                rsrcs.setName(schedName);
                rsrcs.setThreadName(threadName);
                rsrcs.setInstanceId(schedInstId);
                rsrcs.setJobRunShellFactory(jrsf);
                rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
                rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
                rsrcs.setBatchTimeWindow(batchTimeWindow);
                rsrcs.setMaxBatchSize(maxBatchSize);
                rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
                rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
                rsrcs.setJMXExport(jmxExport);
                rsrcs.setJMXObjectName(jmxObjectName);
    
                if (managementRESTServiceEnabled) {
                    ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
                    managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
                    managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
                    rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
                }
        
                if (rmiExport) {
                    rsrcs.setRMIRegistryHost(rmiHost);
                    rsrcs.setRMIRegistryPort(rmiPort);
                    rsrcs.setRMIServerPort(rmiServerPort);
                    rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
                    rsrcs.setRMIBindName(rmiBindName);
                }
        
                SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);
    
                rsrcs.setThreadExecutor(threadExecutor);
                threadExecutor.initialize();
    
                rsrcs.setThreadPool(tp);
                if(tp instanceof SimpleThreadPool) {
                    if(threadsInheritInitalizersClassLoader)
                        ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
                }
                tp.initialize();
                tpInited = true;
        
                rsrcs.setJobStore(js);
        
                // add plugins
                for (int i = 0; i < plugins.length; i++) {
                    rsrcs.addSchedulerPlugin(plugins[i]);
                }
        
                qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
                qsInited = true;
        
                // Create Scheduler ref...
                Scheduler scheduler = instantiate(rsrcs, qs);
        
                // set job factory if specified
                if(jobFactory != null) {
                    qs.setJobFactory(jobFactory);
                }
        
                // Initialize plugins now that we have a Scheduler instance.
                for (int i = 0; i < plugins.length; i++) {
                    plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
                }
        
                // add listeners
                for (int i = 0; i < jobListeners.length; i++) {
                    qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
                }
                for (int i = 0; i < triggerListeners.length; i++) {
                    qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
                }
        
                // set scheduler context data...
                for(Object key: schedCtxtProps.keySet()) {
                    String val = schedCtxtProps.getProperty((String) key);    
                    scheduler.getContext().put((String)key, val);
                }
        
                // fire up job store, and runshell factory
        
                js.setInstanceId(schedInstId);
                js.setInstanceName(schedName);
                js.setThreadPoolSize(tp.getPoolSize());
                js.initialize(loadHelper, qs.getSchedulerSignaler());
    
                jrsf.initialize(scheduler);
                
                qs.initialize();
        
                getLog().info(
                        "Quartz scheduler '" + scheduler.getSchedulerName()
                                + "' initialized from " + propSrc);
        
                getLog().info("Quartz scheduler version: " + qs.getVersion());
        
                // prevents the repository from being garbage collected
                qs.addNoGCObject(schedRep);
                // prevents the db manager from being garbage collected
                if (dbMgr != null) {
                    qs.addNoGCObject(dbMgr);
                }
        
                schedRep.bind(scheduler);
                return scheduler;
            }
            catch(SchedulerException e) {
                shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
                throw e;
            }
            catch(RuntimeException re) {
                shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
                throw re;
            }
            catch(Error re) {
                shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
                throw re;
            }
        }
    
    

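    The sheer length of this method shows how much it does and how important it is. Here is a rough walkthrough. It first loads the configuration and falls back to default values for any options that are not set. It then checks whether the scheduler is a remote scheduler, that is, an RMI proxy; if so, it short-circuits right there ( // If Proxying to remote scheduler, short-circuit here...) and returns the corresponding RemoteScheduler.
    We are clearly not using RMI here, so execution continues. After // Create class load helper, the method checks whether this is a proxy to a remote JMX scheduler ( // If Proxying to remote JMX scheduler, short-circuit here...). Again it is not, so we move on to the instantiation of the InstanceIdGenerator. The implementation used here is SimpleInstanceIdGenerator, whose method is: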

    public String generateInstanceId() throws SchedulerException {
            try {
                return InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
            } catch (Exception e) {
                throw new SchedulerException("Couldn't get host name!", e);
            }
        }
    

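    I did not pay much attention to this method at first. I only noticed it later, when I looked at the rows in the database and saw that the instance name was derived from my machine's host name. This is what the data in my database looks like, which felt rather familiar: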

    (Screenshot: the value generated by SimpleInstanceIdGenerator's generateInstanceId method)
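To make the link between the host name and the stored instance ID concrete, here is a minimal sketch that uses only the JDK. The class `InstanceIdSketch` and its `resolve` helper are hypothetical names introduced for illustration; they mimic the `autoId` branch quoted above and the host-name-plus-timestamp format of `generateInstanceId`, and they are not Quartz classes.

```java
import java.net.InetAddress;

class InstanceIdSketch {
    // Same value as StdSchedulerFactory.DEFAULT_INSTANCE_ID in Quartz.
    static final String DEFAULT_INSTANCE_ID = "NON_CLUSTERED";

    /** Same shape as SimpleInstanceIdGenerator: host name followed by a millisecond timestamp. */
    static String generate(String hostName, long nowMillis) {
        return hostName + nowMillis;
    }

    /**
     * Hypothetical helper mirroring the autoId branch of instantiate():
     * an explicitly configured ID is kept, an automatic ID is only
     * generated when the job store is clustered.
     */
    static String resolve(boolean autoId, boolean clustered, String configuredId,
                          String hostName, long nowMillis) {
        if (!autoId) {
            return configuredId;
        }
        return clustered ? generate(hostName, nowMillis) : DEFAULT_INSTANCE_ID;
    }

    public static void main(String[] args) throws Exception {
        String host = InetAddress.getLocalHost().getHostName();
        // Prints something like "<your-host-name><timestamp>".
        System.out.println(resolve(true, true, "unused", host, System.currentTimeMillis()));
    }
}
```

If this reading is right, a host name only shows up in the database when the instance ID is set to be generated automatically and the job store is clustered; otherwise the configured ID or the non-clustered default is used.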

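    Next comes the // Get ThreadPool Properties section, which reads the thread pool properties from the configuration. Both this section and the previous one call the same method, setBeanProps: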

     private void setBeanProps(Object obj, Properties props)
            throws NoSuchMethodException, IllegalAccessException,
                java.lang.reflect.InvocationTargetException,
                IntrospectionException, SchedulerConfigException {
            props.remove("class");
    
            BeanInfo bi = Introspector.getBeanInfo(obj.getClass());
            PropertyDescriptor[] propDescs = bi.getPropertyDescriptors();
            PropertiesParser pp = new PropertiesParser(props);
    
            java.util.Enumeration<Object> keys = props.keys();
            while (keys.hasMoreElements()) {
                String name = (String) keys.nextElement();
                String c = name.substring(0, 1).toUpperCase(Locale.US);
                String methName = "set" + c + name.substring(1);
    
                java.lang.reflect.Method setMeth = getSetMethod(methName, propDescs);
    
                try {
                    if (setMeth == null) {
                        throw new NoSuchMethodException(
                                "No setter for property '" + name + "'");
                    }
    
                    Class<?>[] params = setMeth.getParameterTypes();
                    if (params.length != 1) {
                        throw new NoSuchMethodException(
                            "No 1-argument setter for property '" + name + "'");
                    }
                    
                    // does the property value reference another property's value? If so, swap to look at its value
                    PropertiesParser refProps = pp;
                    String refName = pp.getStringProperty(name);
                    if(refName != null && refName.startsWith("$@")) {
                        refName =  refName.substring(2);
                        refProps = cfg;
                    }
                    else
                        refName = name;
                    
                    if (params[0].equals(int.class)) {
                        setMeth.invoke(obj, new Object[]{Integer.valueOf(refProps.getIntProperty(refName))});
                    } else if (params[0].equals(long.class)) {
                        setMeth.invoke(obj, new Object[]{Long.valueOf(refProps.getLongProperty(refName))});
                    } else if (params[0].equals(float.class)) {
                        setMeth.invoke(obj, new Object[]{Float.valueOf(refProps.getFloatProperty(refName))});
                    } else if (params[0].equals(double.class)) {
                        setMeth.invoke(obj, new Object[]{Double.valueOf(refProps.getDoubleProperty(refName))});
                    } else if (params[0].equals(boolean.class)) {
                        setMeth.invoke(obj, new Object[]{Boolean.valueOf(refProps.getBooleanProperty(refName))});
                    } else if (params[0].equals(String.class)) {
                        setMeth.invoke(obj, new Object[]{refProps.getStringProperty(refName)});
                    } else {
                        throw new NoSuchMethodException(
                                "No primitive-type setter for property '" + name
                                        + "'");
                    }
                } catch (NumberFormatException nfe) {
                    throw new SchedulerConfigException("Could not parse property '"
                            + name + "' into correct data type: " + nfe.toString());
                }
            }
        }
    

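    In short, setBeanProps takes a bean that has already been instantiated and uses reflection to call the matching setter for every value in the configured property group. Here, together with the XML configuration, the effect is that org.quartz.simpl.SimpleThreadPool, the thread pool implementation configured under the org.quartz.threadPool prefix, is instantiated and populated with its properties.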
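The two steps described above (pick out one property group, then push its values into a bean through setters) can be sketched with the JDK alone. Everything named here is an assumption made for illustration: `BeanPropsSketch`, `DemoPool`, `group` and `apply` are hypothetical, and the sketch looks setters up with plain `getMethods()` rather than the `Introspector` that the quoted Quartz code uses.

```java
import java.lang.reflect.Method;
import java.util.Locale;
import java.util.Properties;

class BeanPropsSketch {

    /** Hypothetical bean standing in for a thread pool implementation. */
    public static class DemoPool {
        private int threadCount;
        private String threadNamePrefix;
        public void setThreadCount(int threadCount) { this.threadCount = threadCount; }
        public void setThreadNamePrefix(String prefix) { this.threadNamePrefix = prefix; }
        public int getThreadCount() { return threadCount; }
        public String getThreadNamePrefix() { return threadNamePrefix; }
    }

    /** Collects every "prefix.x = v" entry as "x = v". */
    static Properties group(Properties all, String prefix) {
        Properties out = new Properties();
        String dotted = prefix + ".";
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(dotted)) {
                out.setProperty(key.substring(dotted.length()), all.getProperty(key));
            }
        }
        return out;
    }

    /** Calls set<Name>(value) for each entry, converting the string to the setter's type. */
    static void apply(Object bean, Properties props) throws Exception {
        for (String name : props.stringPropertyNames()) {
            if (name.equals("class")) {
                continue; // the class name selects the bean, it is not a bean property
            }
            String methName = "set" + name.substring(0, 1).toUpperCase(Locale.US) + name.substring(1);
            Method setter = null;
            for (Method m : bean.getClass().getMethods()) {
                if (m.getName().equals(methName) && m.getParameterCount() == 1) {
                    setter = m;
                }
            }
            if (setter == null) {
                throw new NoSuchMethodException("No setter for property '" + name + "'");
            }
            String raw = props.getProperty(name).trim();
            Class<?> type = setter.getParameterTypes()[0];
            if (type == int.class) {
                setter.invoke(bean, Integer.parseInt(raw));
            } else if (type == long.class) {
                setter.invoke(bean, Long.parseLong(raw));
            } else if (type == boolean.class) {
                setter.invoke(bean, Boolean.parseBoolean(raw));
            } else if (type == String.class) {
                setter.invoke(bean, raw);
            } else {
                throw new NoSuchMethodException("No primitive-type setter for property '" + name + "'");
            }
        }
    }

    /** Builds a small configuration, instantiates the configured class and applies its group. */
    static DemoPool demo() {
        try {
            Properties cfg = new Properties();
            cfg.setProperty("org.quartz.threadPool.class", DemoPool.class.getName());
            cfg.setProperty("org.quartz.threadPool.threadCount", "5");
            cfg.setProperty("org.quartz.threadPool.threadNamePrefix", "demo-worker");
            cfg.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_"); // other group, ignored here
            Properties tProps = group(cfg, "org.quartz.threadPool");
            Object pool = Class.forName(tProps.getProperty("class"))
                    .getDeclaredConstructor().newInstance();
            apply(pool, tProps);
            return (DemoPool) pool;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoPool pool = demo();
        System.out.println(pool.getThreadCount() + " " + pool.getThreadNamePrefix());
    }
}
```

The point of the design is that the factory never needs to know the concrete class: any implementation with suitable setters can be configured purely through property names.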
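    Next, the // Get JobStore Properties section instantiates org.quartz.impl.jdbcjobstore.JobStoreTX and, through SchedulerDetailsSetter, effectively calls setInstanceName and setInstanceId on it. setBeanProps is then called again to inject the job store's property group. After that comes a check for whether the job store is a JobStoreSupport (the base of the JDBC-based JobStore implementations). This project stores its job data through JDBC, so the table prefix and related settings are applied and the lock handler is set up, an implementation of the Semaphore interface (StdRowLockSemaphore here). Semaphore is meant in the sense of a signal or lock, and it is precisely this class, combined with the database, that makes clustered Quartz possible (how distributed jobs are implemented will be analyzed in a later post).
    Next is // Set up any DataSources, which simply initializes the data sources. Then comes // Set up any SchedulerPlugins: Quartz has a plugin mechanism, and every plugin listed in the configuration is instantiated in turn. Then // Set up any JobListeners instantiates the configured job listeners so they can be registered, and // Set up any TriggerListeners does the same for trigger listeners. A thread pool also needs something to run threads on, so the next section is // Get ThreadExecutor Properties, where the ThreadExecutor is instantiated.
    The last step is // Fire everything up, which assembles all of these pieces into the scheduler. This initialization really does involve a lot, and looking back it is clear why a FactoryBean is used to perform it.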
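As a rough illustration of what a named lock handler provides, the sketch below keeps one binary semaphore per lock name in memory. This is a deliberately swapped-in technique, not how StdRowLockSemaphore works: as far as I understand, the JDBC implementation holds a row lock in the LOCKS table inside a database transaction, which is what lets several scheduler nodes exclude each other. The class `NamedLockSketch` and its methods are hypothetical; only the lock name `TRIGGER_ACCESS` is taken from Quartz.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

class NamedLockSketch {
    // One binary semaphore per lock name, created on first use.
    private final ConcurrentHashMap<String, Semaphore> locks = new ConcurrentHashMap<>();

    /** Tries to take the named lock without blocking; true if the caller now holds it. */
    boolean tryObtain(String lockName) {
        return locks.computeIfAbsent(lockName, n -> new Semaphore(1)).tryAcquire();
    }

    /** Gives the named lock back so that another caller can take it. */
    void release(String lockName) {
        Semaphore s = locks.get(lockName);
        // Only release a lock that is currently held, so the permit count never exceeds 1.
        if (s != null && s.availablePermits() == 0) {
            s.release();
        }
    }

    public static void main(String[] args) {
        NamedLockSketch sketch = new NamedLockSketch();
        System.out.println(sketch.tryObtain("TRIGGER_ACCESS")); // first caller gets the lock
        System.out.println(sketch.tryObtain("TRIGGER_ACCESS")); // second attempt is refused
        sketch.release("TRIGGER_ACCESS");
        System.out.println(sketch.tryObtain("TRIGGER_ACCESS")); // free again after release
    }
}
```

An in-memory lock like this only protects one JVM. Moving the lock into a shared database row is what extends the same exclusion to every node that talks to that database.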
    One particularly important line in this assembly step is:

     qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
    

    Stepping into the constructor:

     public QuartzScheduler(QuartzSchedulerResources resources,
                long idleWaitTime, @Deprecated long dbRetryInterval)
                throws SchedulerException {
            this.resources = resources;
            if (resources.getJobStore() instanceof JobListener) {
                addInternalJobListener((JobListener) resources.getJobStore());
            }
    
            this.schedThread = new QuartzSchedulerThread(this, resources);
            ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
            schedThreadExecutor.execute(this.schedThread);
            if (idleWaitTime > 0) {
                this.schedThread.setIdleWaitTime(idleWaitTime);
            }
    
            jobMgr = new ExecutingJobsManager();
            addInternalJobListener(jobMgr);
            errLogger = new ErrorLogger();
            addInternalSchedulerListener(errLogger);
    
            signaler = new SchedulerSignalerImpl(this, this.schedThread);
    
            getLog().info("Quartz Scheduler v." + getVersion() + " created.");
        }
    

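    The constructor creates a QuartzSchedulerThread, another very important class that carries a great deal of responsibility. It is handed to the ThreadExecutor through execute right here, which means the thread is already running at this point (it stays paused until the scheduler itself is started). With that, and details aside, the whole Scheduler has been assembled.
    At runtime, QuartzSchedulerThread is the main actor and runs the scheduling loop. JobStore acts as the middle layer: it performs the database operations according to Quartz's concurrency strategy and carries out most of the scheduling logic. JobRunShellFactory creates the JobRunShell that wraps each job execution and is handed to the thread pool to run. The lock handler obtains the database locks in the LOCKS table. The next post turns to QuartzSchedulerThread and explains how Quartz's clustering is implemented.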
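The "started in the constructor, but idle until the scheduler starts" behaviour can be sketched with a plain thread and a paused flag. This is a simplified model under my own assumptions, not the Quartz implementation: `PausedLoopSketch` and all of its members are hypothetical names, and the loop body only counts iterations where the real thread would acquire and fire triggers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class PausedLoopSketch implements Runnable {
    private final Object sigLock = new Object();
    private boolean paused = true;   // the loop begins paused, as described above
    private boolean halted = false;
    private int ticks = 0;
    private final CountDownLatch firstTick = new CountDownLatch(1);
    private final Thread thread;

    PausedLoopSketch() {
        thread = new Thread(this, "sketch-scheduler-thread");
        thread.setDaemon(true);
        thread.start(); // the thread is alive from construction on, like execute(schedThread)
    }

    @Override
    public void run() {
        while (true) {
            synchronized (sigLock) {
                // Wait here until start() clears the paused flag or halt() is called.
                while (paused && !halted) {
                    try {
                        sigLock.wait(1000L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (halted) {
                    return;
                }
                ticks++; // stand-in for one pass of the scheduling loop
            }
            firstTick.countDown();
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Lets the loop run. */
    void start() {
        synchronized (sigLock) { paused = false; sigLock.notifyAll(); }
    }

    /** Stops the loop for good. */
    void halt() {
        synchronized (sigLock) { halted = true; sigLock.notifyAll(); }
    }

    int ticks() {
        synchronized (sigLock) { return ticks; }
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    /** Waits up to the given time for the first loop pass; false on timeout or interrupt. */
    boolean awaitFirstTick(long millis) {
        try {
            return firstTick.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        PausedLoopSketch sketch = new PausedLoopSketch();
        System.out.println("alive=" + sketch.isAlive() + " ticks=" + sketch.ticks());
        sketch.start();
        sketch.awaitFirstTick(2000L);
        System.out.println("ticks after start >= 1: " + (sketch.ticks() >= 1));
        sketch.halt();
    }
}
```

Separating "thread exists" from "loop is active" means construction can wire everything together first, and scheduling only begins once the caller explicitly starts it.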


        Article link: https://www.haomeiwen.com/subject/hheigttx.html