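1 Overview
- Suppose any node (say node1) receives a write request from a client. It first uses the document ID to determine which node holds the primary shard for that document (say node2), then forwards the write request to node2.
- node2 holds the primary shard for the document. On receiving the write request it performs the write, then forwards the request to every replica shard that holds the document and waits for their results.
- Each replica shard executes the request on receipt and reports the result back to node2 promptly.
- Once node2 has received a success acknowledgement from all replica shards, it returns the result of the operation to node1.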
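The steps above can be sketched as a small in-memory simulation. This is an illustration only, not Elasticsearch code: the class names (`WriteFlowSketch`, `ShardCopy`, `Group`) are hypothetical, and `String.hashCode` is an assumed stand-in for the hash that Elasticsearch actually applies to the routing value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the write flow; not Elasticsearch code.
class WriteFlowSketch {
    // One shard copy (primary or replica) that stores documents by ID.
    static class ShardCopy {
        final String node;
        final Map<String, String> docs = new HashMap<>();

        ShardCopy(String node) {
            this.node = node;
        }

        boolean write(String docId, String source) {
            docs.put(docId, source);
            return true; // acknowledge the write
        }
    }

    // One primary plus the replicas that mirror it.
    static class Group {
        final ShardCopy primary;
        final List<ShardCopy> replicas = new ArrayList<>();

        Group(ShardCopy primary) {
            this.primary = primary;
        }
    }

    final List<Group> groups = new ArrayList<>();

    // Assumption: String.hashCode is a stand-in for the real routing hash.
    int shardFor(String docId) {
        return Math.floorMod(docId.hashCode(), groups.size());
    }

    // Route to the primary, write there, then replicate and count acknowledgements.
    int index(String docId, String source) {
        Group group = groups.get(shardFor(docId));
        int acks = group.primary.write(docId, source) ? 1 : 0;
        for (ShardCopy replica : group.replicas) {
            if (replica.write(docId, source)) {
                acks++;
            }
        }
        return acks;
    }

    // Two primaries, each with one replica placed on the other node.
    static WriteFlowSketch twoShardsOneReplica() {
        WriteFlowSketch cluster = new WriteFlowSketch();
        Group g0 = new Group(new ShardCopy("node1"));
        g0.replicas.add(new ShardCopy("node2"));
        Group g1 = new Group(new ShardCopy("node2"));
        g1.replicas.add(new ShardCopy("node1"));
        cluster.groups.add(g0);
        cluster.groups.add(g1);
        return cluster;
    }

    public static void main(String[] args) {
        WriteFlowSketch cluster = twoShardsOneReplica();
        System.out.println("acks = " + cluster.index("doc-1", "{\"field\":\"value\"}"));
    }
}
```

The returned count plays the role of the acknowledgements that node2 collects before answering node1.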
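2 Writing from Java code
This section uses TransportClient as the example. The RESTful interface additionally has to go through the steps from the HTTP request to the Action; for details see "ElasticSearch Rest and (RPC) NodeClient".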
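TransportClient client = createClient(clustName, sourceIp, sourcePort);
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(json));
// Assumption: the send step is not shown in the original snippet; execute().actionGet() submits the bulk and blocks for the response
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
// Start a fresh builder for the next batch
bulkRequest = client.prepareBulk();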
public final <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(
Action<Response> action, Request request) {
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
execute(action, request, actionFuture);
return actionFuture;
public final <Request extends ActionRequest, Response extends ActionResponse> void execute(
Action<Response> action, Request request, ActionListener<Response> listener) {
listener = threadedWrapper.wrap(listener);
doExecute(action, request, listener);
public <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
// Discard the task because the Client interface doesn't use it.
executeLocally(action, request, listener);
/**
 * Execute an {@link Action} locally, returning that {@link Task} used to track it, and linking an {@link ActionListener}. Prefer this
 * method if you don't need access to the task when listening for the response. This is the method used to implement the {@link Client}
 * interface.
 */
public < Request extends ActionRequest,
Response extends ActionResponse
> Task executeLocally(Action<Response> action, Request request, ActionListener<Response> listener) {
//ActionModule.setupActions() actions.register(BulkAction.INSTANCE, TransportBulkAction.class,
return transportAction(action).execute(request, listener);
transportAction(action).execute(request, listener) first runs the action filters and then calls the doExecute method of TransportBulkAction.
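The "filters first, then doExecute" ordering is a classic filter chain. The sketch below shows the general pattern under stated assumptions: the `Filter` and `Chain` interfaces are hypothetical and much simpler than the real Elasticsearch filter types, and the terminal step merely records that doExecute would be called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical filter-chain pattern; the interfaces are not Elasticsearch's own.
class FilterChainSketch {
    interface Chain {
        void proceed(String request, List<String> trace);
    }

    interface Filter {
        void apply(String request, List<String> trace, Chain chain);
    }

    // Wrap the terminal step with the filters, so the first filter runs first.
    static Chain build(List<Filter> filters, Chain terminal) {
        Chain next = terminal;
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            Chain downstream = next;
            next = (request, trace) -> filter.apply(request, trace, downstream);
        }
        return next;
    }

    static List<String> run() {
        List<Filter> filters = new ArrayList<>();
        filters.add((request, trace, chain) -> {
            trace.add("filter-1");
            chain.proceed(request, trace);
        });
        filters.add((request, trace, chain) -> {
            trace.add("filter-2");
            chain.proceed(request, trace);
        });
        // The terminal step stands in for the action's doExecute.
        Chain chain = build(filters, (request, trace) -> trace.add("doExecute:" + request));
        List<String> trace = new ArrayList<>();
        chain.proceed("bulk", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A filter that does not call `chain.proceed` would stop the request before it reaches the action, which is why the ordering matters.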
3 The flow inside TransportBulkAction
3.1 Checking whether any index needs to be auto-created
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
// needToCheck() returns autoCreateIndex.needToCheck(), i.e. whether auto-creation of missing indices is enabled
if (needToCheck()) {
// Step 1: collect all the indices in the request
/* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
* that we'll use when we try to run the requests. */
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
if (autoCreateIndices.isEmpty()) {
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
} else { // create the missing indices first, then run the bulk operation
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
public void onFailure(Exception e) {
if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
// fail all requests involving this index, if create didn't work
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
bulkRequest.requests.set(i, null);
if (counter.decrementAndGet() == 0) {
executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
}), responses, indicesThatCannotBeCreated);
} else {
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
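The key idea in the code above is a countdown: the bulk only starts after every create-index callback has returned, whether it succeeded or failed. A minimal sketch of that pattern follows. The `Callback` interface and the `createIndex` stub are hypothetical, and the callbacks run synchronously here, whereas in Elasticsearch they arrive asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "create all missing indices, then start the bulk".
class CreateThenBulkSketch {
    interface Callback {
        void onResponse();

        void onFailure(Exception e);
    }

    // Stand-in for an asynchronous create-index call; names starting with "bad" fail.
    static void createIndex(String index, Callback callback) {
        if (index.startsWith("bad")) {
            callback.onFailure(new IllegalStateException("cannot create " + index));
        } else {
            callback.onResponse();
        }
    }

    static List<String> run(List<String> missingIndices) {
        List<String> events = new ArrayList<>();
        if (missingIndices.isEmpty()) {
            events.add("bulk"); // nothing to create, start right away
            return events;
        }
        AtomicInteger counter = new AtomicInteger(missingIndices.size());
        for (String index : missingIndices) {
            createIndex(index, new Callback() {
                @Override
                public void onResponse() {
                    events.add("created:" + index);
                    if (counter.decrementAndGet() == 0) {
                        events.add("bulk"); // last callback starts the bulk
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    events.add("failed:" + index);
                    if (counter.decrementAndGet() == 0) {
                        events.add("bulk"); // a failure still counts down
                    }
                }
            });
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(java.util.Arrays.asList("logs", "bad-index", "metrics")));
    }
}
```

Because both branches decrement the same counter, the bulk starts exactly once, no matter which callback happens to arrive last.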
3.2 Pipeline execution
private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
boolean hasIndexRequestsWithPipelines = false;
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData == null) {
} else {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
hasIndexRequestsWithPipelines = true;
if (hasIndexRequestsWithPipelines) {
try {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
} catch (Exception e) {
} else { // no pipeline needs to run, so go straight to the bulk operation
executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
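The per-request decision in the loop above boils down to a small predicate. The following sketch restates it in isolation. It assumes that `IngestService.NOOP_PIPELINE_NAME` has the value `"_none"` and that a request for a not-yet-existing index needs no ingest step (the excerpt does not show that branch); the class and method names are hypothetical.

```java
// Hypothetical restatement of the "does this request need ingest?" decision.
class PipelineDecisionSketch {
    // Assumption: this mirrors the value of IngestService.NOOP_PIPELINE_NAME.
    static final String NOOP_PIPELINE_NAME = "_none";

    // requestPipeline: pipeline set on the request, or null if none was set.
    // indexDefaultPipeline: default pipeline of the index, or null if the index does not exist yet.
    static boolean needsIngest(String requestPipeline, String indexDefaultPipeline) {
        if (requestPipeline == null) {
            // Fall back to the index default, if there is a real one.
            return indexDefaultPipeline != null && !NOOP_PIPELINE_NAME.equals(indexDefaultPipeline);
        }
        return !NOOP_PIPELINE_NAME.equals(requestPipeline);
    }

    public static void main(String[] args) {
        System.out.println(needsIngest(null, "_none"));
        System.out.println(needsIngest("my-pipeline", "_none"));
    }
}
```

An explicit request pipeline takes precedence over the index default, so a request can opt out of the default by naming the no-op pipeline.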
The following briefly describes how the pipeline is invoked. Pipeline execution is carried out in processBulkIndexIngestRequest(task, bulkRequest, listener).
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(() -> bulkRequestModifier,
(indexRequest, exception) -> {
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
}, (exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
doExecute(task, bulkRequest, actionListener);
indexRequest -> bulkRequestModifier.markCurrentItemAsDropped());
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler,
Consumer<IndexRequest> itemDroppedHandler) {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
public void onFailure(Exception e) {
protected void doRun() {
for (DocWriteRequest<?> actionRequest : actionRequests) {
IndexRequest indexRequest = null;
if (actionRequest instanceof IndexRequest) {
indexRequest = (IndexRequest) actionRequest;
} else if (actionRequest instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
if (indexRequest == null) {
String pipelineId = indexRequest.getPipeline();
if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
try {
Pipeline pipeline = pipelines.get(pipelineId);
if (pipeline == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
innerExecute(indexRequest, pipeline, itemDroppedHandler);
//this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution
} catch (Exception e) {
itemFailureHandler.accept(indexRequest, e);
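The handler arguments of executeBulkRequest follow a simple contract: one callback per failed item, one per dropped item, and one completion callback at the end. The sketch below illustrates that contract with plain strings instead of IndexRequest objects. It is an assumption-laden simplification: the pipeline is modelled as a `Function` that returns null to drop a document, and everything runs on the calling thread rather than on the WRITE thread pool.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of the per-item failure / dropped / completion handlers.
class IngestLoopSketch {
    static List<String> executeAll(List<String> docs,
                                   Function<String, String> pipeline,
                                   BiConsumer<String, Exception> itemFailureHandler,
                                   Consumer<String> itemDroppedHandler,
                                   Consumer<Exception> completionHandler) {
        List<String> kept = new ArrayList<>();
        for (String doc : docs) {
            try {
                String result = pipeline.apply(doc);
                if (result == null) {
                    itemDroppedHandler.accept(doc); // pipeline asked to drop the document
                } else {
                    kept.add(result);
                }
            } catch (Exception e) {
                itemFailureHandler.accept(doc, e); // one bad item does not abort the others
            }
        }
        completionHandler.accept(null); // null signals that the loop itself finished normally
        return kept;
    }

    static String demo() {
        List<String> dropped = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        boolean[] done = {false};
        Function<String, String> pipeline = doc -> {
            if (doc.equals("boom")) {
                throw new IllegalArgumentException("pipeline failed");
            }
            return doc.equals("drop") ? null : doc.toUpperCase();
        };
        List<String> kept = executeAll(Arrays.asList("a", "drop", "boom"), pipeline,
                (doc, e) -> failed.add(doc), dropped::add, e -> done[0] = (e == null));
        return "kept=" + kept + " dropped=" + dropped + " failed=" + failed + " done=" + done[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```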
3.3 Executing the actual requests after the pipeline has run
The entry point is TransportBulkAction.executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
protected void doRun() throws Exception {
final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
String nodeId = clusterService.localNode().getId();
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()]));
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
if (counter.decrementAndGet() == 0) {
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest<?> docWriteRequest = request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
if (counter.decrementAndGet() == 0) {
private void finishHim() {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
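BulkOperation splits the bulk into one request per shard, then places each item response back into the slot of its original position. The following sketch shows that fan-out and reassembly under stated assumptions: `String.hashCode` stands in for the real routing hash, the per-shard call is replaced by a synchronous loop, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch of grouping bulk items by shard and reassembling the responses.
class BulkFanOutSketch {
    static class Item {
        final int id;        // position in the original bulk request
        final String docId;

        Item(int id, String docId) {
            this.id = id;
            this.docId = docId;
        }
    }

    // Assumption: String.hashCode is a stand-in for the real routing hash.
    static Map<Integer, List<Item>> groupByShard(List<String> docIds, int numShards) {
        Map<Integer, List<Item>> byShard = new LinkedHashMap<>();
        for (int i = 0; i < docIds.size(); i++) {
            int shard = Math.floorMod(docIds.get(i).hashCode(), numShards);
            byShard.computeIfAbsent(shard, k -> new ArrayList<>()).add(new Item(i, docIds.get(i)));
        }
        return byShard;
    }

    static String[] execute(List<String> docIds, int numShards) {
        Map<Integer, List<Item>> byShard = groupByShard(docIds, numShards);
        AtomicReferenceArray<String> responses = new AtomicReferenceArray<>(docIds.size());
        AtomicInteger counter = new AtomicInteger(byShard.size());
        String[] result = new String[0];
        for (Map.Entry<Integer, List<Item>> entry : byShard.entrySet()) {
            // Stand-in for the per-shard bulk call; here every item simply succeeds.
            for (Item item : entry.getValue()) {
                responses.set(item.id, item.docId + "@shard" + entry.getKey());
            }
            // The last shard to report builds the final response, in request order.
            if (counter.decrementAndGet() == 0) {
                result = new String[responses.length()];
                for (int i = 0; i < result.length; i++) {
                    result[i] = responses.get(i);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", execute(java.util.Arrays.asList("a", "b", "c", "d"), 2)));
    }
}
```

Indexing the response array by item ID is what lets shard responses arrive in any order while the client still sees results in the order it sent them.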
3.4 Executing an individual request in TransportShardBulkAction
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
new ReroutePhase((ReplicationTask) task, request, listener).run();
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState();
if (handleBlockExceptions(state)) {
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final String concreteIndex = concreteIndex(state);
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
// resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetaData);
} else { // otherwise send the request to the node that holds the primary shard
performRemoteAction(state, primary, node);
// Two action names are used above: transportPrimaryAction (BulkAction.NAME + "[s][p]") and actionName (BulkAction.NAME + "[s]").
// Besides these two there is a third action, used by replica shards: transportReplicaAction (BulkAction.NAME + "[s][r]")
private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node, IndexMetaData indexMetaData) {
setPhase(task, "waiting_on_primary");
performAction(node, transportPrimaryAction, true,
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetaData.primaryTerm(primary.id())));
private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
setPhase(task, "rerouted");
performAction(node, actionName, false, request);
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
new PrimaryOperationTransportHandler());
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
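The reroute step comes down to choosing one of the registered action names depending on where the primary lives. A minimal sketch of that choice follows. It assumes that `BulkAction.NAME` is `"indices:data/write/bulk"`; the class and method names are hypothetical.

```java
// Hypothetical sketch of how the reroute phase picks an action name.
class RerouteSketch {
    // Assumption: this mirrors the value of BulkAction.NAME.
    static final String BULK_ACTION_NAME = "indices:data/write/bulk";
    static final String ACTION_NAME = BULK_ACTION_NAME + "[s]";   // sent to the node holding the primary
    static final String PRIMARY_ACTION = ACTION_NAME + "[p]";     // executed on the primary shard
    static final String REPLICA_ACTION = ACTION_NAME + "[r]";     // executed on replica shards

    // Local primary: run the primary action directly. Remote primary: forward the shard-level action.
    static String actionFor(String primaryNodeId, String localNodeId) {
        return primaryNodeId.equals(localNodeId) ? PRIMARY_ACTION : ACTION_NAME;
    }

    public static void main(String[] args) {
        System.out.println(actionFor("node2", "node2"));
        System.out.println(actionFor("node2", "node1"));
    }
}
```

When the request is forwarded, the receiving node runs the same reroute logic again, finds the primary locally, and then uses the primary action.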
3.4.1 Primary shard operation
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
protected void doRun() throws Exception {
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
/**
 * Tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
 * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
 */
private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm,
ActionListener<PrimaryShardReference> onReferenceAcquired, Object debugInfo) {
// the argument passed to onResponse is new PrimaryShardReference(indexShard, releasable)
ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
public void onResponse(Releasable releasable) {
onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable));
public void onFailure(Exception e) {
indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
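The acquire-then-release discipline described in the Javadoc above can be illustrated with a plain semaphore. This is a sketch under stated assumptions, not the mechanism Elasticsearch uses internally: `PermitSketch` and its `Releasable` interface are hypothetical, and acquisition is synchronous here instead of going through a listener.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch of an operation permit that must be released after the operation.
class PermitSketch {
    // A release handle; close() gives the permit back.
    interface Releasable extends AutoCloseable {
        @Override
        void close();
    }

    private final Semaphore permits;

    PermitSketch(int maxConcurrentOperations) {
        this.permits = new Semaphore(maxConcurrentOperations);
    }

    Releasable acquire() {
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("no operation permit available");
        }
        return permits::release;
    }

    int available() {
        return permits.availablePermits();
    }

    // Holds one permit for the duration of the operation and reports how many remain meanwhile.
    static int runPrimaryOperation(PermitSketch shard) {
        try (Releasable ignored = shard.acquire()) {
            return shard.available();
        }
    }

    public static void main(String[] args) {
        PermitSketch shard = new PermitSketch(2);
        System.out.println("free while running: " + runPrimaryOperation(shard));
        System.out.println("free afterwards: " + shard.available());
    }
}
```

Using try-with-resources guarantees the permit is returned even if the operation throws, which matches the intent of releasing the reference once the primary operation and its replication have completed or failed.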
class PrimaryShardReference extends ShardReference
implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> {
public PrimaryResult perform(Request request) throws Exception {
PrimaryResult result = shardOperationOnPrimary(request, indexShard);
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
+ "] with a primary failure [" + result.finalFailure + "]";
return result;
public void onResponse(PrimaryShardReference primaryShardReference) {
try {
if (primaryShardReference.isRelocated()) {
} else {
setPhase(replicationTask, "primary");
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
} catch (Exception e) {
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
newReplicasProxy(primaryTerm), logger, actionName);
public void execute() throws Exception {
final String activeShardCountFailure = checkActiveShardCount();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
if (activeShardCountFailure != null) {
finishAsFailed(new UnavailableShardsException(primaryId,
"{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
primaryResult = primary.perform(request);
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
// we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
// we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
// is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.globalCheckpoint();
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed on.
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
successfulShards.incrementAndGet(); // mark primary as successful
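The bookkeeping in execute() relies on a pending-action counter that starts at one for the primary coordination itself, so the operation cannot finish before all replica requests have been issued. The sketch below models only that counter logic. It is hypothetical and synchronous: replica writes are simulated by a `Predicate`, and the method `decPendingAndFinishIfNeeded` is named here for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical sketch of the pending-action bookkeeping of a replicated write.
class ReplicationSketch {
    final AtomicInteger pendingActions = new AtomicInteger();
    final AtomicInteger successfulShards = new AtomicInteger();
    final AtomicInteger failedShards = new AtomicInteger();
    boolean finished = false;

    void execute(List<String> replicas, Predicate<String> replicaWrite) {
        pendingActions.incrementAndGet(); // held until primary coordination is done
        // The primary write itself would happen here.
        for (String replica : replicas) {
            pendingActions.incrementAndGet();
            if (replicaWrite.test(replica)) {
                successfulShards.incrementAndGet();
            } else {
                failedShards.incrementAndGet();
            }
            decPendingAndFinishIfNeeded();
        }
        successfulShards.incrementAndGet(); // mark primary as successful
        decPendingAndFinishIfNeeded();      // release the initial count
    }

    void decPendingAndFinishIfNeeded() {
        if (pendingActions.decrementAndGet() == 0) {
            finished = true; // every replica has answered and the primary is done
        }
    }

    public static void main(String[] args) {
        ReplicationSketch op = new ReplicationSketch();
        op.execute(Arrays.asList("r1", "r2", "r3"), replica -> !replica.equals("r2"));
        System.out.println("successful=" + op.successfulShards.get() + " failed=" + op.failedShards.get());
    }
}
```

Without the initial increment, a fast replica response could bring the counter to zero while later replica requests were still being sent.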