美文网首页
ElasticJob故障转移机制

ElasticJob故障转移机制

作者: 圣村的希望 | 来源:发表于2020-10-06 18:20 被阅读0次

    在ElasticJob中,会把一个任务分成多个分片,然后再把分片分配给集群中不同的节点实例进行作业任务的执行。但是如果集群中的某几台机器宕机,这些分片任务的执行就需要转移到其它正常节点机器进行继续执行分片任务,这就是任务分片的故障转移。在ElasticJob中有对应的节点故障转移的功能,我们在任务配置的时候配置failover参数即可。下面看下故障转移功能在ElasticJob中的实现流程:

FailoverListenerManager:这是故障转移功能开始的地方
@Override
    public void start() {
        addDataListener(new JobCrashedJobListener());
        addDataListener(new FailoverSettingsChangedJobListener());
    }

     FailoverListenerManager是故障转移监听管理器,在ElasticJob启动的时候,他启动了2个监听器JobCrashedJobListener(集群节点宕机监听器)和FailoverSettingsChangedJobListener(故障转移修改监听器),在集群中的节点发生变化的时候能立即监听到,进而能及时做故障转移操作。

class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            //TODO 当前job运行实例没有关闭、并且config配置开启了故障转移功能、并且是节点删除事件、并且path路径是{jobName}/instances开头的
            if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.NODE_DELETED == eventType && instanceNode.isInstancePath(path)) {
                //TODO 获取当前被删除的job节点实例id
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                //TODO 被删除节点实例id是否为当前节点
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                //TODO 获取故障转移到当前节点的分片信息,初始肯定为空
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    //TODO 获取当前删除节点对应的初始分片信息
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        //TODO 设置故障转移标记,其实就是创建了/leader/failover/items/{item}持久节点
                        failoverService.setCrashedFailoverFlag(each);
                        //TODO 进行故障转移
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }
  • 通过zookeeper的watcher机制,ElasticJob能够感知监听到作业下集群节点删除事件。
  • 首先判断当前job作业实例没有关闭,并且开启了故障转移功能,并且当前是节点删除事件。
  • 获取被删除的job节点实例id,如果当前节点是被删除节点实例,忽略本次故障转移处理。
  • 获取节点的所有故障转移分片信息,这里是先获取该作业下的所有/{namespace}/sharding/子节点信息,然后遍历所有分片,获取已经发生故障转移的分片信息(就是获取/sharding/{item}/failover节点信息),然后比对该节点对应的作业实例是否为当前作业实例。
//TODO 获取故障转移到当前节点的分片信息,初始肯定为空
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);

public List<Integer> getFailoverItems(final String jobInstanceId) {
        //TODO 获取当前{nameSpace}/sharding分片信息
        List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
        List<Integer> result = new ArrayList<>(items.size());
        for (String each : items) {
            int item = Integer.parseInt(each);
            //TODO node=[sharding/{item}/failover]
            String node = FailoverNode.getExecutionFailoverNode(item);
            //TODO 查看node节点是否存在,并且job实例id是否为转移到当前节点
            if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
                result.add(item);
            }
        }
        //TODO 分片做下排序
        Collections.sort(result);
        return result;
    }
  • 初次执行故障转移的话,该节点对应的故障转移分片信息肯定为空。然后就是获取分配该故障删除节点的所有分片信息,一个个遍历执行故障转移操作
//TODO 获取当前删除节点对应的初始分片信息
for (int each : shardingService.getShardingItems(jobInstanceId)) {
    //TODO 设置故障转移标记,其实就是创建了/leader/failover/items/{item}持久节点
    failoverService.setCrashedFailoverFlag(each);
    //TODO 进行故障转移
    failoverService.failoverIfNecessary();
}
  • 遍历一个故障节点分片信息,就把他设置为需要故障转移分片,其实就是在zookeeper下面创建一个/leader/failover/items/{item}节点,表示当前分片需要故障转移。在创建该节点前需要先判断当前节点是否已经执行了故障转移(判断/sharding/{item}/failover是否存在)
//TODO 设置故障转移标记,其实就是创建了/leader/failover/items/{item}持久节点
failoverService.setCrashedFailoverFlag(each);

public void setCrashedFailoverFlag(final int item) {
        //TODO 查看当前分片是否执行了故障转移 /sharding/{item}/failover
        if (!isFailoverAssigned(item)) {
            //TODO 创建需要故障转移节点标记 /leader/failover/items/{item}
            jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
        }
    }

private boolean isFailoverAssigned(final Integer item) {
        //TODO 判断/sharding/{item}/failover节点是否存在
        return jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
    }
  • 执行当前遍历节点的故障转移操作。
public void failoverIfNecessary() {
        //TODO 是否进行故障转移 /leader/failover/items节点存在并且下面存在子节点信息、并且当前job非运行状态
        if (needFailover()) {
            //TODO 获取故障转移分布式锁/leader/failover/latch,在主节点中进行故障转移
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
                return;
            }
            //TODO 获取节点/leader/failover/items/下的一个子节点信息
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
            log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
            //TODO 创建临时节点/sharding/{item}/failover ,value为jobInstanceId
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            //TODO 删除/leader/failover/items/{item}节点
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
            // TODO Instead of using triggerJob, use executor for unified scheduling
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
            if (null != jobScheduleController) {
                //TODO 触发下任务调度
                jobScheduleController.triggerJob();
            }
        }
    }

1、首先判断当前分片是否能进行故障转移。判断/leader/failover/items节点存在并且存在子节点信息,并且当前job非运行状态。这里解释了一个疑惑,就是为什么作业执行完之后需要在手动调用一下故障转移操作,就是因为在前一次执行故障转移的时候有分片任务正在执行,导致故障转移操作没有执行,所有在分片任务执行完之后再手动执行下故障转移。
2、获取故障转移分布式锁/leader/failover/latch,获取锁的节点即可往下执行当前分片的故障转移操作。
3、执行故障转移操作是在一个zookeeper事务中执行,它通过LeaderExecutionCallback回调完成故障转移操作,在这个事务中,所有的操作要么都执行要么都不执行。
4、获取节点/leader/failover/items/下的一个子节点信息
5、创建临时节点/sharding/{item}/failover,value为jobInstanceId
6、删除/leader/failover/items/{item}节点
7、手动触发一下任务的调度,防止下次下次周期任务没有及时到来,没有即时执行故障节点的任务执行。

以上是执行了故障节点的分片转移,但是对应的故障分片的任务没有被执行,这个是在任务调度执行的时候触发。
    /**
     * 执行作业.
     */
    public final void execute() {
        //...

        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        
        //...
    }

    @Override
    public ShardingContexts getShardingContexts() {
        //...
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            //获取分配给当前作业实例的故障转移分片,遍历作业的所有分片信息,获取/sharding/{item}/failover
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
    }

    在作业执行流程中获取分片信息的时候,如果开启了故障转移,本次作业的执行,会去优先执行故障转移到当前节点的分片任务。

相关文章

  • ElasticJob故障转移机制

    在ElasticJob中,会把一个任务分成多个分片,然后再把分片分配给集群中不同的节点实例进行作业任务的执行。但是...

  • 分布式术语

    容错机制 FAILOVER:故障转移,故障出现时重试其他机器,用于读操作。重试会带来延迟。 FAILBACK: 故...

  • ElasticJob分片机制

    ElasticJob是一个弹性的分布式任务调度框架,这里的分布式就是采用分片的来进行任务调度和业务执行的解耦,分片...

  • ElasticJob幂等机制

    ElasticJob的幂等机制,是指作业分片执行的幂等,他需要做到以下两点: 同一个分片在当前作业实例上不会被重复...

  • 故障转移集群的仲裁

    转载自我的博客《故障转移集群的仲裁 》 Windows服务器故障转移集群(WindowsServerFailove...

  • 仲裁盘的另一种用法:高可用集群故障转移

    转自:集群故障转移的仲裁 Windows服务器故障转移集群(Windows Server Failover Clu...

  • elastic job源码分析 - 失效转移监听管理器

    elastic job服务启动时会通过失效转移监听管理器io.elasticjob.lite.internal.f...

  • redis系列之sentinel的故障转移

    故障转移 接着上章构建的sentinel网络构建后分析sentinel的故障转移。sentinel本身做为redi...

  • 故障检测和故障转移

    故障检测 集群中的每个节点都会定期地向集群中的其他节点发送PING消息,以此来检测对方是否在线,如果接收PING消...

  • elasticJob 源码解析之失效转移FailOver

    失效转移: 运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。启用失效转移功能可以在本次作业执行过...

网友评论

      本文标题:ElasticJob故障转移机制

      本文链接:https://www.haomeiwen.com/subject/xbyauktx.html