美文网首页
Quartz 源码解析(四) —— QuartzSchedule

Quartz 源码解析(四) —— QuartzSchedule

作者: icameisaw | 来源:发表于2018-09-03 22:43 被阅读352次

    大概内容

    • scheduler.scheduleJob(jobDetail, trigger)
    • scheduler.start()

    scheduler.scheduleJob()

    Scheduler使用

    StdScheduler

    StdScheduler的方法基本上都代理给QuartzScheduler类来处理。

    public class StdScheduler implements Scheduler {
        private QuartzScheduler sched;
    
        public StdScheduler(QuartzScheduler sched) {
            this.sched = sched;
        }
    
        public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
            throws SchedulerException {
            return sched.scheduleJob(jobDetail, trigger);
        }
    
        public void start() throws SchedulerException {
            sched.start();
        }
    
        /**
         * 只有这个方法没有委托给QuartzScheduler
         * 除了getClass(),其他方法在QuartzScheduler都可以拿到
         */
        public SchedulerMetaData getMetaData() {
            return new SchedulerMetaData(getSchedulerName(),
                    getSchedulerInstanceId(), getClass(), false, isStarted(),
                    isInStandbyMode(), isShutdown(), sched.runningSince(),
                    sched.numJobsExecuted(), sched.getJobStoreClass(),
                    sched.supportsPersistence(), sched.isClustered(), sched.getThreadPoolClass(),
                    sched.getThreadPoolSize(), sched.getVersion());
        }
    
        // 其他代码
    
    }
    

    QuartzScheduler

    Quartz的小心脏,org.quartz.Scheduler接口的间接实现。

    public class QuartzScheduler implements RemotableQuartzScheduler {
    
        // QuartzSchedulerResources对象是通过构造器放进去的
        public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
            throws SchedulerException {
            this.resources = resources;
            if (resources.getJobStore() instanceof JobListener) {
                addInternalJobListener((JobListener)resources.getJobStore());
            }
    
            this.schedThread = new QuartzSchedulerThread(this, resources);
            ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
            schedThreadExecutor.execute(this.schedThread);
            if (idleWaitTime > 0) {
                this.schedThread.setIdleWaitTime(idleWaitTime);
            }
    
            jobMgr = new ExecutingJobsManager();
            addInternalJobListener(jobMgr);
            errLogger = new ErrorLogger();
            addInternalSchedulerListener(errLogger);
    
            signaler = new SchedulerSignalerImpl(this, this.schedThread);
    
            getLog().info("Quartz Scheduler v." + getVersion() + " created.");
        }
    
        public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException {
            validateState();
    
            if (jobDetail == null) {
                throw new SchedulerException("JobDetail cannot be null");
            }
            if (trigger == null) {
                throw new SchedulerException("Trigger cannot be null");
            }
            if (jobDetail.getKey() == null) {
                throw new SchedulerException("Job's key cannot be null");
            }
            if (jobDetail.getJobClass() == null) {
                throw new SchedulerException("Job's class cannot be null");
            }
            // TriggerBuilder.build()会生成一个OperableTrigger实例。
            OperableTrigger trig = (OperableTrigger)trigger;
    
            if (trigger.getJobKey() == null) {
                trig.setJobKey(jobDetail.getKey());
            } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
                throw new SchedulerException(
                    "Trigger does not reference given job!");
            }
    
            trig.validate();
    
            Calendar cal = null;
            if (trigger.getCalendarName() != null) {
                cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
            }
            // TODO: 解析各种类型的Trigger
            Date ft = trig.computeFirstFireTime(cal);
    
            if (ft == null) {
                throw new SchedulerException(
                        "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
            }
                // 关键代码就是下面这一行
            resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
            notifySchedulerListenersJobAdded(jobDetail);
            notifySchedulerThread(trigger.getNextFireTime().getTime());
            notifySchedulerListenersSchduled(trigger);
    
            return ft;
        }
    
        // 其他代码
    
    }
    

    RAMJobStore

    介绍一下RAMJobStore的属性

    • HashMap对象:也是通过空间来换取查询时间的策略,把JobDetail和Trigger的信息放进这些HashMap对象中,方便程序可以根据key或者group来匹配相关的JobDetail和Trigger。
    • TreeSet<TriggerWrapper> timeTriggers:利用TreeSet排重和有序的特性,timeTriggers.first()方法总能返回最先要处理的Trigger。
    class RAMJobStore {
        #HashMap<JobKey,JobWrapper> jobsByKey
        #HashMap<TriggerKey,TriggerWrapper> triggersByKey
        #HashMap<String,HashMap<JobKey,JobWrapper>> jobsByGroup
        #HashMap<String,HashMap<TriggerKey,TriggerWrapper>> triggersByGroup
        #TreeSet<TriggerWrapper> timeTriggers
        #HashMap<String,Calendar> calendarsByName
        #Map<JobKey,List<TriggerWrapper>> triggersByJob
        #Object lock
        #HashSet<String> pausedTriggerGroups
        #HashSet<String> pausedJobGroups
        #HashSet<JobKey> blockedJobs
        #long misfireThreshold
        #SchedulerSignaler signaler
        -Logger log
        -{static}AtomicLong ftrCtr
    }
    

    下面是RAMJobStore.storeJob()代码解析,storeTrigger()方法的逻辑类似。

    /**
     * 内部类JobWrapper是一个包括jobKey和jobDetail的类。
     * 克隆一个新的JobDetail来创建一个JobWrapper,然后维护到jobsByKey和jobsByGroup属性中。
     * 维护HashMap系列对象的时候,通过lock的synchronized代码块来做线程同步
     */
    public void storeJob(JobDetail newJob,boolean replaceExisting) throws ObjectAlreadyExistsException {
        JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
        boolean repl = false;
    
        synchronized (lock) {
            if (jobsByKey.get(jw.key) != null) {
                if (!replaceExisting) {
                    throw new ObjectAlreadyExistsException(newJob);
                }
                repl = true;
            }
    
            if (!repl) {
                // get job group
                HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
                if (grpMap == null) {
                    grpMap = new HashMap<JobKey, JobWrapper>(100);
                    jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
                }
                // add to jobs by group
                grpMap.put(newJob.getKey(), jw);
                // add to jobs by FQN map
                jobsByKey.put(jw.key, jw);
            } else {
                // update job detail
                JobWrapper orig = jobsByKey.get(jw.key);
                orig.jobDetail = jw.jobDetail; // already cloned
            }
        }
    }
    

    scheduler.start()

    QuartzScheduler

    案例用的是RAMJobStore,其中的schedulerStarted()和schedulerResumed()是空方法,没有代码立即。对于JobStoreSupport,这两个方法是有很多逻辑的,后面的篇章再做解析。

    public class QuartzScheduler implements RemotableQuartzScheduler {
    
        public void start() throws SchedulerException {
            if (shuttingDown|| closed) {
                throw new SchedulerException(
                        "The Scheduler cannot be restarted after shutdown() has been called.");
            }
    
            // QTZ-212 : calling new schedulerStarting() method on the listeners
            // right after entering start()
            notifySchedulerListenersStarting();
    
            if (initialStart == null) {//初始化标识为null,进行初始化操作
                initialStart = new Date();
                // RAMJobStore 啥都不做
                // JobStoreSupport 判断是否集群,恢复Job等
                this.resources.getJobStore().schedulerStarted();           
                startPlugins();
            } else {
                resources.getJobStore().schedulerResumed();// 如果已经初始化过,则恢复jobStore
            }
    
            schedThread.togglePause(false);// 唤醒所有等待的线程
    
            getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started.");
    
            notifySchedulerListenersStarted();
        }
    
        // 其他代码
    
    }
    

    QuartzSchedulerThread

    public class QuartzSchedulerThread extends Thread {
    
        /**
         * pause为true,发出让主循环暂停的信号,以便线程在下一个可处理的时刻暂停
         * pause为false,唤醒sigLock对象的所有等待队列的线程
         */
        void togglePause(boolean pause) {
            synchronized (sigLock) {
                paused = pause;
    
                if (paused) {
                    signalSchedulingChange(0);
                } else {
                    sigLock.notifyAll();
                }
            }
        }
    
        // 其他代码
    
    }
    

    Listener事件监听

    Listener事件监听是观察者模式的一个应用。
    QuartzScheduler的scheduleJob()start()方法都有notifyXXX代码逻辑,这些就是JobDetail、Trigger和Scheduler事件监听的代码逻辑。
    在《Scheduler的初始化》篇章里面,初始化一个Scheduler,里面有"根据PropertiesParser创建Listeners"的步骤,Listeners就包括JobListener和TriggerListener的List对象。
    SchedulerListener不支持配置在quartz.properties里面,初始化Scheduler的过程中没有这一块的代码逻辑。如果要添加一个观察者,那么可以通过StdScheduler.getListenerManager()获取ListenerManager实例,通过它可以拿到所有观察者的引用。

    类图

    Quartz Listener类图

    角色说明

    类名 角色
    QuartzScheduler Subject
    SchedulerListener Observer
    JobListener Observer
    TriggerListener Observer

    代码示例

    Subject通知Observer,都是遍历Observer列表,触发相应的通知,实现事件监听的效果。
    这里特别说明一下,获取Listeners集合的时候,是通过新建一个不可改变的集合对象来实现。如果是为了避免多线程的读写问题,这和CopyOnWriteList写时复制的做法相反,而且这里读的场景大于写的场景。况且,ListenerManagerImpl的add()方法都做了代码块的synchronized。新建一个不可改变的集合来返回,这么做的目的没有想明白。

    public void notifySchedulerListenersJobAdded(JobDetail jobDetail) {
        // build a list of all scheduler listeners that are to be notified...
        List<SchedulerListener> schedListeners = buildSchedulerListenerList();
    
        // notify all scheduler listeners
        for(SchedulerListener sl: schedListeners) {
            try {
                sl.jobAdded(jobDetail);
            } catch (Exception e) {
                getLog().error(
                        "Error while notifying SchedulerListener of JobAdded.",
                        e);
            }
        }
    }
    

    ListenerManagerImpl

    public class ListenerManagerImpl implements ListenerManager {
      // 其他代码
      public List<SchedulerListener> getSchedulerListeners() {
          synchronized (schedulerListeners) {
              return java.util.Collections.unmodifiableList(new ArrayList<SchedulerListener>(schedulerListeners));
          }
      }
    }
    

    系列文章

    相关文章

      网友评论

          本文标题:Quartz 源码解析(四) —— QuartzSchedule

          本文链接:https://www.haomeiwen.com/subject/szhwwftx.html