美文网首页
ElasticJob分片机制

ElasticJob分片机制

作者: 圣村的希望 | 来源:发表于2020-10-06 18:20 被阅读0次

        ElasticJob是一个弹性的分布式任务调度框架,这里的分布式就是采用分片的来进行任务调度和业务执行的解耦,分片信息就是中间进行解耦的。ElasticJob任务调度框架调度触发执行的是分片,然后业务可以在框架触发对应分片信息的时候,增加自己业务的处理。分片这个思想挺不错的,把任务调度框架和实际业务解耦的相当好。

    ShardingListenerManager:这个是分片监听的开始
    @Override
        public void start() {
            addDataListener(new ShardingTotalCountChangedJobListener());
            addDataListener(new ListenServersChangedJobListener());
        }
    

         ShardingListenerManager分片管理监听器在ElasticJob启动的就是开启了监听,这里是开启了2个监听器ShardingTotalCountChangedJobListener(分片节点总数变化监听器)和ListenServersChangedJobListener(服务器改变监听器)。

    ShardingTotalCountChangedJobListener:分片总数变化监听器
    class ShardingTotalCountChangedJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                    int newShardingTotalCount = YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
                    if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                        shardingService.setReshardingFlag();
                        JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
                    }
                }
            }
        }
    

        获取zookeeper下发的分片个数变化事件的通知,判断新分片数和原分片数是否相等,不相等的话设置需要重新分片的标记,创建/leader/sharding/neccessary持久节点。

    ListenServersChangedJobListener:服务器状态变更监听器
    class ListenServersChangedJobListener extends AbstractJobListener {
            
            @Override
            protected void dataChanged(final String path, final Type eventType, final String data) {
                if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                    shardingService.setReshardingFlag();
                }
            }
            
            private boolean isInstanceChange(final Type eventType, final String path) {
                return instanceNode.isInstancePath(path) && Type.NODE_CHANGED != eventType;
            }
            
            private boolean isServerChange(final String path) {
                return serverNode.isServerPath(path);
            }
        }
    

        instance和servers节点下有子节点变化会被监听到,这个时候也会去设置下需要重新分片的标记/leader/sharding/neccessary节点。

        上面是触发生成了需要重新分片的标记,具体分片的执行时在任务执行的过程中。在作业任务执行的时候需要获取分片信息,这个时候会完成重新分片的执行。

        public final void execute() {
            //...
            //获取当前作业服务器的分片上下文信息
            ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    
            //...
        }
    
        @Override
        public ShardingContexts getShardingContexts() {
            boolean isFailover = configService.load(true).isFailover();
            if (isFailover) {
                //TODO 获取故障转移到当前节点的分片信息
                List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
                if (!failoverShardingItems.isEmpty()) {
                    //TODO 获取故障转移分片信息
                    return executionContextService.getJobShardingContext(failoverShardingItems);
                }
            }
            shardingService.shardingIfNecessary();
            List<Integer> shardingItems = shardingService.getLocalShardingItems();
            if (isFailover) {
                //TODO 删除本节点被故障转移的分片信息
                shardingItems.removeAll(failoverService.getLocalTakeOffItems());
            }
            shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
            return executionContextService.getJobShardingContext(shardingItems);
        }
    
    • 获取当前节点服务器的分片上下文信息,如果开启了故障转移机制,会优先获取故障转移到当前作业服务器的分片任务。
       shardingService.shardingIfNecessary();
    
        public void shardingIfNecessary() {
            List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();//TODO 获取job存活运行实例
            //不必要分片或作业运行实例为空,则不进行分片操作
            if (!isNeedSharding() || availableJobInstances.isEmpty()) {
                return;
            }
            //等待选举完成,非leader的话,等待完成分片
            if (!leaderService.isLeaderUntilBlock()) {
                blockUntilShardingCompleted();
                return;
            }
            waitingOtherShardingItemCompleted();
            JobConfiguration jobConfig = configService.load(false);
            int shardingTotalCount = jobConfig.getShardingTotalCount();
            log.debug("Job '{}' sharding begin.", jobName);
            jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");//临时processing节点,表示当前正在进行分片
            resetShardingInfo(shardingTotalCount);
            JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
            jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
            log.debug("Job '{}' sharding complete.", jobName);
        }
    
    • 如果需要就执行重新分片,这里是去判断/leader/sharding/neccessary节点是否存在,如果存在就需要执行重新分片操作。重新分片操作是只能在主服务器才能执行的,因为这个分片信息是需要所有作业集群统一。所以这里需要等待集群节点leader选举完成,然后从节点不会进行分片操作,从节点是阻塞等待主节点分片完成才退出。
    //等待当前任务所有分片执行完成才去执行分片操作,这里是防止分片任务被重复执行,一个幂等操作
    waitingOtherShardingItemCompleted();
    
    • 等待当前任务所有分片任务执行完成才去执行分片操作,这里是防止分片任务被其它作业服务器重复执行,一个幂等操作的处理。
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    
    • 创建一个临时节点/leader/sharding/processing,表示正在执行重新分片操作。
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
    
    • 获取重新分片策略,这里是可以在配置作业任务的时候进行配置jobShardingStrategyType参数。
    jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    
    jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount))
    
    • 首先是分片策略(默认是平均分配策略)根据当前的作业服务器实例和分片个数,来计算出每个作业实例获取到的对应的分片信息。然后是在一个zookeeper事务中去创建对应的节点信息。
    @RequiredArgsConstructor
        class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {
            
            private final Map<JobInstance, List<Integer>> shardingResults;
            
            @Override
            public List<CuratorOp> createCuratorOperators(final TransactionOp transactionOp) throws Exception {
                List<CuratorOp> result = new LinkedList<>();
                for (Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
                    for (int shardingItem : entry.getValue()) {
                        result.add(transactionOp.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()));
                    }
                }
                result.add(transactionOp.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)));
                result.add(transactionOp.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)));
                return result;
            }
        }
    
    • 在这个zookeeper事务中,根据每个作业服务器实例获取到的分片信息,创建一个路径为/sharding/{item}/instance,value为instanceId的节点信息,待所有分片信息对应点的zookeeper节点创建完成之后,会删除掉/leader/sharding/neccessary和/leader/sharding/processing子节点信息。

    相关文章

      网友评论

          本文标题:ElasticJob分片机制

          本文链接:https://www.haomeiwen.com/subject/wkxzuktx.html