Bulk Registration
The start() method of the Bootstrap class calls node.start(). While the Node initializes, it loads a series of modules and plugins, among them the ActionModule:
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService);
modules.add(actionModule);
ActionModule registers the actions we use every day, including the BulkAction this article analyzes:
actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class);
It also initializes the REST handlers:
registerHandler.accept(new RestMultiTermVectorsAction(settings, restController));
registerHandler.accept(new RestBulkAction(settings, restController));
registerHandler.accept(new RestUpdateAction(settings, restController));
RestBulkAction declares the HTTP endpoints it handles:
controller.registerHandler(POST, "/_bulk", this);
controller.registerHandler(PUT, "/_bulk", this);
controller.registerHandler(POST, "/{index}/_bulk", this);
controller.registerHandler(PUT, "/{index}/_bulk", this);
controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);
Receiving the Request
In its prepareRequest() method, RestBulkAction converts the plain RestRequest into a BulkRequest and dispatches it through the NodeClient:
channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
NodeClient's bulk() in turn goes through NodeClient's doExecute() method:
doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener)
The Action passed in is BulkAction.INSTANCE, the request is the BulkRequest built in the previous step, and the listener is the response listener.
doExecute() first resolves the plain action to its transportAction, then executes the request with it:
transportAction(action).execute(request, listener);
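The transportAction(action) call is essentially a registry lookup: the client keeps a map from action to transport handler, and execute() dispatches through it. Here is a minimal, simplified sketch of the idea; the class name, string-based requests, and the list-as-listener are all illustrative, not the real ES types:

```java
import java.util.*;
import java.util.function.BiConsumer;

// Simplified sketch of NodeClient-style dispatch: a map from action name
// to its transport handler, looked up at execute() time. All names here
// are hypothetical placeholders for the real Action/TransportAction types.
public class ActionRegistry {
    private final Map<String, BiConsumer<String, List<String>>> transportActions = new HashMap<>();

    public void register(String actionName, BiConsumer<String, List<String>> handler) {
        transportActions.put(actionName, handler);
    }

    // mirrors: transportAction(action).execute(request, listener)
    public void execute(String actionName, String request, List<String> listenerLog) {
        BiConsumer<String, List<String>> transportAction = transportActions.get(actionName);
        if (transportAction == null) {
            throw new IllegalStateException("failed to find action [" + actionName + "]");
        }
        transportAction.accept(request, listenerLog);
    }

    public static void main(String[] args) {
        ActionRegistry registry = new ActionRegistry();
        List<String> log = new ArrayList<>();
        registry.register("indices:data/write/bulk", (req, l) -> l.add("handled " + req));
        registry.execute("indices:data/write/bulk", "bulkRequest", log);
        System.out.println(log); // [handled bulkRequest]
    }
}
```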
BulkAction resolves to TransportBulkAction, whose execute() method calls its own doExecute(). doExecute() first sorts the indices into those that exist and those that do not:
1) Step 1: collect all the indices in the request.
2) Step 2: filter that down to the indices that don't exist and that we can create; at the same time, build a map of the indices we can't create, to be used when running the requests.
3) Step 3: create any missing indices, and start the bulk only after all the creates have come back.
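The three steps above can be sketched in a few lines. This is a hypothetical helper, not the real ES class; it only shows the collect-and-filter logic, with the "create then start the bulk" part left as a comment:

```java
import java.util.*;
import java.util.stream.*;

// Simplified sketch of TransportBulkAction's pre-flight index handling
// (hypothetical helper class): collect the indices named in the bulk
// request and split out the ones that do not exist in the cluster yet.
public class BulkIndexPreflight {

    // Steps 1 + 2: given all indices referenced by the request and the set
    // that already exists, return the (deduplicated) ones we need to create.
    public static Set<String> indicesToCreate(List<String> requested, Set<String> existing) {
        return requested.stream()
                .filter(index -> !existing.contains(index))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        List<String> requested = Arrays.asList("logs-2024", "users", "logs-2024", "metrics");
        Set<String> existing = new HashSet<>(Arrays.asList("users"));
        // Step 3 would fire one create-index request per entry and only
        // start the bulk after every create has come back.
        System.out.println(indicesToCreate(requested, existing)); // [logs-2024, metrics]
    }
}
```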
It then calls executeBulk(), which creates a BulkOperation and runs it:
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
                 final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
                 Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
    new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
}
BulkOperation iterates over all the requests in the bulk twice. The first pass resolves routing and mapping for each request and, where allowed, auto-generates document IDs. The second pass groups the requests by shard ID. The official ES docs recommend bulk for batch processing because of low-level optimizations like this one: requests bound for the same shard are bundled together and sent straight to the node holding that shard, instead of hopping between nodes and hurting throughput.
for (int i = 0; i < bulkRequest.requests.size(); i++) {
    ....
    switch (docWriteRequest.opType()) {
        case CREATE:
        case INDEX:
            IndexRequest indexRequest = (IndexRequest) docWriteRequest;
            MappingMetaData mappingMd = null;
            final IndexMetaData indexMetaData = metaData.index(concreteIndex);
            if (indexMetaData != null) {
                mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
            }
            indexRequest.resolveRouting(metaData);
            indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
            ....
    }
    ....
}
....
for (int i = 0; i < bulkRequest.requests.size(); i++) {
    DocWriteRequest request = bulkRequest.requests.get(i);
    if (request == null) {
        continue;
    }
    String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
    ShardId shardId = clusterService.operationRouting()
            .indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
    List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
    shardRequests.add(new BulkItemRequest(i, request));
}
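The second pass above is easy to reproduce in miniature: compute a shard id from the routing value, then bucket items per shard with computeIfAbsent. Note that real ES routes with a Murmur3 hash modulo the primary shard count; the plain hashCode below is a stand-in, and the class is a hypothetical sketch:

```java
import java.util.*;

// Simplified sketch of BulkOperation's shard grouping: each item is
// routed to a shard id and appended to that shard's bucket, so one
// node-to-node request carries everything destined for a given shard.
// ES uses Murmur3 on the routing/id; String.hashCode stands in here.
public class ShardGrouping {

    static int shardId(String routing, int numberOfShards) {
        return Math.floorMod(routing.hashCode(), numberOfShards);
    }

    // maps each original slot index i to its shard bucket, mirroring
    // requestsByShard.computeIfAbsent(shardId, ...).add(new BulkItemRequest(i, request))
    static Map<Integer, List<Integer>> groupByShard(List<String> docIds, int numberOfShards) {
        Map<Integer, List<Integer>> requestsByShard = new HashMap<>();
        for (int i = 0; i < docIds.size(); i++) {
            int shard = shardId(docIds.get(i), numberOfShards);
            requestsByShard.computeIfAbsent(shard, s -> new ArrayList<>()).add(i);
        }
        return requestsByShard;
    }

    public static void main(String[] args) {
        // every original slot index lands in exactly one shard bucket
        System.out.println(groupByShard(Arrays.asList("d1", "d2", "d3", "d4"), 2));
    }
}
```

Keeping the original slot index i in each bucket is what lets the responses be stitched back into the caller's original order later.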
Each per-shard request is then handed to shardBulkAction for processing:
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() { ... });
Per-Shard Processing Flow
Next comes a tangled class hierarchy:
TransportShardBulkAction → TransportWriteAction → TransportReplicationAction → TransportAction
The shardBulkAction.execute() call above runs TransportAction's execute() method. The source I'm reading is 5.6; compared with 5.0, ES added the TransportWriteAction class, and TransportReplicationAction no longer runs run() directly but works through transportService's RPC interface. The flow is as follows:
1) TransportAction.execute() calls TransportReplicationAction's doExecute() method.
2) TransportReplicationAction's doExecute() runs ReroutePhase's run() method, which uses the request's shard ID to find the primary shard and the ID of the node holding it. If the current node holds the primary shard, performLocalAction is executed; otherwise, performRemoteAction.
3) Both performLocalAction and performRemoteAction end up in performAction, where, as we can see, transportService sends the request:
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() { ... });
4) When transportService receives the request, it is handled by PrimaryOperationTransportHandler, which was registered in TransportReplicationAction:
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
        new PrimaryOperationTransportHandler());
5) PrimaryOperationTransportHandler handles primary operations; when it receives a message, it kicks off an AsyncPrimaryAction:
@Override
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
    new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
}
6) AsyncPrimaryAction first tries to acquire the shard's operation lock. If it succeeds, it calls its own onResponse() method; otherwise, the operation is queued to be retried:
synchronized (this) {
    releasable = tryAcquire();
    if (releasable == null) {
        // blockOperations is executing, this operation will be retried by blockOperations once it finishes
        if (delayedOperations == null) {
            delayedOperations = new ArrayList<>();
        }
        final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
        if (executorOnDelay != null) {
            delayedOperations.add(
                new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
                    new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
        } else {
            delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
        }
        return;
    }
}
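The acquire-or-queue pattern in step 6 boils down to: try to take a permit; if a block is in progress, park the callback instead of failing, and replay it on release. A minimal sketch, assuming a single permit and a plain Runnable in place of the real ActionListener (the class is hypothetical, not the ES type):

```java
import java.util.*;
import java.util.concurrent.Semaphore;

// Simplified sketch of the shard operation lock's acquire-or-delay
// behaviour: callers that cannot get a permit are queued in
// delayedOperations and retried when the permit is released.
public class OperationPermits {
    private final Semaphore permits = new Semaphore(1);
    private final List<Runnable> delayedOperations = new ArrayList<>();

    // returns true if the operation ran immediately, false if it was queued
    public synchronized boolean acquire(Runnable onAcquired) {
        if (permits.tryAcquire()) {
            onAcquired.run();               // got the permit: run now
            return true;
        }
        delayedOperations.add(onAcquired);  // blocked: retry later
        return false;
    }

    public synchronized void release() {
        permits.release();
        // drain one delayed operation, mirroring the retry after blockOperations finishes
        if (!delayedOperations.isEmpty() && permits.tryAcquire()) {
            delayedOperations.remove(0).run();
        }
    }
}
```

A caller that gets true has run under the permit; a caller that gets false will be run later by whoever calls release().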
7) In onResponse(), if the primaryShardReference has been relocated, the correct primary shard and node ID are looked up and the request is resent. Otherwise, primaryShardReference handles it directly:
@Override
public void onResponse(PrimaryShardReference primaryShardReference) {
    try {
        if (primaryShardReference.isRelocated()) {
            primaryShardReference.close(); // release shard operation lock as soon as possible
            setPhase(replicationTask, "primary_delegation");
            // delegate primary phase to relocation target
            // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
            // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
            final ShardRouting primary = primaryShardReference.routingEntry();
            assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
            DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
            transportService.sendRequest(relocatingNode, transportPrimaryAction,
                new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId()),
                transportOptions,
                new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
                    TransportReplicationAction.this::newResponseInstance) {

                    @Override
                    public void handleResponse(Response response) {
                        setPhase(replicationTask, "finished");
                        super.handleResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        setPhase(replicationTask, "finished");
                        super.handleException(exp);
                    }
                });
        } else {
            setPhase(replicationTask, "primary");
            final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
            final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData);
            final ActionListener<Response> listener = createResponseListener(primaryShardReference);
            createReplicatedOperation(request,
                    ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                    primaryShardReference, executeOnReplicas)
                    .execute();
        }
    } catch (Exception e) {
        Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
        onFailure(e);
    }
}
8) From the name, createReplicatedOperation sounds like pure replica handling, but looking inside, you find it executes on the primary first and on the replicas afterwards:
primaryResult = primary.perform(request);
...
performOnReplicas(replicaRequest, shards);
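The ordering in step 8 can be reduced to: the primary performs the operation and produces the request that is then fanned out to each replica copy. A toy sketch of just that ordering, with strings standing in for requests and shard copies (all names hypothetical):

```java
import java.util.*;
import java.util.function.*;

// Simplified sketch of the ReplicationOperation ordering: primary first
// (primary.perform(request)), then the replicas (performOnReplicas(...)).
public class ReplicationSketch {
    public static List<String> execute(String request, List<String> replicas, UnaryOperator<String> onPrimary) {
        List<String> log = new ArrayList<>();
        String replicaRequest = onPrimary.apply(request); // primary.perform(request)
        log.add("primary:" + replicaRequest);
        for (String replica : replicas) {                 // performOnReplicas(replicaRequest, shards)
            log.add(replica + ":" + replicaRequest);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(execute("index doc1", Arrays.asList("replica-0", "replica-1"), r -> r));
    }
}
```

The important detail is that the replicas receive the request produced by the primary, not the client's original one, so effects like auto-generated IDs and resolved versions are identical on every copy.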
Primary Shard Processing
The primary shard is handled via PrimaryShardReference.perform(), which in turn calls shardOperationOnPrimary(). shardOperationOnPrimary() is implemented by TransportShardBulkAction and proceeds as follows:
1) Fetch the index metadata held on the node.
2) Resolve the version number.
3) Update the mapping if needed.
4) Call down into the Engine, e.g. primary.delete(delete) or primary.index(operation).
5) Write the operation to the translog.
The replica shards are handled much like the primary, so I won't go into detail here.
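The five primary-side steps above can be sketched in miniature: apply the operation through the "engine" and then append it to the translog for crash recovery. The class below is a hypothetical toy, with a HashMap standing in for Lucene and a list standing in for the translog file:

```java
import java.util.*;

// Simplified sketch of the primary write path listed above: advance the
// version, index through the engine (a map stands in for Lucene), then
// record the operation in the translog so it can be replayed on restart.
public class PrimaryWriteSketch {
    private final Map<String, String> engine = new HashMap<>(); // stands in for Lucene
    private final List<String> translog = new ArrayList<>();
    private long version = 0;

    public long index(String id, String source) {
        version++;                                    // 2) resolve/advance the version
        engine.put(id, source);                       // 4) engine-level index operation
        translog.add("index " + id + " v" + version); // 5) write the translog
        return version;
    }

    public String get(String id) { return engine.get(id); }
    public List<String> translogOps() { return translog; }
}
```

Writing the translog after the engine operation mirrors the order in the walkthrough: a replayed translog entry simply re-runs the same engine operation.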