美文网首页
Elastic job ShardingService 分片服务

Elastic job ShardingService 分片服务

作者: pcgreat | 来源:发表于2018-08-14 11:42 被阅读26次

    ShardingService 是es job 分片服务类 , 这里重点介绍该类一些重要方法

    setReshardingFlag
    ...
    /**
    * 设置需要重新分片的标记.
    */
    public void setReshardingFlag() {
    jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
    }
    ...
    如果 job 存在 , 而leader/sharding/necessary 不存在 ,会创建永久节点 , 设置重分片flag (leader/sharding/necessary 节点)

    setReshardingFlag 被下图中方法调用

    image.png

    ShardingListenerManager.ShardingTotalCountChangedJobListener 监听分片数量变更 , 如果变更了 ,则 调用 setReshardingFlag 设置重分片flag

    ShardingListenerManager.ListenServersChangedJobListener 在job 没有shutdown情况下 , 监听 zk instances 下 节点 非 NODE_UPDATED 事件变更 ,以及 servers下 节点 事件变更 , 调用 setReshardingFlag 设置重分片flag

    SchedulerFacade. registerStartUpInfo(boolean) 实例启动时 设置重分片

    ReconcileService runOneIteration 方法 每分钟调度一次 , 判断自己是否leader , 判断 是否分片完成 ,判断分片 所在节点是否离线 (instances临时节点 和 分片节点(分片节点永久)对比),设置重分片

    shardingIfNecessary 分片方法

     /**
         * 如果需要分片且当前节点为主节点, 则作业分片.
         * 
         * <p>
         * 如果当前无可用节点则不分片.
         * </p>
         */
        public void shardingIfNecessary() {
            List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
            if (!isNeedSharding() || availableJobInstances.isEmpty()) {
                return;
            }
            if (!leaderService.isLeaderUntilBlock()) {
                blockUntilShardingCompleted();
                return;
            }
            waitingOtherShardingItemCompleted();
            LiteJobConfiguration liteJobConfig = configService.load(false);
            int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
            log.debug("Job '{}' sharding begin.", jobName);
            jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
            resetShardingInfo(shardingTotalCount);
            JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
            jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
            log.debug("Job '{}' sharding complete.", jobName);
        }
    
    

    在需要分片(leader/sharding/necessary存在 ),合适instances 不为空 , 非leader 节点 阻塞至分片完成然后返回 ,leader节点 会等待正在处理分片完成 。 会在leader/sharding/processing 创建正在处理节点 ,清理分片对应旧实例 ,有必要会创建新分片或者减少分片 。 根据分片策略,将分片分给实例 ,最后 去掉 leader/sharding/processing 和 leader/sharding/necessary ,唤醒其余实例

    shardingIfNecessary被下图中方法调用


    image.png

    LiteJobFacade getShardingContexts() 方法

        public ShardingContexts getShardingContexts() {
            boolean isFailover = configService.load(true).isFailover();
            if (isFailover) {
                List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
                if (!failoverShardingItems.isEmpty()) {
                    return executionContextService.getJobShardingContext(failoverShardingItems);
                }
            }
            shardingService.shardingIfNecessary();
            List<Integer> shardingItems = shardingService.getLocalShardingItems();
            if (isFailover) {
                shardingItems.removeAll(failoverService.getLocalTakeOffItems());
            }
            shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
            return executionContextService.getJobShardingContext(shardingItems);
        }
    

    isFailover 失败补偿 ,默认情况是false , 比如说某一实例 某些分片失败了 ,那么 isFailover 为true的情况下 ,会分配给其他存活实例 , 这种情况暂时不分析(线上并没有使用) ,getShardingContexts 会在job 执行时候被调用 , 不考虑 isFailover ,首先 shardingIfNecessary ,然后 remove 掉disable分片 ,返回分片上下文

    如下图 192.168.200.151@-@10576 拥有 0 分片 , 新创建个实例,10576实例在执行job时候 会调用 getShardingContexts 方法 , 这时候 0分片 被分配给新的实例192.168.200.151@-@10634

    [zk: 127.0.0.1:2181(CONNECTED) 18] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
    192.168.200.151@-@10576
    
    [zk: 127.0.0.1:2181(CONNECTED) 19] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
    192.168.200.151@-@10634
    
    

    相关文章

      网友评论

          本文标题:Elastic job ShardingService 分片服务

          本文链接:https://www.haomeiwen.com/subject/opwmbftx.html