美文网首页Java 杂谈
elastic job源码分析 - 失效转移监听管理器

elastic job源码分析 - 失效转移监听管理器

作者: 耶也夜 | 来源:发表于2018-09-05 14:53 被阅读116次

    elastic job服务启动时会通过失效转移监听管理器io.elasticjob.lite.internal.failover.FailoverListenerManager启动
    实例下线监听器和失效转移设置改变监听器,它们会对满足的事件通知做失效转移处理。

    public void start() {
        addDataListener(new JobCrashedJobListener());
        addDataListener(new FailoverSettingsChangedJobListener());
    }
    

    实例下线监听器

    实例下线监听器io.elasticjob.lite.internal.failover.FailoverListenerManager.JobCrashedJobListener会监听作业运行实例节点instances的子节点的变化(NODE_REMOVED)。

    // 满足该条件时后续逻辑才会被执行
    isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)
    

    作业运行实例子节点为临时节点,实例丢失(下线)时,临时节点会自动被清除。此时,其他实例可以接收到通知。

    1、忽略处理自己的下线通知

    if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
        return;
    }
    

    2、处理下线实例分配的失效转移分片

    List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
    

    当监听器收到有实例下线时(可以出现意外,也可能主动下线),会获取下线实例已分配的失效转移分片项集合(可能接管了其他实例的失效转移分片),如果失效转移分片集合不为空,则在新增节点leader/failover/items/分片项

    leader/failover/items/分片项
    一旦有作业崩溃,则会向此节点记录。当有空闲作业服务器时,会从此节点抓取需失效转移的作业项

    3、处理下线实例分配的分片项

    for (int each : shardingService.getShardingItems(jobInstanceId)) {
        failoverService.setCrashedFailoverFlag(each);
        failoverService.failoverIfNecessary();
    }
    

    如果下线的运行实例未接管失效转移分片,则获取该作业实例的运行分片项。针对每一个分片项新增节点leader/failover/items/分片项

    上述每一个分片项在新增表示需要失效转移的节点时,当前实例会立即发起选举,确定由谁(存活的实例)立即执行该分片。

    leader/failover/items/latch 分配失效转移分片项时占用的分布式锁,为curator的分布式锁使用。

    选举结束后,获胜的实例会在节点sharding/分片项/failover中设置值未当前实例ID,然后删除leader/failover/items/分片项(表示已有实例负责该失效转移分片,不再需要继续处理),并立即执行该分片的任务调度。

    失效转移设置改变监听器

    zk任务配置修改为非失效转移时,执行删除作业失效转移信息。即删除节点sharding/分片项/failover(所有分片项下的子节点failover均会被删除).

    failoverService.removeFailoverInfo();

    相关文章

      网友评论

        本文标题:elastic job源码分析 - 失效转移监听管理器

        本文链接:https://www.haomeiwen.com/subject/ipuawftx.html