1 概述
ElasticSearch恢复(Recovery)分为几种模式,从
RecoverySource.Type
中可以看到所有的模式:
//RecoverySource.Type
public enum Type {
EMPTY_STORE,
EXISTING_STORE,
//这种就是Peer to Peer恢复
PEER,
SNAPSHOT,
LOCAL_SHARDS
}
本文主要介绍Peer to Peer(P2P)恢复模式,在某个节点创建根据最新ClusterState
创建完Shard之后,会进行恢复模式。
P2P恢复中涉及到Source端和Target端,P2P恢复是由Target发起,但是整个恢复流程又是由Source主导的。
Source端为恢复源所在的Node节点,Target端为需要从Source端接受数据并恢复到Source端状态的Node节点,恢复的结果为Target恢复为Source端数据。
2 预备知识
ElasticSearch在启动时会实例化一个Node
实例,在Node
初始化时,会分别初始化进行恢复必要的两个Service:
PeerRecoverySourceService
以及PeerRecoveryTargetService
。
其中PeerRecoverySourceService
主要负责Source端的工作,PeerRecoveryTargetService
则主要负责Target端的工作。
恢复过程涉及到的其他几个重要的类:
-
RecoverySourceHandler
: Source端接受到Target端恢复请求时创建,在PeerRecoverySourceService
负责主导恢复流程。 -
RemoteRecoveryTargetHandler
: Source端RecoverySourceHandler
持有的用于和Target交互(发送Segment快照、Translog等)的对象。 -
RecoveryTarget
: Target端PeerRecoveryTargetService
在启动恢复流程时创建的对象,用于接受Source端文件、Translog等执行恢复的对象。
此外,还需要注意一下PeerRecoverySourceService
和PeerRecoveryTargetService
在构造函数中向TransporgService注册的请求处理Handler。
Source端注册的Handler如下:
//PeerRecoverySourceService构造函数
//StartRecoveryTransportRequestHandler主要负责处理Target端启动恢复的
//请求,创建RecoverySourceHandler对象实例,开始恢复流程
transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC, new StartRecoveryTransportRequestHandler());
Target端注册的Handler如下(后文介绍中可能有些没有涉及):
//PeerRecoveryTargetService
//负责接受处理Source端Shard快照文件
transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new
FilesInfoRequestHandler());
//负责接受Source端发送的Shard文件块
transportService.registerRequestHandler(Actions.FILE_CHUNK, RecoveryFileChunkRequest::new, ThreadPool.Names.GENERIC, new
FileChunkTransportRequestHandler());
//负责将接受到的Shard文件更改为正式文件
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
CleanFilesRequestHandler());
//负责打开Engine,准备接受Source端Translog进行日志重放
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
//负责接受Source端Translog,进行日志重放
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
new TranslogOperationsRequestHandler());
//负责刷新Engine,让新的Segment生效,回收旧的文件,更新global checkpoint等
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
//如果是主分片relocate,则负责接受主分片身份
transportService.registerRequestHandler(
Actions.HANDOFF_PRIMARY_CONTEXT,
RecoveryHandoffPrimaryContextRequest::new,
ThreadPool.Names.GENERIC,
new HandoffPrimaryContextRequestHandler());
如上,Source和Target端注册的Handler负责处理指定阶段相互交互发送Request的请求,下文不再详述具体交互细节,会直接进入指定Handler进行介绍。
3 大致流程
P2P恢复大致流程.png4 Target端发送开始恢复流程请求
Target端在createShade结束后会向Source端发送开始恢复流程请求。
//IndicesService
@Override
public IndexShard createShard(...) throws IOException {
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);
indexShard.addShardFailureCallback(onShardFailure);
//Target端发起恢复流程起点
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
(type, mapping) -> {
...
}, this);
return indexShard;
}
//IndexShard
public void startRecovery(...) {
...
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE:
case EXISTING_STORE:
threadPool.generic().execute(() -> {
...
//从本地文件(Translog)恢复
if (recoverFromStore()) {
...
});
break;
case PEER:
...
//P2P恢复
recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
...
break;
case SNAPSHOT:
//从快照恢复
...
break;
case LOCAL_SHARDS:
//从本地Shard恢复
...
default:
throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
}
}
//PeerRecoveryTargetService
//Target端PeerRecoveryTargetService开始恢复
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
// create a new recovery status, and process...
//创建新的RecoveryTarget,并返回该RecoveryTarget对应的ID
//在发送START_RECOVERY给Source端时会同时发送该ID,后续
//Source端发送恢复流程中的request也会带上该ID,Target端根据
//该ID找到RecoveryTarget进行请求处理
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
//Target具体恢复流程放在RecoveryRunner中
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
//PeerRecoveryTargetService.RecoveryRunner
class RecoveryRunner extends AbstractRunnable {
final long recoveryId;
RecoveryRunner(long recoveryId) {
this.recoveryId = recoveryId;
}
@Override
public void onFailure(Exception e) {
...
}
@Override
public void doRun() {
//具体的恢复函数
doRecovery(recoveryId);
}
}
//PeerRecoveryTargetService
private void doRecovery(final long recoveryId) {
...
try (
...
//生成request
request = getStartRecoveryRequest(recoveryTarget);
...
}
try {
cancellableThreads.execute(() -> responseHolder.set(
//发送START_RECOVERY到Source端,启动Source端恢复流程
transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet()));
} catch (CancellableThreads.ExecutionCancelledException e) {
...
} catch (Exception e) {
//下面异常处理中某些异常可以重试恢复流程
Throwable cause = ExceptionsHelper.unwrapCause(e);
...
if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException ||
..
retryRecovery(
..);
return;
}
if (cause instanceof DelayRecoveryException) {
retryRecovery(..);
return;
}
if (cause instanceof ConnectTransportException) {
..
retryRecovery(..);
return;
}
..
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
}
}
5 Source端开启恢复流程
5.1 RecoverySourceHandler的创建
Source端接收到Target发起的启动恢复流程START_RECOVERY请求后,使用StartRecoveryTransportRequestHandler
处理该请求:
//PeerRecoverySourceService.StartRecoveryTransportRequestHandler
@Override
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
//Source端处理START_RECOVERY请求Handler调用
//PeerRecoverySourceService开启恢复流程
RecoveryResponse response = recover(request);
channel.sendResponse(response);
}
private RecoveryResponse recover(final StartRecoveryRequest request) throws IOException {
...
//这里创建RecoverySourceHandler用于主导恢复流程
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode());
try {
//实际的恢复流程函数,Source端所有的恢复流程都是这个函数定义的,后续
//的恢复流程都是Source主导,发送各种请求让Target端进行文件恢复
//Translog重放等。
return handler.recoverToTarget();
} finally {
ongoingRecoveries.remove(shard, handler);
}
}
//PeerRecoverySourceService.OngoingRecoveries
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
//实际创建Handler的地方
RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
shard.recoveryStats().incCurrentAsSource();
return handler;
}
//PeerRecoverySourceService.OngoingRecoveries.ShardRecoveryContext
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
for (RecoverySourceHandler existingHandler : recoveryHandlers) {
if (existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
throw new DelayRecoveryException("recovery with same target already registered, waiting for " +
"previous recovery attempt to be cancelled or completed");
}
}
RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
recoveryHandlers.add(handler);
return handler;
}
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
RecoverySourceHandler handler;
//RecoverySourceHandler会持有一个RemoteRecoveryTargetHandler
//用于和Target端进行交互,后续各种请求都是通过这个对象进行交互的。
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt());
return handler;
}
5.2 Source端主导流程recoverToTarget
Source端主导流程主要在RecoverySourceHandler.recoverToTarget
函数中:
//RecoverySourceHandler
public RecoveryResponse recoverToTarget() throws IOException {
...
//创建Shard快照,该快照会在phase1阶段发送给Target端
phase1Snapshot = shard.acquireSafeIndexCommit();
...
phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
...
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
...
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
...
finalizeRecovery(targetLocalCheckpoint);
...
return response;
}
通过上面的recoverToTarget
函数可知,恢复流程主要分为另个阶段,额外加一个收尾阶段,分别为phase1
,phase2
,finalizeRecovery
,下面依次介绍:
5.2.1 phase1:source端向Target发送快照
根据上述代码,在调用phase1
函数之前,source端使用phase1Snapshot = shard.acquireSafeIndexCommit()
对本地的数据执行快照操作,然后调用phase1
发送快照文件到Target端。
phase1阶段主要向Target端发送三种请求:单个FILES_INFO告知Target即将接收的快照文件的相关信息、多个FILE_CHUNK进行文件传输、CLEAN_FILES告诉Target端清理原本存在但是恢复到Source端状态却不再需要的文件,CLEAN_FILES也会要求Target端对接收到的文件进行验证。
public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
//省略的内容主要是Target Start Recovery请求中的信息处理有差异的文件
...
//发送Shard文件相关信息到Target端
cancellableThreads.execute(() ->
recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames,
response.phase1ExistingFileSizes, translogOps.get()));
// How many bytes we've copied since we last called RateLimiter.pause
...
//通过文件块FILE_CHUNK的方式,将文件发送给Target端
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for
// the files after they have been renamed.
//
// Once the files have been renamed, any other files that are not
// related to this recovery (out of date segments, for example)
// are deleted
//发送CLEAN_FILES请求,告诉Target端临时文件中的文件命名为正式名称,删除旧的文件等。
try {
cancellableThreads.executeIO(() ->
recoveryTarget.cleanFiles(translogOps.get(), recoverySourceMetadata));
} catch (RemoteTransportException | IOException
...
}
其实Source端和Target端的交互基本上都是以请求和Handler的方式进行,比如Source端发送文件信息到Target端,即recoveryTarget.receiveFileInfo
实现代码如下:
//RemoteRecoveryTargetHandler
@Override
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
//注意下面的recoveryId,Target端收到请求后会根据该ID
//找到负责助理本恢复的RecoveryTarget
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(recoveryId, shardId,
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
具体如何发送的可以查看phase1
函数代码,下面着重看下Target如何处理Source端发送的FILES_INFO、FILE_CHUNK、CLEAN_FILES请求:
Target端接收FILES_INFO请求后,会使用注册的FilesInfoRequestHandler
处理:
//PeerRecoveryTargetService.FilesInfoRequestHandler
@Override
public void receiveFileInfo(List<String> phase1FileNames,
List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes,
int totalTranslogOps) {
final RecoveryState.Index index = state().getIndex();
//Target在此主要记录发送过来的快照文件的相关信息,如文件名、文件大小、事务日志等相关信息
for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
}
for (int i = 0; i < phase1FileNames.size(); i++) {
index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
}
state().getTranslog().totalOperations(totalTranslogOps);
state().getTranslog().totalOperationsOnStart(totalTranslogOps);
}
Source端发送完快照文件信息之后,会通过多个FILE_CHUNK请求向Target传输快
照文件,其实Target端在发送START_RECOVERY告诉Source端启动恢复流程时,也会将自己的元数据随着该请求发送给Source端,Source端在进行快照操作之后、发送给Target之前会依据自己的快照文件以及Target端请求中的元数据计算出差异的地方。
计算出的多个差异文件(即多个StoreFileMetaData
)会以多个FILE_CHUNK请求发送给Target端。每个FILE_CHUNK携带一个完整的StoreFileMetaData
。
Target端使用FileChunkTransportRequestHandler
处理FILE_CHUNK请求,这里面需要注意一下ElasticSearch可使用参数参数indices.recovery.max_bytes_per_sec
调节发送文件的速度:
//PeerRecoveryTargetService.FileChunkTransportRequestHandler
//Target端接收Source端发送过来的文件
@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
final RecoveryTarget recoveryTarget = recoveryRef.target();
final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}
//这里默认使用Lucene的SimpleRateLimiter实现限速
RateLimiter rateLimiter = recoverySettings.rateLimiter();
if (rateLimiter != null) {
long bytes = bytesSinceLastPause.addAndGet(request.content().length());
if (bytes > rateLimiter.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
long throttleTimeInNanos = rateLimiter.pause(bytes);
indexState.addTargetThrottling(throttleTimeInNanos);
recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(),
request.lastChunk(), request.totalTranslogOps()
);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
Source端发送完所有差异文件之后,会发送CLEAN_FILES请求给Target端,Target端处理CLEAN_FILES的handler是:
//PeerRecoveryTargetService.CleanFilesRequestHandler
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
//RecoveryTarget
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
// first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
//将从Source端接收文件时创建的临时命名为正式文件,根据注释可知可能会覆盖Target端已有文件。
renameAllTempFiles();
final Store store = store();
store.incRef();
try {
//Target端清理本地原有文件,但是根据Source端发送过来恢复信息判断恢复之后不需要的文件,并对接收的文件进行验证
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
store.ensureIndexHasHistoryUUID();
}
...
//其他相关处理
}
至此Source端phase1已经执行结束。Source端已经将本地快照文件发送给了Target端,Target端也已经对接收到的快照文件进行了处理,下面就需要进入Source端的phase2阶段,phase2端只要处理事务日志。
在进入phase2阶段之前,Source端还发送了PREPARE_TRANSLOG请求给Rarget端,告诉其打开Engine
(ElasticSearch使用Engine
执行写入操作),准备好接收Translog并重放。
Target端使用PrepareForTranslogOperationsRequestHandler
处理PREPARE_TRANSLOG请求:
//PeerRecoveryTargetService.PrepareForTranslogOperationsRequestHandler
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel,
Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
//RecoveryTarget
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().openEngineAndSkipTranslogRecovery();
}
5.2.2 phase2:source端向Target发送Tanslog
Source端在phase2主要向Target发送事务日志:
//RecoverySourceHandler
long phase2(...)
...
// send all the snapshot's translog operations to the target
//调用RemoteRecoveryTargetHandler发送事务日志到Target端
//使用TRANSLOG_OPS请求
final SendSnapshotResult result = sendSnapshot(
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
...
return result.targetLocalCheckpoint;
}
Target处理TRANSLOG_OPS请求的handler为TranslogOperationsRequestHandler
:
//PeerRecoveryTargetService.TranslogOperationsRequestHandler
//这里不再展示TranslogOperationsRequestHandler源码,可以自行查看,//主要逻辑是Target端依次取出事务日志中的每个操作在本地进行重放。
5.2.3 收尾工作finalizeRecovery
Source端RecoverySourceHandler.recoverToTarget
最后一步就是finalizeRecovery
//RecoverySourceHandler
/*
* finalizes the recovery process
*/
public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
...
//忽略其他的逻辑,我们主要看下这里,如果Source端是主分片,则进行的是
//主分片重定位(PrimaryRelocation)
//恢复结束后需要移交Primary上下文给Target端
if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
}
stopWatch.stop();
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
}
Source端移交Primary上下文主要通过发送HANDOFF_PRIMARY_CONTEXT报文:
@Override
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
transportService.submitRequest(
targetNode,
PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT,
new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
Target端使用HandoffPrimaryContextRequestHandler
处理HANDOFF_PRIMARY_CONTEXT报文:
//PeerRecoveryTargetService.HandoffPrimaryContextRequestHandler
@Override
public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel,
Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.target().handoffPrimaryContext(request.primaryContext());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
//RecoveryTarget
@Override
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
indexShard.activateWithPrimaryContext(primaryContext);
}
Target端主要调用indexShard.activateWithPrimaryContext
完成Primary上下文的接手,这里涉及到ReplicationTracker
,比较复杂,后续会有专文介绍。
ReplicationTracker
的介绍参考文章ElasticSearch ReplicationTracker
网友评论