ElasticSearch Replica Shard Promotion

Author: persisting_ | Published: 2019-01-05 14:58

    1 Overview

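    As described in the article ElasticSearch Master and Node Fault Detection, the master node periodically pings the other (non-master) nodes. When one of them fails, the master removes it from the cluster, and if that node held shards, a replica shard may need to be promoted to primary. This article explains how that promotion works. It focuses on promotion after node fault detection; for other scenarios, trace the callers of RoutingNodes.promoteReplicaToPrimary to see where else it is triggered.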

    2 Replica promotion triggered by a node failure

    2.1 A node failure leads to node removal

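    In ZenDiscovery, the class the master uses to periodically ping the other nodes is NodesFaultDetection. The ZenDiscovery constructor registers a NodeFaultDetectionListener on NodesFaultDetection to handle node failures; see the article ElasticSearch Master and Node Fault Detection for details. Here we go straight to what happens when the master detects a node failure: it calls ZenDiscovery.handleNodeFailure.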

    //ZenDiscovery
    private void handleNodeFailure(final DiscoveryNode node, final String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a node failure
            return;
        }
        if (!localNodeMaster()) {
            // nothing to do here...
            return;
        }
        // remove the failed node
        removeNode(node, "zen-disco-node-failed", reason);
    }
    
    private void removeNode(final DiscoveryNode node, final String source, final String reason) {
        // submit a cluster state update task to MasterService;
        // NodeRemovalClusterStateTaskExecutor is the key to replica promotion,
        // and nodeRemovalExecutor is an instance of NodeRemovalClusterStateTaskExecutor
        masterService.submitStateUpdateTask(
                source + "(" + node + "), reason(" + reason + ")",
                new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
                ClusterStateTaskConfig.build(Priority.IMMEDIATE),
                nodeRemovalExecutor,
                nodeRemovalExecutor);
    }
    

    The cluster state update task submitted to MasterService is then executed.
    NodeRemovalClusterStateTaskExecutor is a static inner class of ZenDiscovery.
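
    The two guards in handleNodeFailure can be illustrated with a minimal sketch. It is not Elasticsearch code: NodeFailureHandlerSketch, its State enum and the submittedTasks list (standing in for the MasterService queue) are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model; none of these types are Elasticsearch classes.
final class NodeFailureHandlerSketch {
    enum State { INITIALIZED, STARTED, STOPPED }

    private final State lifecycleState;
    private final boolean localNodeIsMaster;
    // Stands in for the MasterService task queue (assumption for illustration).
    final List<String> submittedTasks = new ArrayList<>();

    NodeFailureHandlerSketch(State lifecycleState, boolean localNodeIsMaster) {
        this.lifecycleState = lifecycleState;
        this.localNodeIsMaster = localNodeIsMaster;
    }

    /** Returns true only if a node-removal task was queued. */
    boolean handleNodeFailure(String nodeId, String reason) {
        if (lifecycleState != State.STARTED) {
            return false; // not started: ignore the node failure
        }
        if (!localNodeIsMaster) {
            return false; // only the elected master removes nodes
        }
        // Same source-string shape that removeNode builds in the listing above.
        submittedTasks.add("zen-disco-node-failed(" + nodeId + "), reason(" + reason + ")");
        return true;
    }

    public static void main(String[] args) {
        NodeFailureHandlerSketch master = new NodeFailureHandlerSketch(State.STARTED, true);
        System.out.println(master.handleNodeFailure("node-2", "ping timeout"));
        System.out.println(master.submittedTasks);
    }
}
```

    A node that is not the master, or that has not finished starting, simply ignores the failure report; only the master turns it into a cluster state update task.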

    2.2 NodeRemovalClusterStateTaskExecutor triggers replica promotion

    Now look at NodeRemovalClusterStateTaskExecutor.execute, the method that triggers replica promotion:

    //ZenDiscovery.NodeRemovalClusterStateTaskExecutor
    @Override
    public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
        final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
        // tracks whether any node was actually removed, since the node may already have been removed by an earlier task
        boolean removed = false;
        for (final Task task : tasks) {
            // if the current cluster state still contains the node recorded in the task,
            // remove it and set removed to true
            if (currentState.nodes().nodeExists(task.node())) {
                // remove the failed node; remainingNodesBuilder is used later to build
                // the cluster state without the failed node
                remainingNodesBuilder.remove(task.node());
                removed = true;
            } else {
                logger.debug("node [{}] does not exist in cluster state, ignoring", task);
            }
        }
        // if nothing was removed, the cluster state is unchanged, so return the current state as is
        if (!removed) {
            // no nodes to remove, keep the current cluster state
            return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
        }
    
        // build the cluster state that results from removing the failed nodes
        final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);
    
        final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
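        // if there are no longer enough master-eligible nodes to hold an election, trigger a rejoin (re-election);
        // the election is followed by a reroute, which is not covered here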
        if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
            final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
            rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                        masterNodes, electMasterService.minimumMasterNodes()));
            return resultBuilder.build(currentState);
        } else {
            // if enough master-eligible nodes remain, call allocationService.deassociateDeadNodes,
            // which is where replica promotion is triggered
            return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
        }
    }
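
    The three possible outcomes of execute (unchanged state, rejoin, nodes removed) can be sketched with plain collections. This is a simplified model under stated assumptions, not the Elasticsearch implementation: NodeRemovalSketch, Outcome and Result are hypothetical names, nodes are plain strings, and the quorum check is reduced to a count comparison.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the batch executor; not Elasticsearch code.
final class NodeRemovalSketch {
    enum Outcome { UNCHANGED, REJOIN, NODES_REMOVED }

    static final class Result {
        final Outcome outcome;
        final Set<String> nodes; // node set of the returned "cluster state"

        Result(Outcome outcome, Set<String> nodes) {
            this.outcome = outcome;
            this.nodes = nodes;
        }
    }

    static Result execute(Set<String> currentNodes, Set<String> masterEligible,
                          int minimumMasterNodes, List<String> failedNodes) {
        Set<String> remaining = new LinkedHashSet<>(currentNodes);
        boolean removed = false;
        for (String failed : failedNodes) {
            // A node may already be gone because an earlier task removed it.
            if (currentNodes.contains(failed)) {
                remaining.remove(failed);
                removed = true;
            }
        }
        if (!removed) {
            return new Result(Outcome.UNCHANGED, currentNodes);
        }
        // Assumption: quorum is modelled as a simple count of master-eligible nodes.
        long eligibleLeft = remaining.stream().filter(masterEligible::contains).count();
        if (eligibleLeft < minimumMasterNodes) {
            // Not enough master-eligible nodes: rejoin and keep the current state.
            return new Result(Outcome.REJOIN, currentNodes);
        }
        // Enough nodes remain: this is where deassociateDeadNodes would run.
        return new Result(Outcome.NODES_REMOVED, remaining);
    }
}
```

    Note that in both the UNCHANGED and REJOIN cases the sketch returns the original node set, mirroring how the listing builds its result from currentState in those branches.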
    

    3 The replica promotion flow

    Replica promotion takes place inside AllocationService.deassociateDeadNodes:

    //AllocationService
    // As the Javadoc below makes clear, all shards on the removed node are set to the unassigned state,
    // and a replica may be promoted in the process
    /**
    * unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
    * if needed.
    */
    public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
        RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
        // shuffle the unassigned nodes, just so we won't have things like poison failed shards
        // shuffle the unassigned shards
        routingNodes.unassigned().shuffle();
        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
            clusterInfoService.getClusterInfo(), currentNanoTime());
    
        // first, clear from the shards any node id they used to belong to that is now dead
        // process every shard that lived on a removed node
        deassociateDeadNodes(allocation);
    
        if (allocation.routingNodesChanged()) {
            clusterState = buildResult(clusterState, allocation);
        }
        // when the master removes a non-master node, reroute is passed as true, which triggers a reroute
        if (reroute) {
            return reroute(clusterState, reason);
        } else {
            return clusterState;
        }
    }
    

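    Next we focus on the private deassociateDeadNodes(RoutingAllocation) overload, which is the heart of the failure handling and therefore of shard promotion: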

    //AllocationService
    private void deassociateDeadNodes(RoutingAllocation allocation) {
        for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) {
            // the node currently being examined
            RoutingNode node = it.next();
            // if the current node list still contains this node, it has not been removed,
            // so the shards assigned to it are left alone
            if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) {
                // its a live node, continue
                continue;
            }
            // now, go over all the shards routing on the node, and fail them
            // if the current node list no longer contains this node, it has been removed,
            // so every shard assigned to it is processed
            for (ShardRouting shardRouting : node.copyShards()) {
                final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
                // index.unassigned.node_left.delayed_timeout controls how long reallocation of the shards
                // of a departed node is delayed; a value greater than 0 marks the shard as delayed
                boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
                UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
                    null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
                allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
            }
            // its a dead node, remove it, note, its important to remove it *after* we apply failed shard
            // since it relies on the fact that the RoutingNode exists in the list of nodes
            it.remove();
        }
    }
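
    The iteration pattern above (skip live nodes, fail every shard on a dead node, and only then remove the node through the iterator) can be sketched as follows. This is a minimal model, not Elasticsearch code: DeadNodeSketch is a hypothetical name, routing nodes are modelled as a map from node id to a list of shard labels, and "failing" a shard is reduced to detaching it and recording it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the dead-node sweep; not Elasticsearch code.
final class DeadNodeSketch {
    /**
     * Removes every node that is not in liveDataNodes from routingNodes and
     * returns the shards that were failed in the process.
     */
    static List<String> deassociateDeadNodes(Map<String, List<String>> routingNodes,
                                             Set<String> liveDataNodes) {
        List<String> failedShards = new ArrayList<>();
        for (Iterator<Map.Entry<String, List<String>>> it = routingNodes.entrySet().iterator();
             it.hasNext(); ) {
            Map.Entry<String, List<String>> node = it.next();
            if (liveDataNodes.contains(node.getKey())) {
                continue; // live node: leave its shards alone
            }
            // Iterate over a copy (like node.copyShards()) because the node's
            // own shard list is modified while shards are being failed.
            for (String shard : new ArrayList<>(node.getValue())) {
                node.getValue().remove(shard);
                failedShards.add(shard);
            }
            // Remove the node only after its shards were failed, via the
            // iterator to avoid a ConcurrentModificationException.
            it.remove();
        }
        return failedShards;
    }
}
```

    The map passed in must be mutable and hold mutable lists (for example a LinkedHashMap of ArrayList values), since the sketch modifies both in place.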
    

    As the definition of deassociateDeadNodes shows, ES handles every shard assigned to the removed (failed) node by calling allocation.routingNodes().failShard:

    //RoutingNodes
    /**
    * Applies the relevant logic to handle a cancelled or failed shard.
    *
    * Moves the shard to unassigned or completely removes the shard (if relocation target).
    *
    * - If shard is a primary, this also fails initializing replicas.
    * - If shard is an active primary, this also promotes an active replica to primary (if such a replica exists).
    * - If shard is a relocating primary, this also removes the primary relocation target shard.
    * - If shard is a relocating replica, this promotes the replica relocation target to a full initializing replica, removing the
    *   relocation source information. This is possible as peer recovery is always done from the primary.
    * - If shard is a (primary or replica) relocation target, this also clears the relocation information on the source shard.
    *
    */
    public void failShard(Logger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData,
                            RoutingChangesObserver routingChangesObserver) {
        ensureMutable();
        assert failedShard.assignedToNode() : "only assigned shards can be failed";
        assert indexMetaData.getIndex().equals(failedShard.index()) :
            "shard failed for unknown index (shard entry: " + failedShard + ")";
        assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard :
            "shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " +
                getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());
    
        logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
    
        // if this is a primary, fail initializing replicas first (otherwise we move RoutingNodes into an inconsistent state)
        // if this shard is a primary, recursively call failShard on each of its replicas that is in the INITIALIZING state
        if (failedShard.primary()) {
            // get all assigned copies of this shard (the list contains the primary as well as the replicas)
            List<ShardRouting> assignedShards = assignedShards(failedShard.shardId());
            if (assignedShards.isEmpty() == false) {
                // copy list to prevent ConcurrentModificationException
                for (ShardRouting routing : new ArrayList<>(assignedShards)) {
                    // recursively call failShard on every copy that is not a primary and is INITIALIZING
                    if (!routing.primary() && routing.initializing()) {
                        // re-resolve replica as earlier iteration could have changed source/target of replica relocation
                        ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId());
                        assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
                        UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
                            "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
                            unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
                        failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver);
                    }
                }
            }
        }
        // if the shard is relocating, handle it as follows (not covered here)
        if (failedShard.relocating()) {
            // find the shard that is initializing on the target node
            ShardRouting targetShard = getByAllocationId(failedShard.shardId(), failedShard.allocationId().getRelocationId());
            assert targetShard.isRelocationTargetOf(failedShard);
            if (failedShard.primary()) {
                logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard);
                // cancel and remove target shard
                remove(targetShard);
                routingChangesObserver.shardFailed(targetShard, unassignedInfo);
            } else {
                logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard);
                // promote to initializing shard without relocation source and ensure that removed relocation source
                // is not added back as unassigned shard
                removeRelocationSource(targetShard);
                routingChangesObserver.relocationSourceRemoved(targetShard);
            }
        }
    
        // fail actual shard
        // if the shard is in the INITIALIZING state
        if (failedShard.initializing()) {
            // the shard is not a relocation target
            if (failedShard.relocatingNodeId() == null) {
                // if the shard is a primary, i.e. the primary has failed, try to promote a replica
                if (failedShard.primary()) {
                    // promote active replica to primary if active replica exists (only the case for shadow replicas)
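                    // pick the active replica hosted on the node running the highest Elasticsearch version
                    // (the "version" here is the node's ES version, not a data version)
                    ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
                    // if there is no active replica, just record the failed shard in the unassigned list
                    if (activeReplica == null) {
                        moveToUnassigned(failedShard, unassignedInfo);
                    } else {
                        // if such an active replica was found, move the primary to the unassigned list and demote it to a replica;
                        // the demotion is not expanded here: it removes the shard from the assigned shards in RoutingNodes and adds it to the unassigned list
                        movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
                        // see the method's implementation for details; in essence it marks the chosen active replica as the primary
                        promoteReplicaToPrimary(activeReplica, indexMetaData, routingChangesObserver);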
                    }
                } else {
                    // if the shard is not a primary, simply move it to the unassigned list
                    // initializing shard that is not relocation target, just move to unassigned
                    moveToUnassigned(failedShard, unassignedInfo);
                }
            } else {
                // if the shard is a relocation target, remove it and cancel the relocation on the source
                // The shard is a target of a relocating shard. In that case we only need to remove the target shard and cancel the source
                // relocation. No shard is left unassigned
                logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", failedShard,
                    unassignedInfo.shortSummary());
                ShardRouting sourceShard = getByAllocationId(failedShard.shardId(),
                    failedShard.allocationId().getRelocationId());
                assert sourceShard.isRelocationSourceOf(failedShard);
                logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), sourceShard,
                    unassignedInfo.shortSummary());
                cancelRelocation(sourceShard);
                remove(failedShard);
            }
            routingChangesObserver.shardFailed(failedShard, unassignedInfo);
        } else {
            // if the shard is not INITIALIZING (i.e. it is active), the handling is similar to the above and is not repeated
            assert failedShard.active();
            if (failedShard.primary()) {
                // promote active replica to primary if active replica exists
                ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
                if (activeReplica == null) {
                    moveToUnassigned(failedShard, unassignedInfo);
                } else {
                    movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
                    promoteReplicaToPrimary(activeReplica, indexMetaData, routingChangesObserver);
                }
            } else {
                assert failedShard.primary() == false;
                if (failedShard.relocating()) {
                    remove(failedShard);
                } else {
                    moveToUnassigned(failedShard, unassignedInfo);
                }
            }
            routingChangesObserver.shardFailed(failedShard, unassignedInfo);
        }
        assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
            " was matched but wasn't removed";
    }
    

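    With that covered, return to deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason). After the steps described above, this method performs a reroute, that is, a fresh allocation pass. The reroute method returns the latest ClusterState after allocation; the details will be covered in a separate article.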

    Now look back at ZenDiscovery.NodeRemovalClusterStateTaskExecutor.execute, whose source was listed above. It contains this line:

    //ZenDiscovery.NodeRemovalClusterStateTaskExecutor.execute
    return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
    

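    This line returns the ClusterState that results from the node removal (which includes marking shards as unassigned, promoting replicas, and rerouting). That state is then published to the other nodes in the cluster so that they can synchronize with it.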

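    4 Summary

    Some of the operations above, such as the replica promotion itself, were not expanded in detail because they are fairly simple. The key point is that RoutingNodes records the assignment relationships between shards and nodes; promotion, removal and similar operations simply update those relationships in RoutingNodes. After the update, a reroute is performed and the resulting ClusterState is published to all nodes in the cluster to synchronize the cluster state.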

          Article link: https://www.haomeiwen.com/subject/yagrrqtx.html