
elasticsearch shard split analysis (Part 4)

Author: 华饼 | Published on 2018-02-02 10:39

    The previous article ended with index creation producing a cluster state change event, which is then sent to every node in the cluster through clusterStatePublisher. This article continues the shard split analysis and looks at how the underlying data of the target shards is recovered.
    clusterStatePublisher is a functional-interface field; it is set to the publish method of the ZenDiscovery module.
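
    To make that wiring concrete, here is a minimal, self-contained sketch (illustrative names only, not the real Elasticsearch signatures) of a functional-interface field being pointed at a discovery module's publish method:

    // PublisherWiringSketch.java -- hypothetical names, for illustration only
    import java.util.function.BiConsumer;

    public class PublisherWiringSketch {

        interface AckListener {
            void onNodeAck(String nodeId, Exception e);
        }

        static class ZenDiscoverySketch {
            void publish(String clusterChangedEvent, AckListener ackListener) {
                // ... send the new cluster state to the other nodes ...
                System.out.println("publishing: " + clusterChangedEvent);
            }
        }

        public static void main(String[] args) {
            ZenDiscoverySketch discovery = new ZenDiscoverySketch();
            // clusterStatePublisher is simply a field of a functional-interface type;
            // assigning discovery::publish delegates publishing to the discovery module
            BiConsumer<String, AckListener> clusterStatePublisher = discovery::publish;
            clusterStatePublisher.accept("cluster state changed", (node, e) -> { });
        }
    }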

    ZenDiscovery

    public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) {
            ClusterState newState = clusterChangedEvent.state();
            assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source();
    
            // state got changed locally (maybe because another master published to us)
            if (clusterChangedEvent.previousState() != this.committedState.get()) {
                throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
            }
    
            pendingStatesQueue.addPending(newState);
    
            try {
                // send this cluster state change to the other nodes; internally, it waits until at least minimumMasterNodes nodes have responded before marking the change as committed, and then sends the commit message to the other nodes
                publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
            } catch (FailedToCommitClusterStateException t) {
                // cluster service logs a WARN message
                logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
                    newState.version(), electMaster.minimumMasterNodes());
    
                synchronized (stateMutex) {
                    pendingStatesQueue.failAllStatesAndClear(
                        new ElasticsearchException("failed to publish cluster state"));
    
                    rejoin("zen-disco-failed-to-publish");
                }
                throw t;
            }
            // reaching this point means the cluster state has been committed
            final DiscoveryNode localNode = newState.getNodes().getLocalNode();
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean processedOrFailed = new AtomicBoolean();
            pendingStatesQueue.markAsCommitted(newState.stateUUID(),
                new PendingClusterStatesQueue.StateProcessedListener() {
                    @Override
                    public void onNewClusterStateProcessed() {
                        processedOrFailed.set(true);
                        latch.countDown();
                        ackListener.onNodeAck(localNode, null);
                    }
    
                    @Override
                    public void onNewClusterStateFailed(Exception e) {
                        processedOrFailed.set(true);
                        latch.countDown();
                        ackListener.onNodeAck(localNode, e);
                        logger.warn(
                            (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                                "failed while applying cluster state locally [{}]",
                                clusterChangedEvent.source()),
                            e);
                    }
                });
    
            synchronized (stateMutex) {
                if (clusterChangedEvent.previousState() != this.committedState.get()) {
                    throw new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes");
                }
                // start applying this cluster state change locally; only the publishing node goes through here, the other nodes call this function after they receive the commit message
                boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() +
                    " committed version [" + newState.version() + "] source [" + clusterChangedEvent.source() + "]");
                if (sentToApplier == false && processedOrFailed.get() == false) {
                    assert false : "cluster state published locally neither processed nor failed: " + newState;
                    logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed",
                        newState.version());
                    return;
                }
            }
            // indefinitely wait for cluster state to be applied locally
            try {
                latch.await();
            } catch (InterruptedException e) {
                logger.debug(
                    (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                        "interrupted while applying cluster state locally [{}]",
                        clusterChangedEvent.source()),
                    e);
                Thread.currentThread().interrupt();
            }
        }
    

    We will not go into the details of how the cluster state is published here. In short, this function sends the new cluster state to every node in the cluster except the local node. Once at least minimumMasterNodes nodes have responded, the cluster state change is marked as committed and a commit message is sent to the other nodes. When the other nodes receive the commit message, they call processNextCommittedClusterState to handle the committed cluster state. That function is where the cluster state change is actually applied, so that the underlying data ends up consistent with the committed cluster state.
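
    For intuition, here is a simplified sketch of the two-phase idea behind the publishClusterState.publish call above (hypothetical helper names; in the real code the required acknowledgement count is derived from minimumMasterNodes and the logic is considerably more involved):

    // TwoPhasePublishSketch.java -- illustration only, not the real publish implementation
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class TwoPhasePublishSketch {

        static void publish(List<String> otherNodes, int requiredAcks) throws InterruptedException {
            CountDownLatch enoughAcks = new CountDownLatch(requiredAcks);
            for (String node : otherNodes) {
                // phase 1: send the new cluster state; the callback counts acknowledgements
                sendStateAsync(node, enoughAcks::countDown);
            }
            if (enoughAcks.await(30, TimeUnit.SECONDS) == false) {
                // corresponds to the FailedToCommitClusterStateException case: not enough nodes acknowledged
                throw new IllegalStateException("failed to commit cluster state");
            }
            for (String node : otherNodes) {
                // phase 2: the other nodes only apply the state after they receive the commit message
                sendCommitAsync(node);
            }
        }

        static void sendStateAsync(String node, Runnable onAck) { onAck.run(); } // stub: acknowledge immediately
        static void sendCommitAsync(String node) { /* stub */ }
    }

    With the state committed, processNextCommittedClusterState is where each node applies it: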

    boolean processNextCommittedClusterState(String reason) {
            assert Thread.holdsLock(stateMutex);
    
           ...
            // the code above is mostly exception and sanity checks; this is the actual entry point for applying the cluster state
            clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
                this::clusterState,
                new ClusterStateTaskListener() {
                    @Override
                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                        try {
                            pendingStatesQueue.markAsProcessed(newClusterState);
                        } catch (Exception e) {
                            onFailure(source, e);
                        }
                    }
    
                    @Override
                    public void onFailure(String source, Exception e) {
                        logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure applying [{}]", reason), e);
                        try {
                            // TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around
                            // for too long.
                            pendingStatesQueue.markAsFailed(newClusterState, e);
                        } catch (Exception inner) {
                            inner.addSuppressed(e);
                            logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
                        }
                    }
                });
    
            return true;
        }
    

    Some exception-handling code is omitted here so we can get straight to the entry point for applying the cluster state. It calls the onNewClusterState function of clusterApplier, which is a ClusterApplierService object, and passes in a listener; when processing completes, the listener's clusterStateProcessed function is called to mark this cluster state change as processed.

    ClusterApplierService

    public void onNewClusterState(final String source, final java.util.function.Supplier<ClusterState> clusterStateSupplier,
                                      final ClusterStateTaskListener listener) {
            Function<ClusterState, ClusterState> applyFunction = currentState -> {
                ClusterState nextState = clusterStateSupplier.get();
                if (nextState != null) {
                    return nextState;
                } else {
                    return currentState;
                }
            };
            submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener);
        }
    
    private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
                                           final Function<ClusterState, ClusterState> executor,
                                           final ClusterStateTaskListener listener) {
            if (!lifecycle.started()) {
                return;
            }
            try {
                UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterStateTaskListener(listener, logger), executor);
                if (config.timeout() != null) {
                    threadPoolExecutor.execute(updateTask, config.timeout(),
                        () -> threadPool.generic().execute(
                            () -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source))));
                } else {
                    threadPoolExecutor.execute(updateTask);
                }
            } catch (EsRejectedExecutionException e) {
                // ignore cases where we are shutting down..., there is really nothing interesting
                // to be done here...
                if (!lifecycle.stoppedOrClosed()) {
                    throw e;
                }
            }
        }
    

    Does this look similar to the index-creation task execution described earlier? Index-creation tasks are submitted in ClusterService, while cluster-state-application tasks are submitted in ClusterApplierService. Creating an index runs an asynchronous task whose end result is a new cluster state. Once that new cluster state has been committed, another asynchronous task, this time submitted through ClusterApplierService, makes the underlying data match the new cluster state.
    Here again an UpdateTask is produced; the config object is marked Priority.HIGH and no timeout is set, so threadPoolExecutor's execute method is called directly. This threadPoolExecutor is also a PrioritizedEsThreadPoolExecutor, so the task runs exactly like the index-creation tasks analyzed earlier and is not covered again here.
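
    As an aside, the priority-ordered execution can be illustrated with plain JDK classes (a sketch only, not the real PrioritizedEsThreadPoolExecutor): tasks comparable by priority are queued in a PriorityBlockingQueue, so whenever several tasks are pending, the highest-priority one runs first.

    // PriorityExecutorSketch.java -- illustration only
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PriorityExecutorSketch {

        static class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
            final int priority;   // lower value = more urgent, e.g. HIGH before NORMAL
            final String source;

            PrioritizedTask(int priority, String source) {
                this.priority = priority;
                this.source = source;
            }

            public int compareTo(PrioritizedTask other) {
                return Integer.compare(priority, other.priority);
            }

            public void run() {
                System.out.println("running [" + source + "]");
            }
        }

        public static void main(String[] args) {
            // a single worker thread backed by a priority queue: pending tasks are ordered by compareTo
            ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new PriorityBlockingQueue<>());
            executor.execute(new PrioritizedTask(2, "normal-priority housekeeping"));
            executor.execute(new PrioritizedTask(1, "normal-priority task"));
            executor.execute(new PrioritizedTask(0, "apply cluster state (HIGH)"));
            executor.shutdown();
        }
    }

    Let's go straight to UpdateTask's run method.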

    ClusterApplierService.UpdateTask

    public void run() {
                runTask(this);
            }
    

    ClusterApplierService

    protected void runTask(UpdateTask task) {
            if (!lifecycle.started()) {
                logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
                return;
            }
    
            logger.debug("processing [{}]: execute", task.source);
            final ClusterState previousClusterState = state.get();
    
            long startTimeNS = currentTimeInNanos();
            final ClusterState newClusterState;
            try {
                // get the new cluster state to be processed
                newClusterState = task.apply(previousClusterState);
            } catch (Exception e) {
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                if (logger.isTraceEnabled()) {
                    logger.trace(
                        (Supplier<?>) () -> new ParameterizedMessage(
                            "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
                            executionTime,
                            previousClusterState.version(),
                            task.source,
                            previousClusterState.nodes(),
                            previousClusterState.routingTable(),
                            previousClusterState.getRoutingNodes()),
                        e);
                }
                warnAboutSlowTaskIfNeeded(executionTime, task.source);
                task.listener.onFailure(task.source, e);
                return;
            }
    
            if (previousClusterState == newClusterState) {
                task.listener.clusterStateProcessed(task.source, newClusterState, newClusterState);
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
                warnAboutSlowTaskIfNeeded(executionTime, task.source);
            } else {
                // the new cluster state differs from the old one
                if (logger.isTraceEnabled()) {
                    logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState);
                } else if (logger.isDebugEnabled()) {
                    logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
                }
                try {
                    applyChanges(task, previousClusterState, newClusterState);
                    TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                    logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
                        executionTime, newClusterState.version(),
                        newClusterState.stateUUID());
                    warnAboutSlowTaskIfNeeded(executionTime, task.source);
                } catch (Exception e) {
                    TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                    final long version = newClusterState.version();
                    final String stateUUID = newClusterState.stateUUID();
                    final String fullState = newClusterState.toString();
                    logger.warn(
                        (Supplier<?>) () -> new ParameterizedMessage(
                            "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
                            executionTime,
                            version,
                            stateUUID,
                            task.source,
                            fullState),
                        e);
                    // TODO: do we want to call updateTask.onFailure here?
                }
            }
        }
    

    When the new cluster state differs from the old one, applyChanges is called.

    private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState) {
            // wrap the change in a ClusterChangedEvent
            ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState);
            // new cluster state, notify all listeners
            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                String summary = nodesDelta.shortSummary();
                if (summary.length() > 0) {
                    logger.info("{}, reason: {}", summary, task.source);
                }
            }
            // make sure all nodes in the new cluster state can be connected to
            nodeConnectionsService.connectToNodes(newClusterState.nodes());
    
            logger.debug("applying cluster state version {}", newClusterState.version());
            try {
                // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
                if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
                    final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
                    clusterSettings.applySettings(incomingSettings);
                }
            } catch (Exception ex) {
                logger.warn("failed to apply cluster settings", ex);
            }
            // this is the entry point
            logger.debug("apply cluster state with version {}", newClusterState.version());
            callClusterStateAppliers(clusterChangedEvent);
    
            nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
    
            logger.debug("set locally applied cluster state to version {}", newClusterState.version());
            state.set(newClusterState);
    
            callClusterStateListeners(clusterChangedEvent);
    
            task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
        }
    

    Let's go straight into the callClusterStateAppliers function.

    private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
            clusterStateAppliers.forEach(applier -> {
                try {
                    logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
                    applier.applyClusterState(clusterChangedEvent);
                } catch (Exception ex) {
                    logger.warn("failed to notify ClusterStateApplier", ex);
                }
            });
        }
    

    This calls applyClusterState once on every applier in the clusterStateAppliers collection. Let's look directly at IndicesClusterStateService, which is also a ClusterStateApplier; that is where the changes are ultimately handled.
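
    Conceptually, the applier mechanism looks roughly like the sketch below (illustrative names; in Elasticsearch the interface is ClusterStateApplier and components register themselves with the ClusterApplierService): each registered component gets a chance to reconcile its local state with every committed cluster state, and failures are logged rather than rethrown.

    // ApplierSketch.java -- illustration only
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class ApplierSketch {

        interface ClusterStateApplier {
            void applyClusterState(String clusterChangedEvent);
        }

        static class ApplierServiceSketch {
            private final List<ClusterStateApplier> appliers = new CopyOnWriteArrayList<>();

            void addStateApplier(ClusterStateApplier applier) {
                appliers.add(applier);
            }

            void callClusterStateAppliers(String event) {
                for (ClusterStateApplier applier : appliers) {
                    try {
                        applier.applyClusterState(event);   // each applier reconciles its own local state
                    } catch (Exception e) {
                        System.err.println("failed to notify applier: " + e);   // log, don't rethrow
                    }
                }
            }
        }

        public static void main(String[] args) {
            ApplierServiceSketch service = new ApplierServiceSketch();
            service.addStateApplier(event -> System.out.println("indices service applying: " + event));
            service.callClusterStateAppliers("cluster state version 42");
        }
    }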

    IndicesClusterStateService

    public synchronized void applyClusterState(final ClusterChangedEvent event) {
            if (!lifecycle.started()) {
                return;
            }
    
            final ClusterState state = event.state();
    
            // we need to clean the shards and indices we have on this node, since we
            // are going to recover them again once state persistence is disabled (no master / not recovered)
            // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
            if (state.blocks().disableStatePersistence()) {
                for (AllocatedIndex<? extends Shard> indexService : indicesService) {
                    indicesService.removeIndex(indexService.index(), NO_LONGER_ASSIGNED,
                        "cleaning index (disabled block persistence)"); // also cleans shards
                }
                return;
            }
    
            updateFailedShardsCache(state);
    
            deleteIndices(event); // also deletes shards of deleted indices
    
            removeUnallocatedIndices(event); // also removes shards of removed indices
    
            failMissingShards(state);
    
            removeShards(state);   // removes any local shards that doesn't match what the master expects
    
            updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache
    
            createIndices(state);
    
            createOrUpdateShards(state);
        }
    

    This method handles all the different kinds of state changes in one place, such as deleting indices and removing shards. Since a split is essentially a form of index creation, let's go straight into the createIndices function.

    private void createIndices(final ClusterState state) {
            // we only create indices for shards that are allocated
            // each node only creates the indices whose shards are allocated to it
            RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
            if (localRoutingNode == null) {
                return;
            }
            // create map of indices to create with shards to fail if index creation fails
            final Map<Index, List<ShardRouting>> indicesToCreate = new HashMap<>();
            for (ShardRouting shardRouting : localRoutingNode) {
                // iterate over every shard allocated to this node
                if (failedShardsCache.containsKey(shardRouting.shardId()) == false) {
                    final Index index = shardRouting.index();
                    // if the index has not been created on this node yet, add it to indicesToCreate
                    if (indicesService.indexService(index) == null) {
                        indicesToCreate.computeIfAbsent(index, k -> new ArrayList<>()).add(shardRouting);
                    }
                }
            }
    
            for (Map.Entry<Index, List<ShardRouting>> entry : indicesToCreate.entrySet()) {
                final Index index = entry.getKey();
                final IndexMetaData indexMetaData = state.metaData().index(index);
                logger.debug("[{}] creating index", index);
    
                AllocatedIndex<? extends Shard> indexService = null;
                try {
                    // the index is created here; the process is the same as described earlier and is not analyzed again
                    indexService = indicesService.createIndex(indexMetaData, buildInIndexListener);
                    if (indexService.updateMapping(indexMetaData) && sendRefreshMapping) {
                        nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(),
                            new NodeMappingRefreshAction.NodeMappingRefreshRequest(indexMetaData.getIndex().getName(),
                                indexMetaData.getIndexUUID(), state.nodes().getLocalNodeId())
                        );
                    }
                } catch (Exception e) {
                    final String failShardReason;
                    if (indexService == null) {
                        failShardReason = "failed to create index";
                    } else {
                        failShardReason = "failed to update mapping for index";
                        indicesService.removeIndex(index, FAILURE, "removing index (mapping update failed)");
                    }
                    for (ShardRouting shardRouting : entry.getValue()) {
                        sendFailShard(shardRouting, failShardReason, e, state);
                    }
                }
            }
        }
    

    Some readers may wonder why the index is created again here when it was already created earlier. The earlier creation happened on the master node and produced a cluster state change event, which was then sent to the other nodes. At that point the index did not yet exist on those nodes; when they receive the cluster state change event they reach this step and create the index inside createIndices. This also goes through IndicesService's createIndex function, the same process described before.
    Once the index has been created we return to applyClusterState, where the next step is to create the shards. Let's see how createOrUpdateShards is implemented.

    private void createOrUpdateShards(final ClusterState state) {
            // likewise, each node only creates the shards allocated to it
            RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
            if (localRoutingNode == null) {
                return;
            }
    
            DiscoveryNodes nodes = state.nodes();
            RoutingTable routingTable = state.routingTable();
    
            for (final ShardRouting shardRouting : localRoutingNode) {
                // iterate over each shard
                ShardId shardId = shardRouting.shardId();
                if (failedShardsCache.containsKey(shardId) == false) {
                    AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
                    // the index must have been created before its shards
                    assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";
                    Shard shard = indexService.getShardOrNull(shardId.id());
                    if (shard == null) {
                        assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
                        // create the shard
                        createShard(nodes, routingTable, shardRouting, state);
                    } else {
                        updateShard(nodes, shardRouting, shard, routingTable, state);
                    }
                }
            }
        }
    

    This iterates over every shard allocated to this node; if a shard has not been created yet, createShard is called to create it.

    private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
            assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;
    
            DiscoveryNode sourceNode = null;
            if (shardRouting.recoverySource().getType() == Type.PEER)  {
                // if the shard recovers from a peer, find the source node to recover from; as discussed before, the primary shards of an index created by a split use the LOCAL_SHARDS recovery source, i.e. they recover from shards on the local node
                sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting);
                if (sourceNode == null) {
                    logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
                    return;
                }
            }
    
            try {
                logger.debug("{} creating shard", shardRouting.shardId());
                RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
                indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting),
                    repositoriesService, failedShardHandler, globalCheckpointSyncer);
            } catch (Exception e) {
                failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
            }
        }
    

    This simply calls IndicesService's createShard function to create the shard.

    IndicesService

    public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
                                      PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
                                      Consumer<IndexShard.ShardFailure> onShardFailure,
                                      Consumer<ShardId> globalCheckpointSyncer) throws IOException {
            ensureChangesAllowed();
            IndexService indexService = indexService(shardRouting.index());
            // build the IndexShard object
            IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);
            indexShard.addShardFailureCallback(onShardFailure);
            // start recovering the shard's data
            indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
                (type, mapping) -> {
                    assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS:
                        "mapping update consumer only required by local shards recovery";
                    try {
                        client.admin().indices().preparePutMapping()
                            .setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid
                            .setType(type)
                            .setSource(mapping.source().string(), XContentType.JSON)
                            .get();
                    } catch (IOException ex) {
                        throw new ElasticsearchException("failed to stringify mapping source", ex);
                    }
                }, this);
            return indexShard;
        }
    

    This first asks the IndexService to create a shard, which merely constructs an IndexShard object and determines some shard metadata, such as the shard's directory, then stores the new shard in the IndexService's shards member. At this point the shard's metadata exists, but its underlying data does not, so IndexShard's startRecovery function is called to prepare the shard's data.
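
    A rough sketch of what "creating the shard" amounts to at this stage (hypothetical names; the real IndexService.createShard does much more): pick a data directory for the shard, build the in-memory shard object, and remember it in the index's shard map; no Lucene data exists yet.

    // CreateShardSketch.java -- illustration only
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;

    public class CreateShardSketch {

        static class ShardSketch {
            final int shardId;
            final Path shardPath;

            ShardSketch(int shardId, Path shardPath) {
                this.shardId = shardId;
                this.shardPath = shardPath;
            }

            void startRecovery() {
                // populate the directory, e.g. from the local source shards of a split
            }
        }

        private final Map<Integer, ShardSketch> shards = new HashMap<>();

        ShardSketch createShard(String indexUUID, int shardId) {
            // metadata only: choose a directory under the node's data path for this shard
            Path shardPath = Paths.get("data", "indices", indexUUID, Integer.toString(shardId));
            ShardSketch shard = new ShardSketch(shardId, shardPath);
            shards.put(shardId, shard);   // later visible through getShardOrNull(shardId) style lookups
            return shard;                 // the caller then triggers shard.startRecovery()
        }

        public static void main(String[] args) {
            CreateShardSketch indexService = new CreateShardSketch();
            ShardSketch shard = indexService.createShard("index-uuid", 0);
            System.out.println("created shard " + shard.shardId + " at " + shard.shardPath);
            shard.startRecovery();
        }
    }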

    IndexShard

    public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
                                  PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
                                  BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
                                  IndicesService indicesService) {
            // TODO: Create a proper object to encapsulate the recovery context
            // all of the current methods here follow a pattern of:
            // resolve context which isn't really dependent on the local shards and then async
            // call some external method with this pointer.
            // with a proper recovery context object we can simply change this to:
            // startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) {
            //     markAsRecovery("from " + source.getShortDescription(), recoveryState);
            //     threadPool.generic().execute()  {
            //           onFailure () { listener.failure() };
            //           doRun() {
            //                if (source.recover(this)) {
            //                  recoveryListener.onRecoveryDone(recoveryState);
            //                }
            //           }
            //     }}
            // }
            assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
            switch (recoveryState.getRecoverySource().getType()) {
                ...
                // the other recovery types are omitted; the primary shards created by a split recover with this type
                case LOCAL_SHARDS:
                    final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
                    final Index resizeSourceIndex = indexMetaData.getResizeSourceIndex();
                    final List<IndexShard> startedShards = new ArrayList<>();
                    final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex);
                    final Set<ShardId> requiredShards;
                    final int numShards;
                    if (sourceIndexService != null) {
                        // select the source shards to recover from
                        requiredShards = IndexMetaData.selectRecoverFromShards(shardId().id(),
                            sourceIndexService.getMetaData(), indexMetaData.getNumberOfShards());
                        for (IndexShard shard : sourceIndexService) {
                            if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) {
                                // check that all required source shards are in the STARTED state
                                startedShards.add(shard);
                            }
                        }
                        numShards = requiredShards.size();
                    } else {
                        numShards = -1;
                        requiredShards = Collections.emptySet();
                    }
                    // fail if any of the required source shards is not in the STARTED state
                    if (numShards == startedShards.size()) {
                        assert requiredShards.isEmpty() == false;
                        markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
    
                        // start data recovery on a separate thread
                        threadPool.generic().execute(() -> {
                            try {
                                if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
                                    .filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) {
                                    recoveryListener.onRecoveryDone(recoveryState);
                                }
                            } catch (Exception e) {
                                recoveryListener.onRecoveryFailure(recoveryState,
                                    new RecoveryFailedException(recoveryState, null, e), true);
                            }
                        });
                    } else {
                        final RuntimeException e;
                        if (numShards == -1) {
                            e = new IndexNotFoundException(resizeSourceIndex);
                        } else {
                            e = new IllegalStateException("not all required shards of index " + resizeSourceIndex
                                + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "
                                + shardId());
                        }
                        throw e;
                    }
                    break;
                default:
                    throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
            }
        }
    

    Data recovery starts a new thread which executes the recoverFromLocalShards function.

    public boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, List<IndexShard> localShards) throws IOException {
            assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
            assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "invalid recovery type: " + recoveryState.getRecoverySource();
            final List<LocalShardSnapshot> snapshots = new ArrayList<>();
            try {
                for (IndexShard shard : localShards) {
                    snapshots.add(new LocalShardSnapshot(shard));
                }
    
                // we are the first primary, recover from the gateway
                // if its post api allocation, the index should exists
                assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
                StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
                return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots);
            } finally {
                IOUtils.close(snapshots);
            }
        }
    

    This uses StoreRecovery to recover the data.

    StoreRecovery

    boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, final IndexShard indexShard, final List<LocalShardSnapshot> shards) throws IOException {
            if (canRecover(indexShard)) {
                ...
                // start executing the recovery
                return executeRecovery(indexShard, () -> {
                    logger.debug("starting recovery from local shards {}", shards);
                    try {
                        final Directory directory = indexShard.store().directory(); // don't close this directory!!
                        final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new);
                        final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong();
                        final long maxUnsafeAutoIdTimestamp =
                                shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong();
                        // this function builds the shard's underlying Lucene index
                        addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp,
                            indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit, hasNested);
                        internalRecoverFromStore(indexShard);
                        // just trigger a merge to do housekeeping on the
                        // copied segments - we will also see them in stats etc.
                        indexShard.getEngine().forceMerge(false, -1, false, false, false);
                    } catch (IOException ex) {
                        throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
                    }
    
                });
            }
            return false;
        }
    
    void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources,
                final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split,
                boolean hasNested) throws IOException {
    
            // clean target directory (if previous recovery attempt failed) and create a fresh segment file with the proper lucene version
            Lucene.cleanLuceneIndex(target);
            assert sources.length > 0;
            final int luceneIndexCreatedVersionMajor = Lucene.readSegmentInfos(sources[0]).getIndexCreatedVersionMajor();
            new SegmentInfos(luceneIndexCreatedVersionMajor).commit(target);
            // if hard links are supported, the target index's files are created as hard links to the source index's files; otherwise the source files are copied
            final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
    
            IndexWriterConfig iwc = new IndexWriterConfig(null)
                .setCommitOnClose(false)
                // we don't want merges to happen here - we call maybe merge on the engine
                // later once we stared it up otherwise we would need to wait for it here
                // we also don't specify a codec here and merges should use the engines for this index
                .setMergePolicy(NoMergePolicy.INSTANCE)
                .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
            if (indexSort != null) {
                iwc.setIndexSort(indexSort);
            }
    
            try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
                // add the files from the source index directories to the target index
                writer.addIndexes(sources);
                if (split) {
                    // delete-by-query the documents that do not belong to this shard; ShardSplittingQuery matches the documents in the source shard that, after the split, do not belong to this target shard
                    writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId, hasNested));
                }
                /*
                 * We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on
                 * the source shards. This ensures that history after this maximum sequence number can advance and we have correct
                 * document-level semantics.
                 */
                writer.setLiveCommitData(() -> {
                    final HashMap<String, String> liveCommitData = new HashMap<>(3);
                    liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
                    liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
                    liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
                    return liveCommitData.entrySet().iterator();
                });
                writer.commit();
            }
        }
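
    The delete-by-query step above keeps only the documents that route to this target shard. A hedged sketch of the selection rule (the real code uses a Murmur3-based routing hash together with the index's routingNumShards and routing factor; the hash function and names below are simplified):

    // SplitRoutingSketch.java -- illustration only, not the real ShardSplittingQuery logic
    public class SplitRoutingSketch {

        // routingNumShards is a fixed "virtual" shard count chosen at index creation;
        // routingFactor = routingNumShards / numShards
        static int shardIdFor(String routing, int routingNumShards, int numShards) {
            int routingFactor = routingNumShards / numShards;
            int hash = routing.hashCode();   // stand-in for the Murmur3 routing hash
            return Math.floorMod(hash, routingNumShards) / routingFactor;
        }

        public static void main(String[] args) {
            int routingNumShards = 1024;   // stays constant across split operations
            int targetShards = 2;          // e.g. an index split from 1 shard into 2
            // after the split, target shard 0 deletes every document that routes to shard 1, and vice versa
            for (String id : new String[]{"doc-1", "doc-2", "doc-3"}) {
                System.out.println(id + " -> target shard " + shardIdFor(id, routingNumShards, targetShards));
            }
        }
    }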
    

    After addIndices has built the shard's underlying Lucene index, internalRecoverFromStore creates the shard's engine and the shard is essentially recovered. Going back to IndicesClusterStateService's createShard function: a RecoveryListener object was passed in when IndicesService's createShard was called.

    private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
    
            private final ShardRouting shardRouting;
    
            private RecoveryListener(ShardRouting shardRouting) {
                this.shardRouting = shardRouting;
            }
    
            @Override
            public void onRecoveryDone(RecoveryState state) {
                shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
            }
    
            @Override
            public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
                handleRecoveryFailure(shardRouting, sendShardFailure, e);
            }
        }
    

    When recovery completes, the onRecoveryDone method of this listener is called. It issues a shardStarted action, which moves the shard from the INITIALIZING state to the STARTED state.
    At this point the shard has been fully created and can serve requests.
