ElasticSearch Peer to Peer Recovery


Author: persisting_ | Published: 2018-12-04 23:54

    1 Overview

    ElasticSearch recovery comes in several modes. All of them are listed in RecoverySource.Type:

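    //RecoverySource.Type
    public enum Type {
        EMPTY_STORE,     // new shard, starts from an empty store
        EXISTING_STORE,  // recover from data already on the local disk
        PEER,            // Peer to Peer recovery (the subject of this article)
        SNAPSHOT,        // restore from a snapshot repository
        LOCAL_SHARDS     // built from other shards on the same node (e.g. shrink/split)
    }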
    

    This article focuses on the Peer to Peer (P2P) recovery mode. Once a node has created a shard according to the latest ClusterState, it starts recovering that shard.

    P2P recovery involves a Source side and a Target side. The Target initiates the recovery, but the Source drives the rest of the process.

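    The Source is the node that holds the shard copy used as the recovery source. The Target is the node that receives data from the Source and must be brought to the Source's state. When recovery completes, the Target holds the same data as the Source.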

    2 Prerequisites

    When ElasticSearch starts, it instantiates a Node. During Node initialization it creates the two services that recovery requires:
    PeerRecoverySourceService and PeerRecoveryTargetService.

    PeerRecoverySourceService handles the work on the Source side, and PeerRecoveryTargetService handles the work on the Target side.

    Several other important classes take part in recovery:

    1. RecoverySourceHandler: created on the Source when it receives a recovery request from the Target. It drives the recovery process on behalf of PeerRecoverySourceService.

    2. RemoteRecoveryTargetHandler: held by the Source's RecoverySourceHandler and used to communicate with the Target (sending segment files, Translog operations, and so on).

    3. RecoveryTarget: created by the Target's PeerRecoveryTargetService when it starts a recovery. It receives the files, Translog operations, etc. sent by the Source and applies them.

    Also note the request handlers that PeerRecoverySourceService and PeerRecoveryTargetService register with TransportService in their constructors.

    The handler registered on the Source side:

    //PeerRecoverySourceService constructor
    // StartRecoveryTransportRequestHandler handles the Target's request to start a
    // recovery: it creates a RecoverySourceHandler and starts the recovery flow
    transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC, new StartRecoveryTransportRequestHandler());
    

    The handlers registered on the Target side (not all of them are covered later in this article):

    //PeerRecoveryTargetService
    // receives information about the Source's shard snapshot files
    transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new
            FilesInfoRequestHandler());
    
    // receives the shard file chunks sent by the Source
    transportService.registerRequestHandler(Actions.FILE_CHUNK, RecoveryFileChunkRequest::new, ThreadPool.Names.GENERIC, new
            FileChunkTransportRequestHandler());
    
    // renames the received shard files to their final names
    transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
            CleanFilesRequestHandler());
    
    // opens the Engine so the Target can receive and replay the Source's Translog
    transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
            RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
    
    // receives Translog operations from the Source and replays them
    transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
            new TranslogOperationsRequestHandler());
    
    // refreshes the Engine so new segments take effect, cleans up old files, updates the global checkpoint, etc.
    transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
            FinalizeRecoveryRequestHandler());
    // waits until the Target has applied the cluster state version requested by the Source
    transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
        ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
    
    // for a primary relocation, receives the primary context (the primary role)
    transportService.registerRequestHandler(
            Actions.HANDOFF_PRIMARY_CONTEXT,
            RecoveryHandoffPrimaryContextRequest::new,
            ThreadPool.Names.GENERIC,
            new HandoffPrimaryContextRequestHandler());
    

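    Each of the handlers registered on the Source and Target processes the requests exchanged at a particular stage. The rest of this article skips the transport details of each exchange and goes straight to the relevant handler.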
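The register-then-dispatch pattern can be illustrated with a small, self-contained sketch. This is not ElasticSearch's TransportService API. The class ToyTransport, its register/dispatch methods and the plain String payloads are hypothetical. The sketch only mirrors the idea that each action name is bound to exactly one request handler.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified transport layer: each action name maps to one handler.
class ToyTransport {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Mirrors the idea of registerRequestHandler: one handler per action name.
    void register(String action, Function<String, String> handler) {
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("handler already registered for " + action);
        }
    }

    // Routes a request to the handler registered for its action and returns the response.
    String dispatch(String action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler for action " + action);
        }
        return handler.apply(request);
    }
}
```

A Target-side service in this model would register its FILES_INFO, FILE_CHUNK, ... handlers once at construction time. Every later request is then routed purely by its action name.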

    3 Overall flow

    [Figure: overall flow of P2P recovery]

    4 The Target sends the start-recovery request

    After createShard finishes, the Target sends a request to the Source to start the recovery.

    //IndicesService
    @Override
    public IndexShard createShard(...) throws IOException {
        ensureChangesAllowed();
        IndexService indexService = indexService(shardRouting.index());
        IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);
        indexShard.addShardFailureCallback(onShardFailure);
        // starting point of the Target-side recovery
        indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
            (type, mapping) -> {
                ...
            }, this);
        return indexShard;
    }
    
    //IndexShard
    public void startRecovery(...) {
        ...
        switch (recoveryState.getRecoverySource().getType()) {
            case EMPTY_STORE:
            case EXISTING_STORE:
                threadPool.generic().execute(() -> {
                    ...
                    // recover from local files (the local store and its Translog)
                    if (recoverFromStore()) {
                        ...
                    }
                    ...
                });
                break;
            case PEER:
                ...
                // P2P recovery
                recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
                ...
                break;
            case SNAPSHOT:
                // restore from a snapshot
                ...
                break;
            case LOCAL_SHARDS:
                // recover from other shards on the same node
                ...
                break;
            default:
                throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
        }
    }
    
    //PeerRecoveryTargetService
    // PeerRecoveryTargetService starts the Target-side recovery
    public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
        // create a new recovery status, and process...
        // Create a new RecoveryTarget and get back its ID. The ID is sent to the
        // Source together with START_RECOVERY. Every request the Source sends later
        // in this recovery carries the same ID, and the Target uses it to find the
        // RecoveryTarget that handles the request.
        final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
        // the actual Target-side recovery runs in RecoveryRunner
        threadPool.generic().execute(new RecoveryRunner(recoveryId));
    }
    
    //PeerRecoveryTargetService.RecoveryRunner
    class RecoveryRunner extends AbstractRunnable {
    
        final long recoveryId;
    
        RecoveryRunner(long recoveryId) {
            this.recoveryId = recoveryId;
        }
    
        @Override
        public void onFailure(Exception e) {
            ...
        }
    
        @Override
        public void doRun() {
            // the actual recovery function
            doRecovery(recoveryId);
        }
    }
    //PeerRecoveryTargetService
    private void doRecovery(final long recoveryId) {
        ...
    
        try (...) {
            ...
            // build the START_RECOVERY request
            request = getStartRecoveryRequest(recoveryTarget);
            ...
        }
    
        try {
            cancellableThreads.execute(() -> responseHolder.set(
                    // send START_RECOVERY to the Source to start the Source-side recovery
                    transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
                            new FutureTransportResponseHandler<RecoveryResponse>() {
                                @Override
                                public RecoveryResponse newInstance() {
                                    return new RecoveryResponse();
                                }
                            }).txGet()));
        } catch (CancellableThreads.ExecutionCancelledException e) {
            ...
        } catch (Exception e) {
            // some of the exceptions handled below allow the recovery to be retried
            Throwable cause = ExceptionsHelper.unwrapCause(e);
            ...
            if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException ||
                ...) {
                retryRecovery(...);
                return;
            }
    
            if (cause instanceof DelayRecoveryException) {
                retryRecovery(...);
                return;
            }
    
            if (cause instanceof ConnectTransportException) {
                ...
                retryRecovery(...);
                return;
            }
    
            ...
    
            onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
        }
    }
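The error handling in doRecovery comes down to classifying the unwrapped cause as retryable or fatal. The sketch below shows that decision in isolation. The exception classes, RetryPolicy and RecoveryDecision are hypothetical stand-ins for the ElasticSearch types listed above. Walking the whole cause chain is a simplification of what ExceptionsHelper.unwrapCause actually does.

```java
import java.util.List;

// Hypothetical stand-ins for the exception types checked in doRecovery.
class ShardNotReadyException extends RuntimeException {}
class RecoveryDelayedException extends RuntimeException {}
class PeerConnectException extends RuntimeException {}

enum RecoveryDecision { RETRY, FAIL }

final class RetryPolicy {
    // Causes that only mean "not now": the Target schedules another attempt.
    private static final List<Class<? extends Throwable>> RETRYABLE = List.of(
            ShardNotReadyException.class,    // e.g. the source shard is not ready yet
            RecoveryDelayedException.class,  // the Source asked the Target to try again later
            PeerConnectException.class);     // transient connection problem

    // Simplification: follow the cause chain down to the innermost exception.
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    static RecoveryDecision decide(Throwable failure) {
        Throwable cause = unwrap(failure);
        for (Class<? extends Throwable> type : RETRYABLE) {
            if (type.isInstance(cause)) {
                return RecoveryDecision.RETRY;
            }
        }
        return RecoveryDecision.FAIL;  // anything else fails the recovery for good
    }
}
```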
    

    5 The Source starts the recovery

    5.1 Creating the RecoverySourceHandler

    When the Source receives the Target's START_RECOVERY request, it handles it with StartRecoveryTransportRequestHandler:

    //PeerRecoverySourceService.StartRecoveryTransportRequestHandler
    @Override
    public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
        // the Source's START_RECOVERY handler calls into
        // PeerRecoverySourceService to start the recovery
        RecoveryResponse response = recover(request);
        channel.sendResponse(response);
    }
    
    private RecoveryResponse recover(final StartRecoveryRequest request) throws IOException {
        ...
        // create the RecoverySourceHandler that drives the recovery
        RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
        logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode());
        try {
            // The actual recovery: every Source-side step is defined in this method.
            // From here on the Source drives the process, sending requests that make
            // the Target restore files, replay the Translog, and so on.
            return handler.recoverToTarget();
        } finally {
            ongoingRecoveries.remove(shard, handler);
        }
    }
    //PeerRecoverySourceService.OngoingRecoveries
    synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
        final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
        // this is where the handler is actually created
        RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
        shard.recoveryStats().incCurrentAsSource();
        return handler;
    }
    //PeerRecoverySourceService.OngoingRecoveries.ShardRecoveryContext
    synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
        for (RecoverySourceHandler existingHandler : recoveryHandlers) {
            if (existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
                throw new DelayRecoveryException("recovery with same target already registered, waiting for " +
                    "previous recovery attempt to be cancelled or completed");
            }
        }
        RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
        recoveryHandlers.add(handler);
        return handler;
    }
    
    private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
        RecoverySourceHandler handler;
        // The RecoverySourceHandler holds a RemoteRecoveryTargetHandler used to
        // talk to the Target; every later request goes through this object.
        final RemoteRecoveryTargetHandler recoveryTarget =
            new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
                request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
        handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt());
        return handler;
    }
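ShardRecoveryContext refuses a second recovery to the same target allocation while one is still registered. The DelayRecoveryException it throws is one of the causes that the Target treats as retryable in doRecovery. Below is a minimal sketch of that bookkeeping. SourceShardRecoveries and DuplicateRecoveryException are hypothetical replacements for the real types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DelayRecoveryException.
class DuplicateRecoveryException extends RuntimeException {
    DuplicateRecoveryException(String message) {
        super(message);
    }
}

// Simplified ShardRecoveryContext: one instance per source shard, tracking the
// allocation ids of the targets currently recovering from it.
final class SourceShardRecoveries {
    private final List<String> activeTargets = new ArrayList<>();

    synchronized void addNewRecovery(String targetAllocationId) {
        if (activeTargets.contains(targetAllocationId)) {
            // the Target is expected to retry once the previous attempt is gone
            throw new DuplicateRecoveryException(
                    "recovery with same target already registered: " + targetAllocationId);
        }
        activeTargets.add(targetAllocationId);
    }

    // Called when a recovery completes or fails (like ongoingRecoveries.remove).
    synchronized void remove(String targetAllocationId) {
        activeTargets.remove(targetAllocationId);
    }

    synchronized int size() {
        return activeTargets.size();
    }
}
```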
    

    5.2 The Source-driven flow: recoverToTarget

    Most of the Source-driven flow lives in RecoverySourceHandler.recoverToTarget:

    //RecoverySourceHandler
    public RecoveryResponse recoverToTarget() throws IOException {
        ...
        // take a snapshot (safe index commit) of the shard; phase1 sends its files to the Target
        phase1Snapshot = shard.acquireSafeIndexCommit();
        ...
        phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
        ...
        prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
            shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
        ...
        targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
            maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
        ...
    
        finalizeRecovery(targetLocalCheckpoint);
        ...
        return response;
    }
    
    

    As recoverToTarget shows, recovery has two main phases plus a final wrap-up step: phase1, phase2 and finalizeRecovery. The sections below describe each in turn.
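The following sketch lists, in order, the requests the Source sends during recoverToTarget. START_RECOVERY, which the Target sends, comes before all of them. The planner method is hypothetical. The action names are the constants registered earlier, and WAIT_CLUSTERSTATE is omitted. Whether phase1 runs depends on isSequenceNumberBasedRecovery in the code above: a sequence-number-based recovery skips the file copy, and the fileBased flag models that choice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical planner listing the Source's requests in the order recoverToTarget sends them.
final class RecoveryPlan {
    static List<String> requests(boolean fileBased, int fileChunks, int translogBatches,
                                 boolean primaryRelocation) {
        List<String> out = new ArrayList<>();
        if (fileBased) {                                   // phase1: copy segment files
            out.add("FILES_INFO");
            out.addAll(Collections.nCopies(fileChunks, "FILE_CHUNK"));
            out.add("CLEAN_FILES");
        }
        out.add("PREPARE_TRANSLOG");                       // open the Engine on the Target
        out.addAll(Collections.nCopies(translogBatches, "TRANSLOG_OPS")); // phase2
        out.add("FINALIZE");                               // finalizeRecovery
        if (primaryRelocation) {
            out.add("HANDOFF_PRIMARY_CONTEXT");            // primary relocation only
        }
        return out;
    }
}
```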

    5.2.1 phase1: the Source sends the snapshot to the Target

    As the code above shows, the Source snapshots its local data with phase1Snapshot = shard.acquireSafeIndexCommit() before calling phase1. phase1 then sends the snapshot files to the Target.

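    In phase1 the Source sends three kinds of requests to the Target:
    - a single FILES_INFO, which tells the Target about the snapshot files it is about to receive;
    - multiple FILE_CHUNK requests, which transfer the files;
    - CLEAN_FILES, which tells the Target to delete files it has locally but no longer needs once it matches the Source's state. CLEAN_FILES also asks the Target to verify the files it received.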

    //RecoverySourceHandler
    public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
        // omitted: using the metadata in the Target's START_RECOVERY request
        // to work out which files differ
        ...
        // send information about the shard files to the Target (FILES_INFO)
        cancellableThreads.execute(() ->
                recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames,
                        response.phase1ExistingFileSizes, translogOps.get()));
        // How many bytes we've copied since we last called RateLimiter.pause
        ...
        // send the files to the Target as FILE_CHUNK requests
        sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
        // Send the CLEAN_FILES request, which takes all of the files that
        // were transferred and renames them from their temporary file
        // names to the actual file names. It also writes checksums for
        // the files after they have been renamed.
        //
        // Once the files have been renamed, any other files that are not
        // related to this recovery (out of date segments, for example)
        // are deleted
        try {
            cancellableThreads.executeIO(() ->
                recoveryTarget.cleanFiles(translogOps.get(), recoverySourceMetadata));
        } catch (RemoteTransportException | IOException e) {
            ...
        }
    }
    

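    Almost all interaction between Source and Target follows this request/handler pattern. For example, the Source sends the file information to the Target through recoveryTarget.receiveFileInfo, which is implemented as: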

    //RemoteRecoveryTargetHandler
    @Override
    public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
                                List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
        // note the recoveryId: on receiving the request, the Target uses this ID
        // to find the RecoveryTarget that handles this recovery
        RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(recoveryId, shardId,
                phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
        transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILES_INFO, recoveryInfoFilesRequest,
                TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
                EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
    
    }
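On the Target, the recoveryId is what ties every incoming request back to its RecoveryTarget. The sketch below models that lookup as a map keyed by a generated id. OngoingRecoveriesSketch and ToyRecoveryTarget are hypothetical simplifications of the Target's onGoingRecoveries and RecoveryTarget. The exception thrown for an unknown or mismatched id is also an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, minimal per-recovery state on the Target.
final class ToyRecoveryTarget {
    final String shardId;

    ToyRecoveryTarget(String shardId) {
        this.shardId = shardId;
    }
}

// Every recovery gets a fresh id; each incoming request is routed by that id.
final class OngoingRecoveriesSketch {
    private final AtomicLong idGenerator = new AtomicLong();
    private final Map<Long, ToyRecoveryTarget> recoveries = new ConcurrentHashMap<>();

    long startRecovery(String shardId) {
        long id = idGenerator.incrementAndGet();
        recoveries.put(id, new ToyRecoveryTarget(shardId));
        return id;
    }

    // Like getRecoverySafe: reject unknown ids and ids that belong to another shard.
    ToyRecoveryTarget getRecoverySafe(long recoveryId, String shardId) {
        ToyRecoveryTarget target = recoveries.get(recoveryId);
        if (target == null || !target.shardId.equals(shardId)) {
            throw new IllegalStateException("no ongoing recovery " + recoveryId + " for " + shardId);
        }
        return target;
    }

    // Recovery finished or failed: later requests with this id are rejected.
    void finish(long recoveryId) {
        recoveries.remove(recoveryId);
    }
}
```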
    

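    For details of how the files are sent, see the phase1 code. The rest of this section focuses on how the Target handles the FILES_INFO, FILE_CHUNK and CLEAN_FILES requests sent by the Source.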

    When the Target receives FILES_INFO, its registered FilesInfoRequestHandler passes the request to RecoveryTarget.receiveFileInfo:

    //RecoveryTarget (called from PeerRecoveryTargetService.FilesInfoRequestHandler)
    @Override
    public void receiveFileInfo(List<String> phase1FileNames,
                                List<Long> phase1FileSizes,
                                List<String> phase1ExistingFileNames,
                                List<Long> phase1ExistingFileSizes,
                                int totalTranslogOps) {
        final RecoveryState.Index index = state().getIndex();
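        // the Target records information about the incoming snapshot files: names and sizes
        // (files it already has are flagged), plus the total number of Translog operations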
        for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
            index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
        }
        for (int i = 0; i < phase1FileNames.size(); i++) {
            index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
        }
        state().getTranslog().totalOperations(totalTranslogOps);
        state().getTranslog().totalOperationsOnStart(totalTranslogOps);
    
    }
    

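    After sending the file information, the Source transfers the snapshot files to the Target with multiple FILE_CHUNK requests. Not every file has to be sent. When the Target sends START_RECOVERY to start the recovery, it includes its own store metadata in the request. After taking the snapshot, and before sending anything to the Target, the Source compares its snapshot files with that metadata to work out which files differ.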
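The comparison step can be sketched as a file-by-file diff between the Source's snapshot and the Target's metadata. The diff sorts files into identical, different and missing, and only the last two need to be transferred. FileMeta and StoreDiff are hypothetical. Comparing length plus checksum is also a simplification: ElasticSearch's own diff (Store.MetadataSnapshot.recoveryDiff) is more careful and, for example, reasons about the files of a segment as a group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified file metadata (name, length, checksum).
final class FileMeta {
    final String name;
    final long length;
    final String checksum;

    FileMeta(String name, long length, String checksum) {
        this.name = name;
        this.length = length;
        this.checksum = checksum;
    }

    boolean sameContentAs(FileMeta other) {
        return other != null && length == other.length && checksum.equals(other.checksum);
    }
}

final class StoreDiff {
    final List<String> identical = new ArrayList<>();  // can be reused on the Target
    final List<String> different = new ArrayList<>();  // must be sent again
    final List<String> missing = new ArrayList<>();    // must be sent

    // source: files of the Source snapshot; target: metadata sent with START_RECOVERY.
    static StoreDiff compute(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        StoreDiff diff = new StoreDiff();
        for (FileMeta file : source.values()) {
            FileMeta existing = target.get(file.name);
            if (existing == null) {
                diff.missing.add(file.name);
            } else if (file.sameContentAs(existing)) {
                diff.identical.add(file.name);
            } else {
                diff.different.add(file.name);
            }
        }
        return diff;
    }
}
```

In phase1 terms, the identical files roughly correspond to phase1ExistingFileNames, and the different and missing files to phase1FileNames.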

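    The differing files (each described by a StoreFileMetaData) are sent to the Target as FILE_CHUNK requests. Each FILE_CHUNK request carries the StoreFileMetaData of the file it belongs to and one slice of that file's content: its position, the bytes and a lastChunk flag. A large file therefore spans several FILE_CHUNK requests.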
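Splitting a file into FILE_CHUNK payloads can be sketched as follows. Chunk and Chunker are hypothetical, and fileName stands in for the StoreFileMetaData. The chunk size plays the role of recoverySettings.getChunkSize(), which createRecoverySourceHandler passes to RecoverySourceHandler. The fields mirror the position/content/lastChunk values that the Target reads from RecoveryFileChunkRequest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical payload of one FILE_CHUNK request; fileName stands in for StoreFileMetaData.
final class Chunk {
    final String fileName;
    final long position;     // offset of this slice within the file
    final byte[] content;    // bytes of this slice
    final boolean lastChunk; // true for the final slice of the file

    Chunk(String fileName, long position, byte[] content, boolean lastChunk) {
        this.fileName = fileName;
        this.position = position;
        this.content = content;
        this.lastChunk = lastChunk;
    }
}

final class Chunker {
    // Splits one file into slices of at most chunkSize bytes.
    // An empty file still yields a single (last) chunk.
    static List<Chunk> split(String fileName, byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Chunk> chunks = new ArrayList<>();
        int position = 0;
        do {
            int end = Math.min(position + chunkSize, data.length);
            chunks.add(new Chunk(fileName, position, Arrays.copyOfRange(data, position, end), end == data.length));
            position = end;
        } while (position < data.length);
        return chunks;
    }
}
```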

    The Target handles FILE_CHUNK with FileChunkTransportRequestHandler. Note that the transfer speed can be throttled with the indices.recovery.max_bytes_per_sec setting. The handler below shows the throttling on the Target side:

    //PeerRecoveryTargetService.FileChunkTransportRequestHandler
    // the Target receives file data sent by the Source
    @Override
    public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
        try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
            final RecoveryTarget recoveryTarget = recoveryRef.target();
            final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
            if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
                indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
            }
            // throttling uses Lucene's SimpleRateLimiter by default
            RateLimiter rateLimiter = recoverySettings.rateLimiter();
            if (rateLimiter != null) {
                long bytes = bytesSinceLastPause.addAndGet(request.content().length());
                if (bytes > rateLimiter.getMinPauseCheckBytes()) {
                    // Time to pause
                    bytesSinceLastPause.addAndGet(-bytes);
                    long throttleTimeInNanos = rateLimiter.pause(bytes);
                    indexState.addTargetThrottling(throttleTimeInNanos);
                    recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
                }
            }
    
            // write this slice of the file (under a temporary name) on the Target
            recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(),
                    request.lastChunk(), request.totalTranslogOps());
        }
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
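The throttling arithmetic can be sketched independently of ElasticSearch. ToyRateLimiter is a hypothetical, simplified limiter in the spirit of Lucene's SimpleRateLimiter. It accumulates bytes the way bytesSinceLastPause does above. Once more than minPauseCheckBytes have arrived, it computes how long to pause so that the average rate stays at bytesPerSec. The clock is passed in explicitly instead of sleeping, so the calculation can be checked directly.

```java
// Hypothetical byte-rate limiter modeled loosely on Lucene's SimpleRateLimiter.
final class ToyRateLimiter {
    private final double nsPerByte;
    private final long minPauseCheckBytes;
    private long lastNs;               // time by which previously admitted bytes are "paid for"
    private long bytesSinceLastPause;  // like the handler's bytesSinceLastPause counter

    ToyRateLimiter(double bytesPerSec, long minPauseCheckBytes, long startNs) {
        this.nsPerByte = 1_000_000_000.0 / bytesPerSec;
        this.minPauseCheckBytes = minPauseCheckBytes;
        this.lastNs = startNs;
    }

    // Returns how many nanoseconds the caller should pause after receiving chunkBytes.
    long onChunk(long chunkBytes, long nowNs) {
        bytesSinceLastPause += chunkBytes;
        if (bytesSinceLastPause <= minPauseCheckBytes) {
            return 0;  // not enough data yet to bother checking
        }
        long bytes = bytesSinceLastPause;
        bytesSinceLastPause = 0;
        long targetNs = lastNs + (long) (bytes * nsPerByte);
        lastNs = Math.max(targetNs, nowNs);  // idle time is not "saved up" for later bursts
        return Math.max(0, targetNs - nowNs);
    }
}
```

At 1000 bytes/s, for example, receiving 150 bytes at time 0 means pausing 150 ms. If the next batch only arrives well after that, no pause is needed.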
    

    After all differing files have been sent, the Source sends CLEAN_FILES to the Target, which handles it with:

    //PeerRecoveryTargetService.CleanFilesRequestHandler
    @Override
    public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
        try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
            recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot());
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }
    
    //RecoveryTarget
    @Override
    public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
        state().getTranslog().totalOperations(totalTranslogOps);
        // first, we go and move files that were created with the recovery id suffix to
        // the actual names, its ok if we have a corrupted index here, since we have replicas
        // to recover from in case of a full cluster shutdown just when this code executes...
        // Rename the temporary files created while receiving data from the Source to their
        // real names; as the comment above implies, this may overwrite files the Target already had.
        renameAllTempFiles();
        final Store store = store();
        store.incRef();
        try {
            // delete local files that, according to the Source's metadata, are no longer
            // needed after recovery, and verify the received files
            store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
            if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
                store.ensureIndexHasHistoryUUID();
            }
            ...
            // other related processing
        } finally {
            store.decRef();
        }
    }
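cleanFiles has two steps: rename the temporary files, then delete whatever the Source's snapshot does not contain. Both can be modeled on an in-memory directory. ToyTargetStore is hypothetical. The "recovery.<id>." prefix for temporary names is an assumption made for the example. The checksum verification that cleanupAndVerify performs is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the Target's directory: file name -> content.
final class ToyTargetStore {
    final Map<String, String> files = new HashMap<>();
    private final String tempPrefix;  // assumed naming scheme for temporary files

    ToyTargetStore(long recoveryId) {
        this.tempPrefix = "recovery." + recoveryId + ".";
    }

    // FILE_CHUNK data is first written under a temporary name.
    void writeTemp(String name, String content) {
        files.put(tempPrefix + name, content);
    }

    // Like renameAllTempFiles: move every temporary file to its real name,
    // overwriting any file the Target already had under that name.
    void renameAllTempFiles() {
        for (String name : new HashSet<>(files.keySet())) {
            if (name.startsWith(tempPrefix)) {
                files.put(name.substring(tempPrefix.length()), files.remove(name));
            }
        }
    }

    // Simplified cleanupAndVerify: drop every file the Source snapshot does not
    // contain. Checksum verification is omitted in this sketch.
    void cleanup(Set<String> sourceFileNames) {
        files.keySet().retainAll(sourceFileNames);
    }
}
```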
    

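    This completes the Source's phase1. The Source has sent its local snapshot files to the Target, and the Target has processed them. Next comes the Source's phase2, which deals mainly with the Translog (transaction log).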

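    Before entering phase2, the Source also sends a PREPARE_TRANSLOG request to the Target. This tells the Target to open its Engine (ElasticSearch performs writes through the Engine) and get ready to receive and replay Translog operations.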

    The Target handles PREPARE_TRANSLOG with PrepareForTranslogOperationsRequestHandler:

    //PeerRecoveryTargetService.PrepareForTranslogOperationsRequestHandler
    @Override
    public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel,
                                Task task) throws Exception {
        try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
            recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
        }
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
    
    //RecoveryTarget
    @Override
    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
        state().getTranslog().totalOperations(totalTranslogOps);
        // open the Engine without replaying the local Translog; the operations will come from the Source
        indexShard().openEngineAndSkipTranslogRecovery();
    }
    

    5.2.2 phase2: the Source sends the Translog to the Target

    In phase2 the Source mainly sends Translog operations to the Target:

    //RecoverySourceHandler
    long phase2(...) {
        ...
        // send all the snapshot's translog operations to the target
        // sendSnapshot uses the RemoteRecoveryTargetHandler to send the
        // operations to the Target in TRANSLOG_OPS requests
        final SendSnapshotResult result = sendSnapshot(
            startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
        ...
        return result.targetLocalCheckpoint;
    }
    

    The Target handles TRANSLOG_OPS with TranslogOperationsRequestHandler:

    //PeerRecoveryTargetService.TranslogOperationsRequestHandler
    // The source of TranslogOperationsRequestHandler is not reproduced here. Its main job is
    // to take each operation in the received Translog batch and replay it locally on the Target.
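phase2 returns the Target's local checkpoint (result.targetLocalCheckpoint above). Conceptually, this is the highest sequence number up to which every operation has been applied, even when operations are replayed out of order. ElasticSearch tracks it with LocalCheckpointTracker. The hypothetical ToyLocalCheckpointTracker below only illustrates the invariant. It starts from -1 to mean that no operation has been processed yet.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified local checkpoint tracking while replaying TRANSLOG_OPS.
final class ToyLocalCheckpointTracker {
    private long checkpoint;                           // all seqNos <= checkpoint are applied
    private final Set<Long> pending = new HashSet<>(); // applied seqNos above the checkpoint

    ToyLocalCheckpointTracker(long startingCheckpoint) {
        this.checkpoint = startingCheckpoint;
    }

    void markProcessed(long seqNo) {
        if (seqNo <= checkpoint) {
            return;  // already covered, e.g. an operation replayed twice
        }
        pending.add(seqNo);
        // advance the checkpoint while the next sequence number has been applied
        while (pending.remove(checkpoint + 1)) {
            checkpoint++;
        }
    }

    long getCheckpoint() {
        return checkpoint;
    }
}
```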
    

    5.2.3 Wrap-up: finalizeRecovery

    The last step of the Source's RecoverySourceHandler.recoverToTarget is finalizeRecovery:

    //RecoverySourceHandler
    /*
     * finalizes the recovery process
     */
    public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
        ...
        // Leaving the other logic aside, focus on this part: for a primary relocation
        // (the Source is the primary being moved), the primary context must be
        // handed off to the Target once recovery has finished.
        if (request.isPrimaryRelocation()) {
            logger.trace("performing relocation hand-off");
            // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
            cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
            /*
             * if the recovery process fails after disabling primary mode on the source shard, both relocation source and
             * target are failed (see {@link IndexShard#updateRoutingEntry}).
             */
        }
        stopWatch.stop();
        logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
    }
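The comment in finalizeRecovery says that shard.relocated acquires all operation permits before the hand-off. This idea can be modeled with a java.util.concurrent.Semaphore. Each write holds one permit, and the relocation takes all of them. The relocation therefore waits for in-flight writes and blocks new ones while the primary context is transferred. ToyPrimaryShard, its permit count and the String "context" are hypothetical simplifications, not IndexShard's implementation.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical model of "acquire all operation permits, then hand off".
final class ToyPrimaryShard {
    private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore permits = new Semaphore(TOTAL_PERMITS, true);
    private volatile boolean relocated;

    // A write holds one permit while it runs; it is rejected once the shard has relocated.
    boolean tryWrite(Runnable op) {
        if (!permits.tryAcquire()) {
            return false;                 // a relocation currently holds all permits
        }
        try {
            if (relocated) {
                return false;             // the primary role has moved to the Target
            }
            op.run();
            return true;
        } finally {
            permits.release();
        }
    }

    // Waits for in-flight writes, blocks new ones, then hands the context to the Target.
    void relocated(Consumer<String> handoffPrimaryContext) {
        permits.acquireUninterruptibly(TOTAL_PERMITS);
        try {
            handoffPrimaryContext.accept("primary-context");  // hypothetical payload
            relocated = true;
        } finally {
            permits.release(TOTAL_PERMITS);
        }
    }

    boolean isRelocated() {
        return relocated;
    }
}
```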
    

    The Source hands off the primary context by sending a HANDOFF_PRIMARY_CONTEXT request:

    //RemoteRecoveryTargetHandler
    @Override
    public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
        transportService.submitRequest(
                targetNode,
                PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT,
                new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext),
                TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
                EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
    }
    

    The Target handles HANDOFF_PRIMARY_CONTEXT with HandoffPrimaryContextRequestHandler:

    //PeerRecoveryTargetService.HandoffPrimaryContextRequestHandler
    @Override
    public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel,
                                Task task) throws Exception {
        try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
            recoveryRef.target().handoffPrimaryContext(request.primaryContext());
        }
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
    
    //RecoveryTarget
    @Override
    public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
        indexShard.activateWithPrimaryContext(primaryContext);
    }
    

    On the Target, indexShard.activateWithPrimaryContext takes over the primary context. This involves ReplicationTracker, which is fairly complex and will be covered in a separate article.

    For an introduction to ReplicationTracker, see the article "ElasticSearch ReplicationTracker".


          Article link: https://www.haomeiwen.com/subject/mppbcqtx.html