ElasticSearch Write Flow


Author: persisting_ | Published: 2018-12-02 21:48


1 Overview

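The following figure is taken from the official documentation:

[Figure: indexing flow, from the official documentation]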
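  1. Suppose a node (e.g. node1) receives a write request from the client. node1 first uses the document ID to determine where the document's primary shard lives (e.g. on node2), then forwards the request to node2.
  2. node2 holds the primary shard. When it receives the request, it performs the write. It then forwards the request to every replica shard of that document and waits for their results.
  3. Each replica shard executes the request and reports the result back to node2.
  4. Once node2 has received success from all replica shards, it returns the result to node1, which responds to the client.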
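Step 1 works because routing is deterministic: any node can compute the target shard from the routing value alone, and the routing value defaults to the document ID. Elasticsearch documents the formula roughly as shard = hash(_routing) % number_of_primary_shards.

The sketch below is a simplified model under stated assumptions. Elasticsearch actually uses a Murmur3 hash (and, in newer versions, a routing factor derived from the number of routing shards). Here String.hashCode() is only a stand-in, and RoutingSketch/shardFor are hypothetical names.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of document routing.
 * Assumption: String.hashCode() stands in for the Murmur3 hash Elasticsearch really uses.
 */
final class RoutingSketch {
    private RoutingSketch() {}

    /** Maps a routing value (the document _id by default) to a primary shard number. */
    static int shardFor(String routing, int numberOfPrimaryShards) {
        if (numberOfPrimaryShards <= 0) {
            throw new IllegalArgumentException("numberOfPrimaryShards must be positive");
        }
        // floorMod keeps the result in [0, numberOfPrimaryShards) even when the hash is negative.
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        // Every node computes the same answer, so node1 knows where to forward the request.
        for (String id : List.of("doc-1", "doc-2", "doc-3")) {
            System.out.println(id + " -> shard " + shardFor(id, 5));
        }
    }
}
```

Because the number of primary shards is part of the formula, it is fixed when the index is created.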

2 Writing with the Java client

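The example below uses TransportClient. With the RESTful interface there is an additional path from the HTTP request to the Action; see the separate article "ElasticSearch Rest and (RPC) NodeClient" for details.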

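//create the client
TransportClient client = createClient(clustName, sourceIp, sourcePort);
//client.prepareBulk() sets the action to BulkAction.INSTANCE
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(json, XContentType.JSON));
//execute the bulk and block until the response arrives
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
//start a new bulk for the next batch only after the previous one has been executed
bulkRequest = client.prepareBulk();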

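Calling bulkRequest.execute() eventually reaches the following methods of AbstractClient, the parent class of NodeClient (and of TransportClient). The code below follows the NodeClient path used inside a node. A TransportClient instead sends the request over the transport layer to a cluster node, where it reaches the same TransportBulkAction.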

//AbstractClient
@Override
public final <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(
    Action<Response> action, Request request) {
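    //create a future: PlainActionFuture is both a Future and an
    //ActionListener, so it can serve as the callback of an asynchronous call
    //and also support a synchronous wait; its class hierarchy is shown in the figure below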
    PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
    execute(action, request, actionFuture);
    return actionFuture;
}

@Override
public final <Request extends ActionRequest, Response extends ActionResponse> void execute(
    Action<Response> action, Request request, ActionListener<Response> listener) {
    listener = threadedWrapper.wrap(listener);
    //doExecute is implemented in the subclass NodeClient
    doExecute(action, request, listener);
}
//NodeClient
@Override
public <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
    // Discard the task because the Client interface doesn't use it.
    executeLocally(action, request, listener);
}
//action is BulkAction.INSTANCE, set by client.prepareBulk()
/**
 * Execute an {@link Action} locally, returning that {@link Task} used to track it, and linking an {@link ActionListener}. Prefer this
 * method if you don't need access to the task when listening for the response. This is the method used to implement the {@link Client}
 * interface.
 */
public <Request extends ActionRequest, Response extends ActionResponse>
Task executeLocally(Action<Response> action, Request request, ActionListener<Response> listener) {
    //transportAction(BulkAction.INSTANCE) returns the TransportBulkAction registered in ActionModule:
    //ActionModule.setupActions(): actions.register(BulkAction.INSTANCE, TransportBulkAction.class,
    //        TransportShardBulkAction.class);
    return transportAction(action).execute(request, listener);
}

transportAction(action).execute(request, listener) first runs the registered action filters (ActionFilter). It then calls TransportBulkAction's doExecute method.
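The filter step is a chain of responsibility. Each filter either passes the request on or stops it, and only after the last filter does the action's own doExecute run.

The dependency-free sketch below models that flow. Filter, Chain and FilterChainSketch are hypothetical, simplified types. They are not the real ActionFilter/ActionFilterChain signatures, which also carry the task, the action name and a listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model of an action filter chain. */
final class FilterChainSketch {
    interface Filter {
        /** Call chain.proceed(request) to continue, or return without calling it to stop. */
        void apply(String request, Chain chain);
    }

    /** Walks the filters in order, then hands the request to the action's doExecute. */
    static final class Chain {
        private final List<Filter> filters;
        private final Consumer<String> doExecute;
        private int index = 0;

        Chain(List<Filter> filters, Consumer<String> doExecute) {
            this.filters = filters;
            this.doExecute = doExecute;
        }

        void proceed(String request) {
            int i = index++;
            if (i < filters.size()) {
                filters.get(i).apply(request, this);  // next filter
            } else if (i == filters.size()) {
                doExecute.accept(request);            // every filter let it through
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Filter> filters = List.of(
            (req, chain) -> { log.add("audit " + req); chain.proceed(req); },
            (req, chain) -> { if (!req.isEmpty()) chain.proceed(req); else log.add("rejected"); });
        new Chain(filters, req -> log.add("doExecute " + req)).proceed("bulk");
        System.out.println(log); // [audit bulk, doExecute bulk]
    }
}
```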


[Figure: PlainActionFuture class hierarchy]
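The idea behind PlainActionFuture is that one object plays two roles. The asynchronous side completes it through the listener methods onResponse/onFailure, and the caller blocks on it through the Future side, which is what actionGet() does.

The sketch below is a minimal stand-in built on CompletableFuture. SimpleActionListener and ListenerFuture are hypothetical names, and the real class is implemented differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical callback interface modelled on ActionListener. */
interface SimpleActionListener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

/** A Future that is also a listener: callbacks complete it, callers may block on it. */
final class ListenerFuture<T> extends CompletableFuture<T> implements SimpleActionListener<T> {
    @Override
    public void onResponse(T response) {
        complete(response);
    }

    @Override
    public void onFailure(Exception e) {
        completeExceptionally(e);
    }

    /** Blocking wait, similar in spirit to actionGet(); failures surface as CompletionException. */
    T actionGet() {
        return join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ListenerFuture<String> future = new ListenerFuture<>();
        // The "transport layer" answers asynchronously through the listener side...
        pool.execute(() -> future.onResponse("bulk ok"));
        // ...while the caller waits synchronously through the Future side.
        System.out.println(future.actionGet());
        pool.shutdown();
    }
}
```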

3 Flow inside TransportBulkAction

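Write operations are currently handled by TransportBulkAction, which is also registered in ActionModule.
TransportBulkAction's class hierarchy is as follows:

[Figure: TransportBulkAction class hierarchy]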
3.1 Checking for indices that must be auto-created

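TransportBulkAction.doExecute first checks two things: whether automatic creation of missing indices is enabled, and whether the request targets any missing index that should be auto-created. If such indices exist, it creates them first and then performs the writes. Otherwise it calls executeIngestAndBulk directly.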

//TransportBulkAction
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
    final long startTime = relativeTime();
    //create one response slot per request to collect the results
    final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
    //needToCheck() returns autoCreateIndex.needToCheck(): is auto-creation of missing indices enabled?
    if (needToCheck()) {
        // Step 1: collect all the indices in the request
        ...
        /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
         * that we'll use when we try to run the requests. */
        ...
        // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
        //auto-creation is enabled, but no request targets a missing index
        if (autoCreateIndices.isEmpty()) {
            executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
        } else { //create the missing indices first, then run the bulk
            //counter = number of indices still being created
            final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
            for (String index : autoCreateIndices) {
                createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
                    @Override
                    public void onResponse(CreateIndexResponse result) {
                        //run the bulk only after every index creation has returned
                        if (counter.decrementAndGet() == 0) {
                            executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
                            // fail all requests involving this index, if create didn't work
                            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                                DocWriteRequest<?> request = bulkRequest.requests.get(i);
                                if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                    bulkRequest.requests.set(i, null);
                                }
                            }
                        }
                        if (counter.decrementAndGet() == 0) {
                            executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
                                inner.addSuppressed(e);
                                listener.onFailure(inner);
                            }), responses, indicesThatCannotBeCreated);
                        }
                    }
                });
            }
        }
    } else {
        //auto-creation is disabled: execute the writes directly
        executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
    }
}
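The auto-create branch uses a pattern that recurs in this code:

- start N asynchronous operations;
- let every callback, success or failure, decrement a shared AtomicInteger;
- run the continuation exactly once, in whichever callback brings the counter to zero.

The standalone sketch below models this. createAllThen and pretendCreateIndex are hypothetical names, and real index creation is replaced by a task that simply succeeds or throws.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class CountdownSketch {
    /** Runs one async "create index" per name, then calls whenDone once with the names that failed. */
    static void createAllThen(List<String> indices, ExecutorService executor, Consumer<Set<String>> whenDone) {
        if (indices.isEmpty()) {                 // nothing to create: continue immediately
            whenDone.accept(Set.of());
            return;
        }
        Set<String> failed = ConcurrentHashMap.newKeySet();
        AtomicInteger counter = new AtomicInteger(indices.size());
        for (String index : indices) {
            executor.execute(() -> {
                try {
                    pretendCreateIndex(index);   // hypothetical stand-in for createIndex(...)
                } catch (RuntimeException e) {
                    failed.add(index);           // like onFailure: record it, but still count down
                }
                if (counter.decrementAndGet() == 0) {
                    whenDone.accept(failed);     // only the last callback reaches this line
                }
            });
        }
    }

    /** Pretend index creation: names starting with "bad" fail. */
    static void pretendCreateIndex(String index) {
        if (index.startsWith("bad")) {
            throw new IllegalStateException("cannot create " + index);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CountDownLatch done = new CountDownLatch(1);
        createAllThen(List.of("logs-1", "bad-index", "logs-2"), pool, failed -> {
            System.out.println("executeIngestAndBulk; failed indices = " + failed);
            done.countDown();
        });
        done.await();
        pool.shutdown();
    }
}
```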

3.2 Executing the ingest pipeline

Next, let's look at executeIngestAndBulk in detail:

//TransportBulkAction
private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
    final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
    Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
    //the code below decides whether any request needs to run through a pipeline
    boolean hasIndexRequestsWithPipelines = false;
    ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
    for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
        if (actionRequest instanceof IndexRequest) {
            IndexRequest indexRequest = (IndexRequest) actionRequest;
            String pipeline = indexRequest.getPipeline();
            if (pipeline == null) {
                IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
                if (indexMetaData == null) {
                    indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
                } else {
                    String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
                    indexRequest.setPipeline(defaultPipeline);
                    if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
                        hasIndexRequestsWithPipelines = true;
                    }
                }
            } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
                //Note: IngestService.NOOP_PIPELINE_NAME is what ends pipeline execution.
                //After the pipeline has run, the flow re-enters doExecute, but before that
                //request.setPipeline(IngestService.NOOP_PIPELINE_NAME) is called,
                //so the pipeline is not executed a second time
                hasIndexRequestsWithPipelines = true;
            }
        }
    }
    //if a pipeline must run, run it first
    if (hasIndexRequestsWithPipelines) {
        try {
            //note: if the local node is not an ingest node,
            //the request is forwarded to a node with the ingest role
            if (clusterService.localNode().isIngestNode()) {
                processBulkIndexIngestRequest(task, bulkRequest, listener);
            } else {
                ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
            }
        } catch (Exception e) {
            listener.onFailure(e);
        }
    } else { //no pipeline to run: go straight to the bulk operation
        executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
    }
}
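The pipeline decision above can be reduced to a few rules:

- a request without a pipeline takes its index's default pipeline;
- a missing index, or an index with no default, gets "_none" (the value of IngestService.NOOP_PIPELINE_NAME and the default of the index.default_pipeline setting);
- ingest is needed only if some request ends up with a pipeline other than "_none".

The sketch below models those rules with a hypothetical IndexReq type and a plain map of default pipelines. It also shows why resetting every request to "_none" after ingest makes the second pass through doExecute skip the pipeline.

```java
import java.util.List;
import java.util.Map;

final class PipelineResolutionSketch {
    /** Same value as IngestService.NOOP_PIPELINE_NAME. */
    static final String NOOP_PIPELINE_NAME = "_none";

    /** Hypothetical minimal index request. */
    static final class IndexReq {
        final String index;
        String pipeline; // null if the client did not set one

        IndexReq(String index, String pipeline) {
            this.index = index;
            this.pipeline = pipeline;
        }
    }

    /**
     * Fills in each request's pipeline and reports whether any real pipeline must run.
     * defaultPipelines maps index name to index.default_pipeline, for existing indices only.
     */
    static boolean resolve(List<IndexReq> requests, Map<String, String> defaultPipelines) {
        boolean hasPipelines = false;
        for (IndexReq r : requests) {
            if (r.pipeline == null) {
                // missing index or no configured default -> "_none"
                r.pipeline = defaultPipelines.getOrDefault(r.index, NOOP_PIPELINE_NAME);
                if (!NOOP_PIPELINE_NAME.equals(r.pipeline)) {
                    hasPipelines = true;
                }
            } else if (!NOOP_PIPELINE_NAME.equals(r.pipeline)) {
                hasPipelines = true;
            }
        }
        return hasPipelines;
    }

    public static void main(String[] args) {
        List<IndexReq> reqs = List.of(new IndexReq("logs", null), new IndexReq("metrics", null));
        Map<String, String> defaults = Map.of("logs", "parse-logs");
        System.out.println(resolve(reqs, defaults)); // true: "logs" uses its default pipeline
        // After ingest every request is set to "_none", so the second pass skips ingest.
        reqs.forEach(r -> r.pipeline = NOOP_PIPELINE_NAME);
        System.out.println(resolve(reqs, defaults)); // false
    }
}
```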

Below is a brief look at how the pipeline is invoked. Pipeline execution happens in processBulkIndexIngestRequest(task, bulkRequest, listener).

The body of processBulkIndexIngestRequest is:

//TransportBulkAction
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
    long ingestStartTimeInNanos = System.nanoTime();
    BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
    //run the pipelines through ingestService
    ingestService.executeBulkRequest(() -> bulkRequestModifier,
        (indexRequest, exception) -> {
            logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
                indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
            bulkRequestModifier.markCurrentItemAsFailed(exception);
        }, (exception) -> {
            if (exception != null) {
                logger.error("failed to execute pipeline for a bulk request", exception);
                listener.onFailure(exception);
            } else {
                long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
                BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
                ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
                if (bulkRequest.requests().isEmpty()) {
                    // at this stage, the transport bulk action can't deal with a bulk request with no requests,
                    // so we stop and send an empty response back to the client.
                    // (this will happen if pre-processing all items in the bulk failed)
                    actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
                } else {
                    //after the pipelines have run, TransportBulkAction.doExecute is called again;
                    //each request's pipeline was already set to IngestService.NOOP_PIPELINE_NAME,
                    //so the pipelines are not executed a second time on re-entry
                    doExecute(task, bulkRequest, actionListener);
                }
            }
        },
        indexRequest -> bulkRequestModifier.markCurrentItemAsDropped());
}
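BulkRequestModifier lets items that failed or were dropped during ingest be removed from the bulk, while the final response still reports them at their original positions. The bookkeeping idea has two parts: remember which original slot each surviving item came from, then merge the ingest failures back in.

The sketch below models this. SlotMappingSketch and its methods are hypothetical, and the real class differs in detail (for example, it wraps the ActionListener rather than returning an array).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the slot bookkeeping done by BulkRequestModifier. */
final class SlotMappingSketch {
    private final List<String> original;
    private final boolean[] failed;
    private int[] originalSlots = new int[0];

    SlotMappingSketch(List<String> original) {
        this.original = original;
        this.failed = new boolean[original.size()];
    }

    /** Called when the pipeline fails for the item at this original position. */
    void markFailed(int slot) {
        failed[slot] = true;
    }

    /** The reduced bulk: only items that survived ingest, with their origins remembered. */
    List<String> survivingItems() {
        List<String> kept = new ArrayList<>();
        originalSlots = new int[original.size()];
        for (int i = 0; i < original.size(); i++) {
            if (!failed[i]) {
                originalSlots[kept.size()] = i;   // position in reduced bulk -> original position
                kept.add(original.get(i));
            }
        }
        return kept;
    }

    /** Puts the reduced bulk's responses and the ingest failures back in original order. */
    String[] mergeResponses(List<String> bulkResponses) {
        String[] merged = new String[original.size()];
        for (int i = 0; i < bulkResponses.size(); i++) {
            merged[originalSlots[i]] = bulkResponses.get(i);
        }
        for (int i = 0; i < merged.length; i++) {
            if (failed[i]) {
                merged[i] = "ingest failure";
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        SlotMappingSketch m = new SlotMappingSketch(List.of("doc0", "doc1", "doc2"));
        m.markFailed(1);                                  // the pipeline failed for doc1
        List<String> kept = m.survivingItems();           // doc0 and doc2 go on to the bulk
        String[] merged = m.mergeResponses(List.of("created", "created"));
        System.out.println(kept + " -> " + String.join(", ", merged));
    }
}
```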

//IngestService
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
    BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler,
    Consumer<IndexRequest> itemDroppedHandler) {

    threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {

        @Override
        public void onFailure(Exception e) {
            completionHandler.accept(e);
        }

        @Override
        protected void doRun() {
            for (DocWriteRequest<?> actionRequest : actionRequests) {
                IndexRequest indexRequest = null;
                if (actionRequest instanceof IndexRequest) {
                    indexRequest = (IndexRequest) actionRequest;
                } else if (actionRequest instanceof UpdateRequest) {
                    UpdateRequest updateRequest = (UpdateRequest) actionRequest;
                    indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
                }
                if (indexRequest == null) {
                    continue;
                }
                String pipelineId = indexRequest.getPipeline();
                if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
                    try {
                        Pipeline pipeline = pipelines.get(pipelineId);
                        if (pipeline == null) {
                            throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
                        }
                        //run the pipeline itself (not covered further here)
                        innerExecute(indexRequest, pipeline, itemDroppedHandler);
                        //as the comment below says, set the request's pipeline to NOOP_PIPELINE_NAME to prevent double execution
                        //this shouldn't be needed here but we do it for consistency with index api
                        // which requires it to prevent double execution
                        indexRequest.setPipeline(NOOP_PIPELINE_NAME);
                    } catch (Exception e) {
                        itemFailureHandler.accept(indexRequest, e);
                    }
                }
            }
            completionHandler.accept(null);
        }
    });
}

3.3 Executing the requests after the pipeline

The method is TransportBulkAction.executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated):

//TransportBulkAction
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
            final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
    new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
}

BulkOperation.run calls doRun, whose body is:

@Override
protected void doRun() throws Exception {
    //the elided code below prepares each operation type (CREATE, INDEX, UPDATE, DELETE)
    //and groups the items by target shard into requestsByShard
    ...
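    //counter: one shard-level request is sent per shard below; each shard response decrements it,
    //and when it reaches 0 (every item of the bulk has finished) the result is returned
    //to the client via finishHim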
    final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
    String nodeId = clusterService.localNode().getId();
    for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
        final ShardId shardId = entry.getKey();
        final List<BulkItemRequest> requests = entry.getValue();
        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                requests.toArray(new BulkItemRequest[requests.size()]));
        bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
        bulkShardRequest.timeout(bulkRequest.timeout());
        if (task != null) {
            bulkShardRequest.setParentTask(nodeId, task.getId());
        }
        //TransportShardBulkAction executes the shard-level request (all items routed to this shard)
        shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
            @Override
            public void onResponse(BulkShardResponse bulkShardResponse) {
                for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                    // we may have no response if item failed
                    if (bulkItemResponse.getResponse() != null) {
                        bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                    }
                    //store the result in the responses array declared earlier, at the item's original position
                    responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                }
                if (counter.decrementAndGet() == 0) {
                    finishHim();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // create failures for all relevant requests
                for (BulkItemRequest request : requests) {
                    final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                    DocWriteRequest<?> docWriteRequest = request.request();
                    responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                            new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                }
                if (counter.decrementAndGet() == 0) {
                    finishHim();
                }
            }
            //final step: return the result to the client
            private void finishHim() {
                //listener is the PlainActionFuture originally passed into TransportBulkAction (possibly wrapped)
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
            }
        });
    }
}
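The counter in doRun is per shard, not per item. The bulk items are grouped by target shard, one BulkShardRequest is sent per group, and every item keeps its original position. That is what allows responses.set(bulkItemResponse.getItemId(), ...) to restore the client's order.

The sketch below models the grouping and the per-shard countdown. shardFor is a hypothetical hash-based stand-in for Elasticsearch routing, and items and shards are simplified to strings and integers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ShardGroupingSketch {
    /** Hypothetical stand-in for routing; not Elasticsearch's Murmur3-based formula. */
    static int shardFor(String docId, int numberOfShards) {
        return Math.floorMod(docId.hashCode(), numberOfShards);
    }

    /** Groups item positions (the "item ids") by target shard, like requestsByShard. */
    static Map<Integer, List<Integer>> groupByShard(List<String> docIds, int numberOfShards) {
        Map<Integer, List<Integer>> byShard = new LinkedHashMap<>();
        for (int itemId = 0; itemId < docIds.size(); itemId++) {
            int shard = shardFor(docIds.get(itemId), numberOfShards);
            byShard.computeIfAbsent(shard, k -> new ArrayList<>()).add(itemId);
        }
        return byShard;
    }

    public static void main(String[] args) {
        List<String> docIds = List.of("a", "b", "c", "d", "e");
        Map<Integer, List<Integer>> byShard = groupByShard(docIds, 2);
        String[] responses = new String[docIds.size()];
        int pendingShards = byShard.size();               // the counter in doRun
        for (Map.Entry<Integer, List<Integer>> e : byShard.entrySet()) {
            // one shard-level request per entry; each item's answer goes back to its original slot
            for (int itemId : e.getValue()) {
                responses[itemId] = "shard " + e.getKey();
            }
            if (--pendingShards == 0) {
                System.out.println(String.join(", ", responses));  // finishHim()
            }
        }
    }
}
```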

3.4 Executing a shard-level request in TransportShardBulkAction

[Figure: TransportShardBulkAction class hierarchy]

TransportShardBulkAction.execute eventually calls the doExecute method of its parent class, TransportReplicationAction.

//TransportReplicationAction
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    new ReroutePhase((ReplicationTask) task, request, listener).run();
}

ReroutePhase.run eventually calls its doRun method:

//ReroutePhase
@Override
protected void doRun() {
    setPhase(task, "routing");
    final ClusterState state = observer.setAndGetObservedState();
    if (handleBlockExceptions(state)) {
        return;
    }

    // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
    final String concreteIndex = concreteIndex(state);
    final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
    if (indexMetaData == null) {
        retry(new IndexNotFoundException(concreteIndex));
        return;
    }
    if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
        throw new IndexClosedException(indexMetaData.getIndex());
    }

    // resolve all derived request fields, so we can route and apply it
    resolveRequest(indexMetaData, request);
    assert request.shardId() != null : "request shardId must be set in resolveRequest";
    assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";

    final ShardRouting primary = primary(state);
    if (retryIfUnavailable(state, primary)) {
        return;
    }
    final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
    //phase one of the operation: execute it on the primary shard.
    //If the local node holds the primary, execute the request locally
    if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
        performLocalAction(state, primary, node, indexMetaData);
    } else { //otherwise send the request to the node holding the primary
        performRemoteAction(state, primary, node);
    }
}
//performLocalAction and performRemoteAction are shown below. Both end up in performAction,
//passing transportPrimaryAction (BulkAction.NAME + "[s][p]") and actionName (BulkAction.NAME + "[s]")
//respectively. The receiving node uses this action name to look up the request handler registered
//with transportService. Besides these two, replicas use a third action,
//transportReplicaAction (BulkAction.NAME + "[s][r]")
private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node, IndexMetaData indexMetaData) {
    setPhase(task, "waiting_on_primary");
    ...
    performAction(node, transportPrimaryAction, true,
        new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetaData.primaryTerm(primary.id())));
}

private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
    ...
    setPhase(task, "rerouted");
    performAction(node, actionName, false, request);
}

protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
                                       Supplier<ReplicaRequest> replicaRequest, String executor) {
    //requests sent by performAction are handled by the handlers registered below.
    //A request forwarded under actionName (because the sending node did not hold the primary)
    //is handled on the receiving node by OperationTransportHandler, which mainly calls execute,
    //i.e. runs the reroute phase again, this time on the primary's node
    transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
    //primary shard operation
    transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
        new PrimaryOperationTransportHandler());
    // we must never reject on because of thread pool capacity on replicas
    //replica shard operation
    transportService.registerRequestHandler(transportReplicaAction,
        () -> new ConcreteReplicaRequest<>(replicaRequest),
        executor, true, true,
        new ReplicaOperationTransportHandler());
}
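The three action names registered above differ only in their suffix, and the routing phase decides which one to send. The sketch below models that decision and the name-based handler lookup. It assumes BulkAction.NAME is "indices:data/write/bulk", as in the code this walkthrough follows.

RerouteSketch and its in-memory handler map are hypothetical, and nothing goes over a network. The real handlers receive request objects and transport channels rather than node names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of the reroute decision and name-based handler lookup. */
final class RerouteSketch {
    static final String BULK = "indices:data/write/bulk";
    static final String SHARD_ACTION = BULK + "[s]";            // actionName
    static final String PRIMARY_ACTION = SHARD_ACTION + "[p]";  // transportPrimaryAction
    static final String REPLICA_ACTION = SHARD_ACTION + "[r]";  // transportReplicaAction

    private final String primaryNode;
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    RerouteSketch(String primaryNode) {
        this.primaryNode = primaryNode;
        // "[s]": a forwarded request simply runs the routing phase again on the receiving node
        handlers.put(SHARD_ACTION, this::reroute);
        handlers.put(PRIMARY_ACTION, node -> "primary write on " + node);
        handlers.put(REPLICA_ACTION, node -> "replica write on " + node);
    }

    /** Routing phase on localNode: pick the action name and target node. */
    String reroute(String localNode) {
        if (primaryNode.equals(localNode)) {
            return send(PRIMARY_ACTION, localNode);   // like performLocalAction
        }
        return send(SHARD_ACTION, primaryNode);       // like performRemoteAction
    }

    /** Stand-in for transportService: find the handler registered under the action name. */
    private String send(String action, String targetNode) {
        return handlers.get(action).apply(targetNode);
    }

    public static void main(String[] args) {
        RerouteSketch cluster = new RerouteSketch("node2");
        System.out.println(cluster.reroute("node1"));  // forwarded with "[s]", then "[s][p]" on node2
        System.out.println(cluster.reroute("node2"));  // primary is local: "[s][p]" directly
    }
}
```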

3.4.1 Primary shard operation

The primary shard operation is defined in PrimaryOperationTransportHandler:

//PrimaryOperationTransportHandler
@Override
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
    new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
}

//AsyncPrimaryAction
//note the argument this: AsyncPrimaryAction also implements ActionListener, so its onResponse is called back with the result
@Override
protected void doRun() throws Exception {
    acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
}

/**
 * Tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
 * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
 */
private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm,
                                          ActionListener<PrimaryShardReference> onReferenceAcquired, Object debugInfo) {
    ...
    //listener invoked once the permit is acquired: it calls AsyncPrimaryAction.onResponse
    //with new PrimaryShardReference(indexShard, releasable)
    ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
        @Override
        public void onResponse(Releasable releasable) {
            onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable));
        }

        @Override
        public void onFailure(Exception e) {
            onReferenceAcquired.onFailure(e);
        }
    };
    //acquire the permit
    indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
}
//PrimaryShardReference.perform calls TransportShardBulkAction's primary-side write, shardOperationOnPrimary
class PrimaryShardReference extends ShardReference
            implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> {

    ...

    @Override
    public PrimaryResult perform(Request request) throws Exception {
        //shardOperationOnPrimary is the write operation on the primary shard,
        //defined in TransportShardBulkAction
        PrimaryResult result = shardOperationOnPrimary(request, indexShard);
        assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
            + "] with a primary failure [" + result.finalFailure + "]";
        return result;
    }
    ...
}

Once the permit has been acquired, the onResponse method of the listener that was passed in is called. That listener is the AsyncPrimaryAction itself, so AsyncPrimaryAction.onResponse runs:
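acquirePrimaryOperationPermit guards writes on the shard with permits. Each ordinary operation takes one permit. Work that needs exclusive access, for example the hand-off during primary relocation, waits until every permit is free.

In IndexShardOperationPermits this is built on a Semaphore with a very large number of permits. The sketch below shows the same idea with a small, hypothetical permit count and synchronous acquisition, whereas the real code is asynchronous and queues listeners while operations are blocked. PermitsSketch and its methods are illustrative names only.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical, synchronous model of per-shard operation permits. */
final class PermitsSketch {
    /** Like Releasable: closing releases the permit and throws no checked exception. */
    interface Permit extends AutoCloseable {
        @Override
        void close();
    }

    private static final int TOTAL_PERMITS = 1_000; // the real class uses a far larger count
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);

    /** A normal write takes a single permit, so many writes can run concurrently. */
    Permit acquireOperationPermit() {
        semaphore.acquireUninterruptibly();
        return semaphore::release;
    }

    /** Exclusive work waits until every in-flight write has released its permit. */
    void blockOperations(Runnable exclusiveWork) {
        semaphore.acquireUninterruptibly(TOTAL_PERMITS);
        try {
            exclusiveWork.run();
        } finally {
            semaphore.release(TOTAL_PERMITS);
        }
    }

    /** Number of permits currently held. */
    int inFlight() {
        return TOTAL_PERMITS - semaphore.availablePermits();
    }

    public static void main(String[] args) {
        PermitsSketch shard = new PermitsSketch();
        try (Permit permit = shard.acquireOperationPermit()) {
            System.out.println("writing, permits in use: " + shard.inFlight());
        } // released here, as when the replicated operation completes
        shard.blockOperations(() -> System.out.println("exclusive, permits in use: " + shard.inFlight()));
    }
}
```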

//AsyncPrimaryAction
@Override
public void onResponse(PrimaryShardReference primaryShardReference) {
    try {
        if (primaryShardReference.isRelocated()) {
            ...
        } else {
            setPhase(replicationTask, "primary");
            //this listener releases the permit when the operation finishes and returns the result to the requesting node
            final ActionListener<Response> listener = createResponseListener(primaryShardReference);
            createReplicatedOperation(request,
                    ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                    primaryShardReference)
                    .execute();
        }
    } catch (Exception e) {
        Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
        onFailure(e);
    }
}

protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
    Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
    PrimaryShardReference primaryShardReference) {
    return new ReplicationOperation<>(request, primaryShardReference, listener,
            newReplicasProxy(primaryTerm), logger, actionName);
}

//ReplicationOperation
public void execute() throws Exception {
    final String activeShardCountFailure = checkActiveShardCount();
    final ShardRouting primaryRouting = primary.routingEntry();
    final ShardId primaryId = primaryRouting.shardId();
    if (activeShardCountFailure != null) {
        finishAsFailed(new UnavailableShardsException(primaryId,
            "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
        return;
    }

    totalShards.incrementAndGet();
    pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
    //run TransportShardBulkAction's operation on the primary shard
    primaryResult = primary.perform(request);
    primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
    final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
    if (replicaRequest != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
        }

        // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
        // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
        // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
        // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
        // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
        // of the sampled replication group, and advanced further than what the given replication group would allow it to.
        // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
        final long globalCheckpoint = primary.globalCheckpoint();
        // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
        // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed on.
        final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
        assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
        final ReplicationGroup replicationGroup = primary.getReplicationGroup();
        markUnavailableShardsAsStale(replicaRequest, replicationGroup);
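        //replicate to the replicas: as before, a counter records the number of outstanding operations
        //before sending; each completion decrements it, and when it reaches 0 all shards have finished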
        performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
    }

    successfulShards.incrementAndGet();  // mark primary as successful
    decPendingAndFinishIfNeeded();
}
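ReplicationOperation tracks completion with counters:

- pendingActions is raised to 1 while the primary is still coordinating, and each replica request adds one;
- every finished action, including the primary's own decPendingAndFinishIfNeeded() call, removes one;
- totalShards and successfulShards feed the shard info of the response.

The single-threaded sketch below models only this accounting. ReplicationCountersSketch and startOnPrimary are hypothetical names, and the failure handling (failing a replica, marking it stale) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical model of ReplicationOperation's completion counters. */
final class ReplicationCountersSketch {
    final AtomicInteger totalShards = new AtomicInteger();
    final AtomicInteger successfulShards = new AtomicInteger();
    final AtomicInteger pendingActions = new AtomicInteger();
    private final Consumer<String> onFinished;

    ReplicationCountersSketch(Consumer<String> onFinished) {
        this.onFinished = onFinished;
    }

    /** Called after the primary write; returns one callback per replica to report its outcome. */
    List<Consumer<Boolean>> startOnPrimary(int replicas) {
        totalShards.incrementAndGet();
        pendingActions.incrementAndGet();          // held until primary coordination is done
        List<Consumer<Boolean>> replicaCallbacks = new ArrayList<>();
        for (int i = 0; i < replicas; i++) {
            totalShards.incrementAndGet();
            pendingActions.incrementAndGet();      // one per replica request sent
            replicaCallbacks.add(ok -> {
                if (ok) {
                    successfulShards.incrementAndGet();
                }
                decPendingAndFinishIfNeeded();
            });
        }
        successfulShards.incrementAndGet();        // mark primary as successful
        decPendingAndFinishIfNeeded();             // drop the primary's own pending count
        return replicaCallbacks;
    }

    private void decPendingAndFinishIfNeeded() {
        if (pendingActions.decrementAndGet() == 0) {
            onFinished.accept(successfulShards.get() + "/" + totalShards.get() + " shards succeeded");
        }
    }

    public static void main(String[] args) {
        ReplicationCountersSketch op = new ReplicationCountersSketch(System.out::println);
        List<Consumer<Boolean>> replicas = op.startOnPrimary(2);
        replicas.get(0).accept(true);
        replicas.get(1).accept(false);             // last pending action: prints the summary
    }
}
```

With zero replicas the operation completes inside startOnPrimary itself, because the primary's own count is the only pending action.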

The write on the replica shards proceeds much like the write on the primary and is not covered further here.


Article link: https://www.haomeiwen.com/subject/zxkscqtx.html