美文网首页
Elastic-Job源码分析之JobScheduler类分析

Elastic-Job源码分析之JobScheduler类分析

作者: 端木轩 | 来源:发表于2017-05-10 20:13 被阅读85次

    JobScheduler这个类是EJ中比较核心的一个类,我们现在开始解析这个类。

    一、构造器

    首先我们看一下JobScheduler的几个构造器。

        private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
            JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
            this.liteJobConfig = liteJobConfig;
            this.regCenter = regCenter;
            List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
            setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
            schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
            jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
        }
    

    这个构造器比较清晰,第一个参数regCenter表示的是注册中心,这里我们用的是zk作为我们的注册中心,所以这块的配置我们一般从xml配置文件中读取即可。我们按照官方文档配置即可。

        <!--配置作业注册中心 -->
        <reg:zookeeper id="regCenter" server-lists="ip1:2181,ip2:2181"
                       namespace="caocao-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000"
                       max-retries="3"/>
    

    然后从xml文件中取出该配置即可。

    ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-job.xml");
            ZookeeperRegistryCenter zookeeperRegistryCenter = context.getBean(ZookeeperRegistryCenter.class);
    

    第二个参数liteJobConfig表示的是作业的一些配置,这块我们后续再细细了解。

    第三个参数jobEventBus表示作业运行痕迹总线。如果想把作业运行的内容写到DB中,我们需要用到另一个构造器,同时定义自己的JobEventConfiguration,目前来说实现这个接口的只有一个类JobEventRdbConfiguration,通过这个可以将作业运行的痕迹进行持久化到DB的操作。

    第四个参数elasticJobListeners表示一些监听器,这里我们可以定义一些监听器,具体的监听器我们可以参考官方文档。

    二、初始化

    定义好我们的作业后,怎么将我们的作业运行起来呢?这块就需要用到我们的初始化作业的方法init()。

    public void init() {
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfig.getTypeConfig().getJobClass()), liteJobConfig.getJobName());
        JobRegistry.getInstance().registerJob(liteJobConfig.getJobName(), jobScheduleController, regCenter);
        schedulerFacade.registerStartUpInfo(liteJobConfig);
        jobScheduleController.scheduleJob(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
        }
    

    初始化主要做了以下几个动作:

    • 设置当前分片总数setCurrentShardingTotalCount
    • 创建调度器createScheduler
    • 创建任务详情createJobDetail
    • 将作业在zk上面进行注册registerJob
    • 注册作业启动信息registerStartUpInfo
    • 开始进行作业调度scheduleJob

    这几个动作是一个作业能够最终被调度的关键,我们来一一分析。

    2.1 createScheduler

        private Scheduler createScheduler() {
            Scheduler result;
            try {
                StdSchedulerFactory factory = new StdSchedulerFactory();
                factory.initialize(getBaseQuartzProperties());//TODO 每次新建任务都会创建一个线程,会导致线程数过多
                result = factory.getScheduler();
            } catch (final SchedulerException ex) {
                throw new JobSystemException(ex);
            }
            return result;
        }
    

    这个方法调用了quartz底层的方法,创建了一个调度器。我们看看quartz中的一些配置:

    private Properties getBaseQuartzProperties() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        result.put("org.quartz.threadPool.threadCount", "1");
        result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
        return result;
    }
    

    我们可以看到一个比较重要的参数:org.quartz.threadPool.threadCount,这里设置为了1,也就是说每个任务,EJ默认都会创建一个线程来进行调度,所以如果想动态的创建任务,你就要考虑好你的内存够不够这些线程使用了。

    2.2 createJobDetail

    private JobDetail createJobDetail(final String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }
    

    这个方法创建了任务的一些属性。

    • 首先调用JobBuilder的构造器方法,生成了一个详细的任务。
    • 然后将作业的门面放入到作业内部的一个作业数据map中,相当于是一个缓存。
    • 调用createElasticJobInstance创建实例,生成一个Optional的类,这个Optional是Guava中的一个类,后续咱们再研究。我们只需要知道创建了一个实例就行了,实例放在了Optional这个容器中,而Optional可以做一些额外的判断。我们可以联想一下,利用Optional来处理null?
    • 判断实例是否创建成功,通过isPresent()方法来判断。Optional的这个方法判断实例是否为空。
      • 如果不为空,就把实例放入到缓存map中。
      • 否则,调用反射,把实例放入缓存map中。如果异常的话,直接抛出异常,说明实例化失败。

    2.3 registerJob

    看方法名我们就能知道,这个方法的作用是注册我们的任务。

    /**
     * 添加作业调度控制器.
     * 
     * @param jobName 作业名称
     * @param jobScheduleController 作业调度控制器
     * @param regCenter 注册中心
     */
    public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
        schedulerMap.put(jobName, jobScheduleController);
        regCenterMap.put(jobName, regCenter);
        regCenter.addCacheData("/" + jobName);
    }
    

    这步的操作,把作业控制调度器放入到调度缓存中,把任务信息放入到regCenter的缓存中。另外还把任务名称加入到本地缓存中。

    2.4 registerStartUpInfo

    这个方法是注册一些任务的启动信息,这块的步骤比较多,需要细细理解一下。

        public void registerStartUpInfo(final boolean enabled) {
            listenerManager.startAllListeners();
            leaderService.electLeader();
            serverService.persistOnline(enabled);
            instanceService.persistOnline();
            shardingService.setReshardingFlag();
            monitorService.listen();
            if (!reconcileService.isRunning()) {
                reconcileService.startAsync();
            }
        }
    
    • 第一步,启动所有监听器。
        public void startAllListeners() {
            electionListenerManager.start();//主节点选举
            shardingListenerManager.start();//分片
            failoverListenerManager.start();//失效转移
            shutdownListenerManager.start();//关闭
            triggerListenerManager.start();//触发器
            rescheduleListenerManager.start();//重新调度
            guaranteeListenerManager.start();//保证分布式任务全部开始和结束状态
            jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);//注册中心与任务节点的连接状态
        }
    
    • 第二步,选举主节点。这块底层调用的是curator的方法,一个zk的客户端。
    • 第三步,持久化作业信息到zk中。先把服务器信息持久化,再把作业实例信息持久化,
    • 第四步,设置重新分片的信息,同样也是在zk中。
    • 第五步,初始化作业监听服务。
    • 最后,调解分布式作业不一致状态服务异步启动。

    可以看到,这块做的东西比较多,而且最近版本这块修改的内容也比较多。总的来说,就是启动监听器,同时将作业的信息持久化到zk中。

    2.5 scheduleJob

    调度作业,这块是一个比较基础的服务。底层调用的是quartz的方法。

    初始化这块就分析完了,里面的东西还是挺多的。今天就先分析到这里,咱们以后再细细分析吧。

    相关文章

      网友评论

          本文标题:Elastic-Job源码分析之JobScheduler类分析

          本文链接:https://www.haomeiwen.com/subject/ojagtxtx.html