美文网首页Elastic-Job源码分析
Elastic-Job-Lite 源码分析-作业初始化过程

Elastic-Job-Lite 源码分析-作业初始化过程

作者: Lexus90511 | 来源:发表于2018-11-18 17:04 被阅读0次

    最近对分布式调度系统比较感兴趣,Elastic-Job就是其中一款比较常用的开源分布式调度系统,为了更深入的了解他,打算对他的核心代码做一个全面的分析,今天先让我们来分析下他的整个初始化过程。

    从一个SimpleJob入手

    Elastic-Job支持3中调度作业:SimpleJobDataflowJobScriptJOb

    我们从SimpleJob的使用Demo入手,由浅入深的打开整个作业的初始化过程的神秘面纱。

    main方法中整个流程如下:

    首先,通过createRegistryCenter()方法创建一个用于协调分布式服务的注册中心,说白了就是个ZK Client;

    然后,调用createJobConfiguration()创建一个作业配置;

    最后用上面创建的ZK注册中心和作业配置来配置作业调度器,并初始化。

    接下来分析下createJobConfiguration() ,然后进入我们今天的主题JobScheduler.init() 调度作业初始化过程。

    public class Application {
    
        public static void main(String[] args) {
            // 创建、初始化zk client
            CoordinatorRegistryCenter registryCenter = createRegistryCenter();
            // 创建作业配置
            LiteJobConfiguration liteJobConfiguration = createJobConfiguration();
            // 创建调度作业并初始化
            new JobScheduler(registryCenter, liteJobConfiguration).init();
        }
    
         /**
         * 创建注册中心
         * @return
         */
        private static CoordinatorRegistryCenter createRegistryCenter() {
            String zkServerLists = "127.0.0.1:2181";
            String namespace = "namespace";
            
            // zk配置
            ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(zkServerLists, namespace);
            
            // 创建、初始化zk client
            CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
            regCenter.init();
            return regCenter;
        }
    
        /**
         * 创建作业配置
         * @return
         */
        private static LiteJobConfiguration createJobConfiguration() {
            // 定义作业核心配置
            String jobName = "myElasticJob";
            String cron = "0/3 * * * * ?";
            int shardingTotalCount = 1;
            JobCoreConfiguration jobCoreConfig = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build();
    
            // 定义SIMPLE类型配置
            String jobClasName = MyElasticJob.class.getCanonicalName();
            SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(jobCoreConfig, jobClasName);
    
            // 定义Lite作业根配置
            LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
            return simpleJobRootConfig;
        }
    }
    

    createJobConfiguration()

    我们来看下createJobConfiguration()方法里做了什么:

    通过建造者模式创建出Job核心配置类的JobCoreConfiguration一个实例

    public final class JobCoreConfiguration {
        // 必填
        private final String jobName;
        // 必填
        private final String cron;
        
        /**
         * 作业分片总数。如果一个作业启动超过作业分片总数的节点,只有 shardingTotalCount 会执行作业。必填。
         */
        private final int shardingTotalCount;
        
        /**
         * 分片序列号和参数用等号分隔,多个键值对用逗号分隔
         * 分片序列号从0开始,不可大于或等于作业分片总数
         * 如:
         * 0=a,1=b,2=c
         */
        private final String shardingItemParameters;
    
        /**
         * 作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业
         * 例:每次获取的数据量、作业实例从数据库读取的主键等
         */
        private final String jobParameter;
    
        /**
         * 是否开启作业执行失效转移。开启表示如果作业在一次作业执行中途宕机,允许将该次未完成的作业在另一作业节点上补偿执行。
         * 默认为 false。选填。
         */
        private final boolean failover;
    
        /**
         * 是否开启错过作业重新执行。
         * 默认为 true。选填。
         */
        private final boolean misfire;
        
        private final String description;
    
        /**
         * 作业属性配置。选填。
         */
        private final JobProperties jobProperties;
    }
    

    JobCoreConfiguration中主要是一些作业基础信息,其中jobName和cron以及shardingTotalCount是必填项,其他选填。

    然后new 一个SimpleJobConfiguration对象,SimpleJobConfiguration类很简单,只有三个成员变量:

    public final class SimpleJobConfiguration implements JobTypeConfiguration {
        
        // job核心配置
        private final JobCoreConfiguration coreConfig;
        
        // job类型,Elastic-Job 有SIMPLE, DATAFLOW和SCRIPT这三种类型,这里创建的是SIMPLE类型
        private final JobType jobType = JobType.SIMPLE;
    
        // 自定义的job全路径名
        private final String jobClass;
    }   
    

    其中,jobClass表示用户自定义的Job类的全路径,例如我们创建的MyElasticJob类:

    public class MyElasticJob implements SimpleJob {
    
        @Override
        public void execute(ShardingContext shardingContext) {
            log.info("shardingContext = {}", shardingContext);
    
            switch (shardingContext.getShardingItem()) {
                case 0:
                    log.info("Item = {}", 0);
                    break;
                case 1:
                    log.info("Item = {}", 1);
                    break;
                case 2:
                    log.info("Item = {}", 2);
                    break;
            }
        }
    }
    

    对应的jobClass=io.elasticjob.lite.example.MyElasticJob

    最后一样使用创建者模式构建出一个LiteJobConfiguration类的对象:

    public final class LiteJobConfiguration implements JobRootConfiguration {
    
        /**
         * 作业类型配置,例如:SimpleJobConfiguration.
         */
        private final JobTypeConfiguration typeConfig;
    
        /**
         * 监控作业执行时状态, 默认为true
         * 1. 每次作业执行时间和间隔时间均非常短的情况, 建议不监控作业运行时状态以提升效率, 
         * 因为是瞬时状态, 所以无必要监控. 请用户自行增加数据堆积监控. 并且不能保证数据重复选取, 
         * 应在作业中实现幂等性. 也无法实现作业失效转移.
         * 2. 每次作业执行时间和间隔时间均较长短的情况, 建议监控作业运行时状态, 可保证数据不会重复选取.
         */
        private final boolean monitorExecution;
    
        /**
         * 设置最大容忍的本机与注册中心的时间误差秒数。默认为 -1,不检查时间误差。选填。
         */
        private final int maxTimeDiffSeconds;
    
        // 作业辅助监控端口.
        private final int monitorPort;
    
        // 作业分片策略实现类全路径
        private final String jobShardingStrategyClass;
    
        /**
         * 服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复。默认为 10。
         */
        private final int reconcileIntervalMinutes;
    
        // 作业是否启动时禁止.
        private final boolean disabled;
    
        // 本地配置是否可覆盖注册中心配置.
        private final boolean overwrite;
        
        /**
         * 获取作业名称.
         * 
         * @return 作业名称
         */
        public String getJobName() {
            return typeConfig.getCoreConfig().getJobName();
        }
        
        /**
         * 是否开启失效转移.
         * 开启后,如果某个作业节点挂掉了,主节点会将作业分配到另外一个正常的作业节点上,
         * 保证定时任务到点时能被正常执行。
         * @return 是否开启失效转移
         */
        public boolean isFailover() {
            return typeConfig.getCoreConfig().isFailover();
        }
    }
    

    注册中心实例和作用配置实例创建完成, 接下来用他们来实例化JobScheduler,并初始化。

    JobScheduler.init()

    上面的热身结束,接下来开始今天的重点,分析下作业调度器JobScheduler的实例创建和初始化过程:

    /**
     * 作业调度器.
     */
    public class JobScheduler {
        
        public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
        
        private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
        
        private final LiteJobConfiguration liteJobConfig;
        
        private final CoordinatorRegistryCenter regCenter;
        
        // 为调度器提供内部服务的门面类.
        @Getter
        private final SchedulerFacade schedulerFacade;
        
        // 作业内部服务门面服务.
        private final JobFacade jobFacade;
        
        public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) {
            this(regCenter, liteJobConfig, new JobEventBus(), elasticJobListeners);
        }
        
        public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, 
                            final ElasticJobListener... elasticJobListeners) {
            this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
        }
        
        private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
            JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
            this.liteJobConfig = liteJobConfig;
            this.regCenter = regCenter;
            List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
            // 给[分布式作业中只执行一次的监听器]设置[保证分布式任务全部开始和结束状态的服务].
            setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
            // 创建[为调度器提供内部服务的门面类]对象.
            schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
            // 创建[为作业提供内部服务的门面类]对象.
            jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
        }
        
         /**
         * 给[分布式作业中只执行一次的监听器]设置[保证分布式任务全部开始和结束状态的服务].
         * @param regCenter
         * @param elasticJobListeners
         */
        private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List<ElasticJobListener> elasticJobListeners) {
            GuaranteeService guaranteeService = new GuaranteeService(regCenter, liteJobConfig.getJobName());
            for (ElasticJobListener each : elasticJobListeners) {
                if (each instanceof AbstractDistributeOnceElasticJobListener) {
                    ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
                }
            }
        }
        
        /**
         * 初始化作业.
         */
        public void init() {
            
             /**
             * 1. 更新、拉取ZK上的作业配置.
             * 这里有三种情况:
             *
             * 1)作业第一次启动时ZK上还没有该作业的配置信息
             *    将作业配置持久化为ZK上以该作业名命名的节点下的config节点的值;
             *
             * 2)作业非第一次启动,但是作业配置的overwrite=true,即覆盖作业配置
             *    用本地的作业配置更新ZK上的config节点值;
             *
             * 3)非上面两钟情况,则拉取ZK上的config节点配置,作为本次作业的配置信息。
             */
            LiteJobConfiguration liteJobConfigFromRegCenter =   
                schedulerFacade.updateJobConfiguration(liteJobConfig);
    
             /**
             * 2. 设置分片总数.
             *
             * 作业注册表中维护中一张[作业名-分片数]的对照表:
             * currentShardingTotalCountMap 使用线程安全的ConcurrentHashMap,
             * 记录着每个作业的分片数.
             */
            JobRegistry.getInstance(). setCurrentShardingTotalCount(
                liteJobConfigFromRegCenter.getJobName(),                        
                liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig()
                .getShardingTotalCount() );
            
             // 3. 创建调度控制器, 控制作业的调度、启、停.
            JobScheduleController jobScheduleController = 
                new JobScheduleController(
                    createScheduler(), 
                    createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), 
                    liteJobConfigFromRegCenter.getJobName()
                );
    
            /**
             * 4. 添加作业调度控制器.
             *
             * 作业注册表中维护中一张[作业名-作业控制器]的对照表:
             * schedulerMap 使用线程安全的ConcurrentHashMap,
             * 记录着每个作业的调度控制器.
             */
            JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), 
                                                  jobScheduleController, regCenter);
    
            /**
             * 5. 注册作业启动信息.
             */
            schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
    
            // 6. 开启作业调度.       
            jobScheduleController.scheduleJob(
                liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()
            );
        }
        
         /**
         * 创建jobDetail.
         * 将jobFacade和我们最开始新建的MyElasticJob类的实例放入JobDataMap中.
         * @param jobClass
         * @return
         */
        private JobDetail createJobDetail(final String jobClass) {
            JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
            result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
            Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
            if (elasticJobInstance.isPresent()) {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
            } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
                try {
                    result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
                } catch (final ReflectiveOperationException ex) {
                    throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
                }
            }
            return result;
        }
        
        ...
        
    }
    

    JobScheduler.init()方法主要做:

    1. 更新、拉取ZK上的作业配置.
    2. 作业注册表中记录分片总数.
    3. 创建调度控制器.
    4. 添加作业调度控制器.
    5. 注册作业启动信息.
    6. 调度作业.

    其中调用SchedulerFacade.registerStartUpInfo(final boolean enabled)注册作业启动信息:

    /**
     * 为调度器提供内部服务的门面类.
     */
    public final class SchedulerFacade {
        
        ...
        
        /**
         * 注册作业启动信息.
         * 
         * @param enabled 作业是否启用
         */
        public void registerStartUpInfo(final boolean enabled) {
    
            // 开启所有监听器.
            listenerManager.startAllListeners();
            // 选举成功后,将临时节点/jobName/leader/elections/instance的值设置为
            // 调度作业唯一标示:ip@-@进程id.
            leaderService.electLeader();
    
            /** 
             * 持久化作业服务器上线信息.
             * 将永久节点/jobName/servers/ip 的值设置为""空字符串或"DISABLED".
             */
           serverService.persistOnline(enabled);
    
            /**
             * 持久化作业运行实例上线相关信息.
             * 在/jobName/instances节点下新建临时节点[ip@-@进程id], 值为""空字符串
             */ 
            instanceService.persistOnline();
    
            /**
              * 设置需要重新分片的标记.
              * 设置永久节点/jobName/leader/sharding/necessary, 值为""空字符串 
              */
           shardingService.setReshardingFlag();
            // 初始化作业监听服务.
            monitorService.listen();
            // 开启调解分布式作业不一致状态服务.
            if (!reconcileService.isRunning()) {
                reconcileService.startAsync();
            }
        }
        
        /**
         * 终止作业调度.
         */
        public void shutdownInstance() {
            if (leaderService.isLeader()) {
                leaderService.removeLeader();
            }
            monitorService.close();
            if (reconcileService.isRunning()) {
                reconcileService.stopAsync();
            }
            JobRegistry.getInstance().shutdown(jobName);
        }
    }
    

    至此Elastic-Job-Lite的整个调度作业的初始化过程完成,最后我们看下JobScheduler这个类的继承关系

    JobScheduler.png

    可以看到有个子类, SpringJobScheduler继承了JobScheduler, 本文中从一个例子入手,我们这里的例子在引入Elastic-Job时用的是编码方式,而Elastic-Job其实还提供Spring XML配置的方式来引入,所以此处的SpringJobScheduler就是一个基于Spring的作业启动器。

    总结

    本文主要结合源码分析了下Elastic-Job的作业初始化过程,通过分析了解到作业初始化需要哪些条件;和ZK之间如何交互;如何对Quartz的Scheduler进行封装等。

    Elastic-Job作为分布式调度系统,是建立在Quartz基础上实现分布式的,功能上最大的区别在于Elastic-Job提供了弹性扩容缩容、分片和失效转移等满足分布式集群对于调度任务的需求。

    而这些功能都建立在ZK这个分布式一致性协调服务器上,通过在ZK上建立复杂的节点,再结合ZK节点的监听机制,完成这一系列功能。

    所以在下一篇文章中,将重点介绍下Elastic-Job在ZK上建立了哪些节点、节点作用以及节点之间的复关系。

    相关文章

      网友评论

        本文标题:Elastic-Job-Lite 源码分析-作业初始化过程

        本文链接:https://www.haomeiwen.com/subject/mvnbfqtx.html