elastic job

Author: sxt_5cc3 | Published 2018-05-24 22:32

    1. Introduction

    1.1 Overview

    1.2 Test Code

    public static void main(String[] args) {
            // ZooKeeper registry center
            CoordinatorRegistryCenter registerCenter = createRegistryCenter();
            // Lite job configuration
            LiteJobConfiguration liteJobCfg = createJobConfiguration();
            new JobScheduler(registerCenter, liteJobCfg).init();
        }
    
        private static CoordinatorRegistryCenter createRegistryCenter() {
            CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
                    // ZooKeeper address and job namespace
                    new ZookeeperConfiguration("localhost:2181", "elastic-job-demo"));
            // initialize the ZooKeeper registry center
            regCenter.init();
            return regCenter;
        }
    
        private static LiteJobConfiguration createJobConfiguration() {
            // core job configuration: name, cron, total shard count
            JobCoreConfiguration simpleCoreConfig = 
                    JobCoreConfiguration.newBuilder("demoSimpleJob", "*/5 * * * * ?", 3).build();
            // job type configuration
            SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,
                    DemoJob.class.getCanonicalName());
            // root configuration for a Lite job
            LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
            return simpleJobRootConfig;
        }
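
    The DemoJob referenced above is not shown in the original post. A minimal sketch of what it could look like, implementing the standard SimpleJob interface (class name and output format are assumptions):

    import com.dangdang.ddframe.job.api.ShardingContext;
    import com.dangdang.ddframe.job.api.simple.SimpleJob;
    
    public class DemoJob implements SimpleJob {
        
        @Override
        public void execute(final ShardingContext shardingContext) {
            // each instance only receives the shard items assigned to it
            System.out.println(String.format("item: %d | parameter: %s | thread: %s",
                    shardingContext.getShardingItem(),
                    shardingContext.getShardingParameter(),
                    Thread.currentThread().getName()));
        }
    }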
    

    1.3 Process Overview

    1.3.1 Initialization and startup
    [figure: quartz startup flow]
    1.3.2 Execution
    [figure: execution flow]
    1.3.3 ZooKeeper nodes
    [figure: overall node layout]
    • Blue indicates the leader node

    2. Initialization

    2.1 Initializing ZooKeeper

    public final class ZookeeperConfiguration {
        
        /**
         * List of ZooKeeper servers to connect to.
         * Includes IP addresses and ports.
         * Multiple addresses are separated by commas,
         * e.g. host1:2181,host2:2181
         */
        private final String serverLists;
        
        /**
         * Namespace.
         */
        private final String namespace;
        
        /**
         * Initial wait interval between retries.
         * In milliseconds.
         */
        private int baseSleepTimeMilliseconds = 1000;
        
        /**
         * Maximum wait interval between retries.
         * In milliseconds.
         */
        private int maxSleepTimeMilliseconds = 3000;
        
        /**
         * Maximum number of retries.
         */
        private int maxRetries = 3;
        
        /**
         * Session timeout.
         * In milliseconds.
         */
        private int sessionTimeoutMilliseconds;
        
        /**
         * Connection timeout.
         * In milliseconds.
         */
        private int connectionTimeoutMilliseconds;
        
        /**
         * Digest token for connecting to ZooKeeper.
         * By default no authentication is required.
         */
        private String digest;
    }
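
    ZookeeperConfiguration is a plain holder with Lombok-generated accessors in the 2.x source; a short, hedged example of overriding the defaults above (values are illustrative):

    ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("host1:2181,host2:2181", "elastic-job-demo");
    // retry and timeout tuning; the defaults are shown in the fields above
    zkConfig.setMaxRetries(3);
    zkConfig.setBaseSleepTimeMilliseconds(1000);
    zkConfig.setMaxSleepTimeMilliseconds(3000);
    zkConfig.setSessionTimeoutMilliseconds(60000);
    zkConfig.setConnectionTimeoutMilliseconds(15000);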
    
    public void init() {
            log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(zkConfig.getServerLists())
                    .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
                    .namespace(zkConfig.getNamespace());
            if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
                builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
            }
            if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
                builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
            }
            if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
                builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
                        .aclProvider(new ACLProvider() {
                        
                            @Override
                            public List<ACL> getDefaultAcl() {
                                return ZooDefs.Ids.CREATOR_ALL_ACL;
                            }
                        
                            @Override
                            public List<ACL> getAclForPath(final String path) {
                                return ZooDefs.Ids.CREATOR_ALL_ACL;
                            }
                        });
            }
            client = builder.build();
            client.start();
            try {
                // wait up to maxSleepTimeMilliseconds * maxRetries (9000 ms with the defaults above) for a connection
                if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                    client.close();
                    throw new KeeperException.OperationTimeoutException();
                }
                //CHECKSTYLE:OFF
            } catch (final Exception ex) {
                //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
        }
    

    2.2 Initializing the Configuration

    2.2.1 JobCoreConfiguration
        private final String jobName;
        
        private final String cron;
        
        private final int shardingTotalCount;
        
        /**
         * Mapping of shard item to its individual parameter.
         *
         * <p>
         * Shard item and parameter are separated by '=', multiple pairs by commas, map-like.
         * Shard items start from 0 and must be less than the total shard count.
         * e.g.:
         * 0=a,1=b,2=c
         * </p>
         */
        private final String shardingItemParameters;
        
        private final String jobParameter;
        
        // whether failover is enabled
        private final boolean failover;
        // whether misfired executions are re-run
        private final boolean misfire;
        
        private final String description;
        
        private final JobProperties jobProperties;
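
    A hedged sketch of setting these fields through the builder (builder method names are assumed from the 2.x API; values are illustrative):

    JobCoreConfiguration coreConfig = JobCoreConfiguration
            .newBuilder("demoSimpleJob", "*/5 * * * * ?", 3)
            .shardingItemParameters("0=a,1=b,2=c")
            .jobParameter("demoParam")
            .failover(true)
            .misfire(true)
            .description("demo simple job")
            .build();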
    
    2.2.2 JobConfiguration
    [figure: JobConfiguration]

    SimpleJobConfiguration

        // common job configuration
        private final JobCoreConfiguration coreConfig;
        
        // job type
        private final JobType jobType = JobType.SIMPLE;
        
        // job class name
        private final String jobClass;
    

    2.2.3 LiteJobConfiguration

        // job type configuration
        private final JobTypeConfiguration typeConfig;
        
        // whether to monitor job execution status
        private final boolean monitorExecution;
        
        // allowed clock skew between this host and the registry center, in seconds
        private final int maxTimeDiffSeconds;
        
        // monitor (dump) port
        private final int monitorPort;
        
        // sharding strategy class
        private final String jobShardingStrategyClass;
        
        // interval in minutes of the service that reconciles inconsistent job server state
        private final int reconcileIntervalMinutes;
        
        // whether the job is disabled on startup
        private final boolean disabled;
        
        // whether local configuration overwrites the registry center configuration
        private final boolean overwrite;
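
    A matching hedged sketch for the Lite root configuration (builder method names are assumed from the 2.x API; coreConfig is from the sketch above):

    LiteJobConfiguration liteJobConfig = LiteJobConfiguration
            .newBuilder(new SimpleJobConfiguration(coreConfig, DemoJob.class.getCanonicalName()))
            .monitorExecution(true)
            .monitorPort(9888)
            .jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName())
            .reconcileIntervalMinutes(10)
            .disabled(false)
            .overwrite(true)
            .build();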
    

    3. Startup

    3.1 Common Services

    // job configuration service (i.e. LiteJobConfiguration)
    new ConfigurationService(regCenter, jobName);
    // leader election service
    new LeaderService(regCenter, jobName);
    // available job server service
    new ServerService(regCenter, jobName);
    // running job instance service
    new InstanceService(regCenter, jobName);
    // sharding service
    new ShardingService(regCenter, jobName);
    // job execution service
    new ExecutionService(regCenter, jobName);
    // job monitor service
    new MonitorService(regCenter, jobName);
    // service that reconciles inconsistent state across distributed job servers
    new ReconcileService(regCenter, jobName);
    // job failover service
    new FailoverService(regCenter, jobName);
    

    3.2 JobScheduler

       /**
         * Job scheduler.
         * 
         * @param regCenter
         * @param liteJobConfig
         * @param jobEventBus
         * @param elasticJobListeners
         */
        private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
            // global runtime registry:
            // job name -> job instance key (IP + "@-@" + JVM PID)
            JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
            this.liteJobConfig = liteJobConfig;
            this.regCenter = regCenter;
            List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
            // inject the registry center into listeners of type AbstractDistributeOnceElasticJobListener
            setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
            // scheduling facade
            schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
            // job execution facade
            jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
        }
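
    For reference, a minimal ElasticJobListener; the two hook methods come from the 2.x interface, the bodies are illustrative:

    public class LoggingJobListener implements ElasticJobListener {
        
        @Override
        public void beforeJobExecuted(final ShardingContexts shardingContexts) {
            System.out.println("before execution: " + shardingContexts.getJobName());
        }
        
        @Override
        public void afterJobExecuted(final ShardingContexts shardingContexts) {
            System.out.println("after execution: " + shardingContexts.getJobName());
        }
    }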
    
    3.2.1 SchedulerFacade
    public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
            this.jobName = jobName;
            // job configuration service (i.e. LiteJobConfiguration)
            configService = new ConfigurationService(regCenter, jobName);
            // leader election service
            leaderService = new LeaderService(regCenter, jobName);
            // available job server service
            serverService = new ServerService(regCenter, jobName);
            // running job instance service
            instanceService = new InstanceService(regCenter, jobName);
            // sharding service
            shardingService = new ShardingService(regCenter, jobName);
            // job execution service
            executionService = new ExecutionService(regCenter, jobName);
            // job monitor service
            monitorService = new MonitorService(regCenter, jobName);
            // service that reconciles inconsistent state across distributed job servers
            reconcileService = new ReconcileService(regCenter, jobName);
            // node listeners:
            // watches the ${namespace}/${job-name} node
            listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
        }
    
    • ListenerManager watches the ${namespace}/${job-name} node, important!!!
    3.2.2 LiteJobFacade
      public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final JobEventBus jobEventBus) {
            configService = new ConfigurationService(regCenter, jobName);
            shardingService = new ShardingService(regCenter, jobName);
            executionContextService = new ExecutionContextService(regCenter, jobName);
            executionService = new ExecutionService(regCenter, jobName);
            failoverService = new FailoverService(regCenter, jobName);
            this.elasticJobListeners = elasticJobListeners;
            this.jobEventBus = jobEventBus;
        }
    

    3.3 init

       public void init() {
            // update and fetch the node configuration data (LiteJobConfiguration) under ${job-name}/config
            LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
            // global runtime registry:
            // job name -> total shard count
            JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
            // create the JobScheduleController, which wraps the quartz machinery
            JobScheduleController jobScheduleController = new JobScheduleController(
                     // create the org.quartz.Scheduler
                     createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
            // global runtime registry:
            // job name -> JobScheduleController, registry center
            JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
            // pre-start bookkeeping
            schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
            // start the local quartz scheduler
            jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
        }
    
    • schedulerFacade.updateJobConfiguration(liteJobConfig) creates or updates the ${namespace}/${job-name}/config node
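
    For context, jobScheduleController.scheduleJob(cron) essentially binds the JobDetail to a cron trigger and starts quartz. A hedged sketch, simplified rather than verbatim source (method shape and trigger settings are assumptions):

    private void scheduleJob(final Scheduler scheduler, final JobDetail jobDetail, final String jobName, final String cron) {
        try {
            if (!scheduler.checkExists(jobDetail.getKey())) {
                // misfires are handled by Elastic-Job itself, so quartz is told to do nothing on misfire
                CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName)
                        .withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()).build();
                scheduler.scheduleJob(jobDetail, trigger);
            }
            scheduler.start();
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }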
    3.3.1 createScheduler
     private Scheduler createScheduler() {
            Scheduler result;
            try {
                StdSchedulerFactory factory = new StdSchedulerFactory();
                factory.initialize(getBaseQuartzProperties());
                result = factory.getScheduler();
                result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
            } catch (final SchedulerException ex) {
                throw new JobSystemException(ex);
            }
            return result;
        }
        
        private Properties getBaseQuartzProperties() {
            Properties result = new Properties();
            result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
            result.put("org.quartz.threadPool.threadCount", "1");
            result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
            result.put("org.quartz.jobStore.misfireThreshold", "1");
            result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
            result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
            return result;
        }
    
    3.3.2 createJobDetail
    • All job classes are executed through the unified LiteJob
      private JobDetail createJobDetail(final String jobClass) {
            // set LiteJob as the quartz job class
            JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
            result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
            Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
            if (elasticJobInstance.isPresent()) {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
            } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
                try {
                    result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
                } catch (final ReflectiveOperationException ex) {
                    throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
                }
            }
            return result;
        }
    
    3.3.3 registerStartUpInfo !!!
      public void registerStartUpInfo(final boolean enabled) {
            // start the ZooKeeper node listeners
            listenerManager.startAllListeners();
            // leader election: ${namespace}/${job-name}/leader/election
            // a listener picks up changes to ${namespace}/${job-name}/leader/sharding/necessary
            leaderService.electLeader();
            // register the job server node: ${namespace}/${job-name}/server node
            serverService.persistOnline(enabled);
            // register the running instance: ${namespace}/${job-name}/instances
            instanceService.persistOnline();
            // set the resharding flag: ${namespace}/${job-name}/leader/sharding/necessary
            shardingService.setReshardingFlag();
            // job monitor service
            monitorService.listen();
            // start the service that periodically checks whether resharding is needed
            if (!reconcileService.isRunning()) {
                reconcileService.startAsync();
            }
        }
    
    3.3.3.1 leaderService.electLeader()

    When leader election succeeds, the LeaderElectionExecutionCallback is invoked

        /**
         * Elect the leader node.
         */
        public void electLeader() {
            log.debug("Elect a new leader now.");
            jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
            log.debug("Leader election completed.");
        }
    
       /**
         * Execute an operation as the leader.
         * 
         * @param latchNode name of the job node used for the distributed lock
         * @param callback callback to execute
         */
        public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
            try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
                latch.start();
                latch.await();
                callback.execute();
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                handleException(ex);
            }
        }
    
        @RequiredArgsConstructor
        class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
            
            @Override
            public void execute() {
                // check whether the ${namespace}/${job-name}/leader/election/instance node already exists
                if (!hasLeader()) {
                    jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
                }
            }
        }
    
        /**
         * Check whether a leader already exists.
         * 
         * @return whether a leader already exists
         */
        public boolean hasLeader() {
            return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
        }
    
    • Leader election is effectively a distributed lock, guaranteeing that exactly one node becomes the leader
    3.3.3.2 reconcileService.startAsync()

    A service that periodically checks whether resharding is needed

    public final class ReconcileService extends AbstractScheduledService {
        
        private long lastReconcileTime;
        
        private final ConfigurationService configService;
        
        private final ShardingService shardingService;
        
        private final LeaderService leaderService;
        
        public ReconcileService(final CoordinatorRegistryCenter regCenter, final String jobName) {
            lastReconcileTime = System.currentTimeMillis();
            configService = new ConfigurationService(regCenter, jobName);
            shardingService = new ShardingService(regCenter, jobName);
            leaderService = new LeaderService(regCenter, jobName);
        }
        
        @Override
        protected void runOneIteration() throws Exception {
            LiteJobConfiguration config = configService.load(true);
            int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
            // check whether the reconcile interval has elapsed
            if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
                lastReconcileTime = System.currentTimeMillis();
                if (leaderService.isLeaderUntilBlock() // only the leader reconciles
                        && !shardingService.isNeedSharding() // resharding is not already flagged
                        && shardingService.hasShardingInfoInOfflineServers()) { // shard assignments reference offline instances
                    log.warn("Elastic Job: job status node has inconsistent value, start reconciling...");
                    shardingService.setReshardingFlag();
                }
            }
        }
        
       /**
        * Runs once per minute.
        */
        @Override
        protected Scheduler scheduler() {
            return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
        }
    }
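
    ReconcileService extends Guava's AbstractScheduledService, which is why registerStartUpInfo calls startAsync(). Usage follows the standard Guava Service lifecycle (sketch; regCenter is from section 1.2):

    ReconcileService reconcileService = new ReconcileService(regCenter, "demoSimpleJob");
    if (!reconcileService.isRunning()) {
        // non-blocking: runOneIteration() fires on the scheduler() cadence, every minute here
        reconcileService.startAsync();
    }
    // on shutdown:
    reconcileService.stopAsync();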
    

    3.4 Executing LiteJob

    Triggered on schedule by quartz

    [figure: executor flow]
    public final class LiteJob implements Job {
        
        @Setter
        private ElasticJob elasticJob;
        
        @Setter
        private JobFacade jobFacade;
        
        @Override
        public void execute(final JobExecutionContext context) throws JobExecutionException {
            JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
        }
    }
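
    JobExecutorFactory.getJobExecutor dispatches on the concrete ElasticJob type. A paraphrased sketch of the 2.x logic, not verbatim source:

    public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
        // a null job means a script job: the script path comes from configuration, not from a Java class
        if (null == elasticJob) {
            return new ScriptJobExecutor(jobFacade);
        }
        if (elasticJob instanceof SimpleJob) {
            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        if (elasticJob instanceof DataflowJob) {
            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }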
    
    public final void execute() {
            try {
                // check that the clock skew between this host and the registry center is within the allowed range
                // note: execution still continues when the check fails, since the exception is only handed to the handler
                jobFacade.checkJobExecutionEnvironment();
            } catch (final JobExecutionEnvironmentException cause) {
                jobExceptionHandler.handleException(jobName, cause);
            }
            // obtain the sharding contexts
            ShardingContexts shardingContexts = jobFacade.getShardingContexts();
            // JobEvent
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
            }
            // if any of this instance's shard items are still running,
            // set the misfire flag (${namespace}/${job-name}/sharding/${item}/misfire)
            // and stop this round of execution
            if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                            "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                            shardingContexts.getShardingItemParameters().keySet()));
                }
                return;
            }
            // before ElasticJobListener
            try {
                jobFacade.beforeJobExecuted(shardingContexts);
                //CHECKSTYLE:OFF
            } catch (final Throwable cause) {
                //CHECKSTYLE:ON
                jobExceptionHandler.handleException(jobName, cause);
            }
            execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
            // when resharding is not needed, misfired shard items exist,
            // and the configuration allows misfired executions,
            // loop to avoid losing tasks misfired again during re-execution
            while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
                // clear the misfire flag
                jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
                // re-execute
                execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
            }
            }
           // when failover is enabled and the current round has finished:
            // a leader is elected under ${namespace}/${job-name}/leader/failover/latch
            // the leader (holding the lock) takes one failover shard item from ${namespace}/${job-name}/leader/failover/items
            // FailoverLeaderExecutionCallback sets ${namespace}/${job-name}/sharding/${item}/failover = current job instance,
            // deletes the item under ${namespace}/${job-name}/leader/failover/items,
            // and re-triggers the job
            jobFacade.failoverIfNecessary();
            try {
                // after ElasticJobListener
                jobFacade.afterJobExecuted(shardingContexts);
                //CHECKSTYLE:OFF
            } catch (final Throwable cause) {
                //CHECKSTYLE:ON
                jobExceptionHandler.handleException(jobName, cause);
            }
        }
    
    3.4.1 Sharding
       @Override
        public ShardingContexts getShardingContexts() {
            boolean isFailover = configService.load(true).isFailover();
            // when failover is enabled, prefer failover shard items
            if (isFailover) {
                // shard items where ${namespace}/${job-name}/sharding/${item}/failover = current running instance
                List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
                if (!failoverShardingItems.isEmpty()) {
                    return executionContextService.getJobShardingContext(failoverShardingItems);
                }
            }
            // shard if necessary
            shardingService.shardingIfNecessary();
            // shard items where ${namespace}/${job-name}/sharding/${item}/instance = current running instance
            List<Integer> shardingItems = shardingService.getLocalShardingItems();
            if (isFailover) {
                // drop shard items taken over by other instances via failover
                shardingItems.removeAll(failoverService.getLocalTakeOffItems());
            }
            // drop disabled shard items
            shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
            return executionContextService.getJobShardingContext(shardingItems);
        }
    
       public void shardingIfNecessary() {
            // available running instances: children of ${namespace}/${job-name}/instances
            List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
            // no resharding needed, or no running instances
            if (!isNeedSharding() || availableJobInstances.isEmpty()) {
                return;
            }
            // if not the leader,
            // block until the leader finishes sharding,
            // i.e. until neither ${namespace}/${job-name}/leader/sharding/necessary nor .../processing exists
            if (!leaderService.isLeaderUntilBlock()) {
                blockUntilShardingCompleted();
                return;
            }
            /**--------------------**/
            /**  leader sharding   **/
            /**--------------------**/
    
            // wait until the currently running shard items finish
            // (checks the running nodes)
            waitingOtherShardingItemCompleted();
            LiteJobConfiguration liteJobConfig = configService.load(false);
            int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
            log.debug("Job '{}' sharding begin.", jobName);
            // mark sharding as in progress
            jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
            // create or refresh the shard nodes ${namespace}/${job-name}/sharding/${item}
            resetShardingInfo(shardingTotalCount);
            // obtain the sharding strategy
            JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
            // shard with the JobShardingStrategy;
            // PersistShardingInfoTransactionExecutionCallback
            // writes ${namespace}/${job-name}/sharding/${item}/instance = assigned running instance,
            // then removes ShardingNode.NECESSARY and ShardingNode.PROCESSING,
            // releasing the followers blocked waiting for sharding
            jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
            log.debug("Job '{}' sharding complete.", jobName);
        }
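
    To make the strategy step concrete: the default AverageAllocationJobShardingStrategy splits items evenly across instances. A hedged sketch (instance ids are illustrative):

    JobShardingStrategy strategy = new AverageAllocationJobShardingStrategy();
    List<JobInstance> instances = Arrays.asList(
            new JobInstance("10.0.0.1@-@1001"), new JobInstance("10.0.0.2@-@1002"), new JobInstance("10.0.0.3@-@1003"));
    // 9 shard items over 3 instances -> [0,1,2], [3,4,5], [6,7,8]
    Map<JobInstance, List<Integer>> result = strategy.sharding(instances, "demoSimpleJob", 9);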
    
    3.4.2 ZooKeeper node handling before and after execution
    • before execution: create ${namespace}/${job-name}/sharding/${item}/running
    • after execution, clean up:
      delete ${namespace}/${job-name}/sharding/${item}/running
      delete ${namespace}/${job-name}/sharding/${item}/failover
    private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
            // nothing to do for an empty shard set
            if (shardingContexts.getShardingItemParameters().isEmpty()) {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
                }
                return;
            }
            // mark the job as running:
            // ${namespace}/${job-name}/sharding/${item}/running
            jobFacade.registerJobBegin(shardingContexts);
            String taskId = shardingContexts.getTaskId();
            // trace event
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
            }
            try {
                process(shardingContexts, executionSource);
            } finally {
                // TODO consider adding a failed job state and how to close the loop on overall job failure
                // job finished:
                // delete ${namespace}/${job-name}/sharding/${item}/running
                // delete ${namespace}/${job-name}/sharding/${item}/failover
                jobFacade.registerJobCompleted(shardingContexts);
                if (itemErrorMessages.isEmpty()) {
                    // trace event: finished
                    if (shardingContexts.isAllowSendJobEvent()) {
                        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
                    }
                } else {
                    if (shardingContexts.isAllowSendJobEvent()) {
                        // trace event: error details
                        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
                    }
                }
            }
        }
    
    3.4.3 Executing the Job
    private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
            Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
            // a single shard item is executed directly on the current thread
            if (1 == items.size()) {
                int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
                JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
                process(shardingContexts, item, jobExecutionEvent);
                return;
            }
            // latch: wait for all shard item tasks to finish, then return
            final CountDownLatch latch = new CountDownLatch(items.size());
            for (final int each : items) {
                final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
                if (executorService.isShutdown()) {
                    return;
                }
                executorService.submit(new Runnable() {
                    
                    @Override
                    public void run() {
                        try {
                            process(shardingContexts, each, jobExecutionEvent);
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            }
            try {
                latch.await();
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    
    private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobExecutionEvent(startEvent);
            }
            log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
            JobExecutionEvent completeEvent;
            try {
                // execute the job
                process(new ShardingContext(shardingContexts, item));
                completeEvent = startEvent.executionSuccess();
                log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobExecutionEvent(completeEvent);
                }
                // CHECKSTYLE:OFF
            } catch (final Throwable cause) {
                // exception handling
                // CHECKSTYLE:ON
                completeEvent = startEvent.executionFailure(cause);
                jobFacade.postJobExecutionEvent(completeEvent);
                itemErrorMessages.put(item, ExceptionUtil.transform(cause));
                jobExceptionHandler.handleException(jobName, cause);
            }
        }
    

    4. Miscellaneous

    4.1 Listeners

    • Managed by ListenerManager
    • Base class:
    public abstract class AbstractJobListener implements TreeCacheListener {
        
        @Override
        public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
            ChildData childData = event.getData();
            if (null == childData) {
                return;
            }
            String path = childData.getPath();
            if (path.isEmpty()) {
                return;
            }
            dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
        }
        
        protected abstract void dataChanged(final String path, final Type eventType, final String data);
    }
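
    A trivial subclass shows the contract: override dataChanged and react to node events (illustrative):

    class LoggingNodeListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            // invoked for every non-empty TreeCache event under the watched job node
            System.out.println("path=" + path + ", type=" + eventType + ", data=" + data);
        }
    }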
    
    • Leader election related: ElectionListenerManager.start()
         /**
         * Re-elects the leader.
         */
        class LeaderElectionJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                // the job has not been shut down, and either:
                // active election: no leader exists && the local server is enabled
                // passive election: ${namespace}/${job-name}/leader/election/instance was removed && the local server is available
                if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
                    // re-elect the leader
                    leaderService.electLeader();
                }
            }
            
            private boolean isActiveElection(final String path, final String data) {
                return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
            }
            
            private boolean isPassiveElection(final String path, final Type eventType) {
                return isLeaderCrashed(path, eventType) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp());
            }
            
            private boolean isLeaderCrashed(final String path, final Type eventType) {
                return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
            }
            
            private boolean isLocalServerEnabled(final String path, final String data) {
                return serverNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(data);
            }
        }
    
        //-------------------------------------------------------------
    
        /**
         * When the local job server is disabled and this node is the leader,
         * delete the leader node ${namespace}/${job-name}/leader/election/instance (abdicate).
         */
        class LeaderAbdicationJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
                    leaderService.removeLeader();
                }
            }
            
            private boolean isLocalServerDisabled(final String path, final String data) {
                return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
            }
        }
    
    • Sharding related: ShardingListenerManager.start()
     /**
         * Flags resharding when the total shard count changes.
         */
        class ShardingTotalCountChangedJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                // the config node changed
                // and the total shard count differs from the locally recorded one
                if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                    int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
                    if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                        // flag resharding
                        shardingService.setReshardingFlag();
                        JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
                    }
                }
            }
        }
    
        //-------------------------------------------------------------
    
        class ListenServersChangedJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                // the job is still running
                // and the servers node changed || the instances node changed
                if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                    // resharding is needed
                    shardingService.setReshardingFlag();
                }
            }
            
            private boolean isInstanceChange(final Type eventType, final String path) {
                return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
            }
            
            private boolean isServerChange(final String path) {
                return serverNode.isServerPath(path);
            }
        }
    
    • Failover: FailoverListenerManager.start()
    class JobCrashedJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                // failover is enabled,
                // the event is a node removal,
                // and it is under ${namespace}/${job-name}/instances: a job instance went down
                if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
                    // id of the crashed job instance
                    String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                    // ignore if it is the current instance itself
                    if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                        return;
                    }
                    // failover shard items the crashed instance was responsible for
                    List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                    if (!failoverItems.isEmpty()) {
                        for (int each : failoverItems) {
                            // record the item under the leader failover node
                            failoverService.setCrashedFailoverFlag(each);
                            // let the current instance take over a failover shard item
                            failoverService.failoverIfNecessary();
                        }
                    } else {
                        for (int each : shardingService.getShardingItems(jobInstanceId)) {
                            failoverService.setCrashedFailoverFlag(each);
                            failoverService.failoverIfNecessary();
                        }
                    }
                }
            }
        }
        
        class FailoverSettingsChangedJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                // failover has been disabled in the config:
                // delete the ${namespace}/${job-name}/sharding/${item}/failover nodes
                if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !LiteJobConfigurationGsonFactory.fromJson(data).isFailover()) {
                    failoverService.removeFailoverInfo();
                }
            }
        }
    
    • Other listeners are not covered here

    4.2 Failover

    When a running job node crashes, its shard items are taken over by the surviving job nodes. Taking over a crashed node's shard items does not trigger resharding; resharding only happens when the job is next triggered.
    
    • job-node1, job-node2, job-node3, 9 shard items
    1. job-node1 crashes; the JobCrashedJobListener on job-node2 and job-node3 fires
    2. job-node2 or job-node3 first records job-node1's existing failover items under ${namespace}/${job-name}/leader/failover/${item}; if there are none, it records job-node1's items from ${namespace}/${job-name}/sharding instead
    • a node only claims an item from ${namespace}/${job-name}/leader/failover/${item} after it has finished its own items under ${namespace}/${job-name}/sharding
       public void failoverIfNecessary() {
            if (needFailover()) {
                jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
            }
        }
        
        private boolean needFailover() {
            // failover items were recorded by failoverService.setCrashedFailoverFlag(each) after the crash
            return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                    // and the current job has finished its own items under ${namespace}/${job-name}/sharding
                    && !JobRegistry.getInstance().isJobRunning(jobName);
        }
    
    3. the leader (the latch acts as a distributed lock) claims one failover shard item to execute
    • FailoverLeaderExecutionCallback.execute:
    int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
    
    4. job-node2 and job-node3 each claim one failover shard item, leaving one unclaimed
    5. on the next trigger, the remaining item is picked up via section 3.4's
    jobFacade.failoverIfNecessary();
    
