1 ActionModule
在了解ES源代码增删改查之前, 首先要了解一下TransportClient build时的一个关键Action的注入------ActionModule
在ActionModule的configure函数中, 会注册各种各样的Action 并绑定到具体的Handler(实际上就是TransportRequestHandler), 这个Handler很关键
registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
registerAction(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
registerAction(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
registerAction(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
registerAction(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
registerAction(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
registerAction(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
registerAction(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
registerAction(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
registerAction(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
registerAction(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
registerAction(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
registerAction(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
registerAction(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
registerAction(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
registerAction(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class, TransportGetFieldMappingsIndexAction.class);
registerAction(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
registerAction(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
registerAction(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
registerAction(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
registerAction(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
registerAction(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
registerAction(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class);
registerAction(FlushAction.INSTANCE, TransportFlushAction.class);
registerAction(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
registerAction(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
registerAction(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
registerAction(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
registerAction(PutWarmerAction.INSTANCE, TransportPutWarmerAction.class);
registerAction(DeleteWarmerAction.INSTANCE, TransportDeleteWarmerAction.class);
registerAction(GetWarmersAction.INSTANCE, TransportGetWarmersAction.class);
registerAction(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
registerAction(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
registerAction(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);
registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
registerAction(GetAction.INSTANCE, TransportGetAction.class);
registerAction(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class,
TransportDfsOnlyAction.class);
registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
TransportShardMultiTermsVectorAction.class);
registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);
registerAction(ExistsAction.INSTANCE, TransportExistsAction.class);
registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class);
registerAction(UpdateAction.INSTANCE, TransportUpdateAction.class);
registerAction(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
TransportShardMultiGetAction.class);
registerAction(BulkAction.INSTANCE, TransportBulkAction.class,
TransportShardBulkAction.class);
registerAction(SearchAction.INSTANCE, TransportSearchAction.class);
registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
registerAction(ExplainAction.INSTANCE, TransportExplainAction.class);
registerAction(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
registerAction(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
registerAction(RenderSearchTemplateAction.INSTANCE, TransportRenderSearchTemplateAction.class);
//Indexed scripts
registerAction(PutIndexedScriptAction.INSTANCE, TransportPutIndexedScriptAction.class);
registerAction(GetIndexedScriptAction.INSTANCE, TransportGetIndexedScriptAction.class);
registerAction(DeleteIndexedScriptAction.INSTANCE, TransportDeleteIndexedScriptAction.class);
registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsTransportAction.class);
调用的注册Handler的代码如下:
protected <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
synchronized (requestHandlerMutex) {
RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction());
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
if (replaced != null) {
logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg, replaced);
}
}
}
TransportService里有一个内部类, Adapter:
protected class Adapter implements TransportServiceAdapter
image.png
这个Adapter里有个方法是根据Action获取RequestHandlerRegistry 对象(关键, 后面会用到)
@Override
public RequestHandlerRegistry getRequestHandler(String action) {
return requestHandlers.get(action);
}
2 以Get来解析
这里以Get操作来解析, 从ActionModule依赖注入到使用Get操作的Handler流程, 中间涉及了Netty的RPC.
这里通过es 的id 来Get读取es的某条记录:
private static void getInfo(Client client) {
GetRequestBuilder getRequestBuilder = client.prepareGet("face_fixedperson", "Fixedperson", "126tc");
GetResponse getFields = getRequestBuilder.execute().actionGet();
String sourceAsString = getFields.getSourceAsString();
System.out.println("-----" + sourceAsString);
}
其涉及的流程如下:
- 1 ActionModule里面:
registerAction(GetAction.INSTANCE, TransportGetAction.class);
----------------------------------------------------------------------------------------
@Inject
public TransportGetAction(Settings settings, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, GetAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
GetRequest.class, ThreadPool.Names.GET);
this.indicesService = indicesService;
this.realtime = settings.getAsBoolean("action.get.realtime", true);
}
---------------------------------------------------------------------------------------------
protected TransportSingleShardAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Class<Request> request, String executor) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
this.clusterService = clusterService;
this.transportService = transportService;
this.transportShardAction = actionName + "[s]";
this.executor = executor;
if (!isSubAction()) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
}
transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
}
- 2 注册Handler(registerRequestHandler)
public <Request extends TransportRequest> void registerRequestHandler(String action, Class<Request> request, String executor, boolean forceExecution, boolean canTripCircuitBreaker, TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);
}
protected <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
synchronized (requestHandlerMutex) {
RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction());
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
if (replaced != null) {
logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg, replaced);
}
}
}
- 3 TransportClient端执行prepareGet之后执行execute
根据之前的文章, 我们知道这个请求最终会被代理类真正的调用:
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener) {
// 比如registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);即Delete 对应的是 TransportDeleteAction类
final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxy.execute(node, request, listener);
}
}, listener);
}
---------------------------------------------------------------------------------------------
public void execute(final DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
transportService.sendRequest(node, action.name(), request, transportOptions, new ActionListenerResponseHandler<Response>(listener) {
@Override
public Response newInstance() {
return action.newResponse();
}
});
}
上面调用了transportService.sendRequest实际上是RPC,会先判断node是本地节点, 还是远程Server节点, 如果是本地节点则sendLocalRequest, 远程节点则transport.sendRequest(node, requestId, action, request, options);
// 查询的时候先检查目标是否本地node
if (node.equals(localNode)) {
sendLocalRequest(requestId, action, request);
} else {
transport.sendRequest(node, requestId, action, request, options);
}
这里不再说明sendLocalRequest的情况, 这种情况比较简单, 可自行阅读. 这里说明远程Node节点的情况
- 4 前面文章分析了ES中的NettyTransport的情景(https://www.jianshu.com/p/ad89a0ad0424), 这里就会使用到, 在调用transport.sendRequest之后, Server端Node的Netty就会接受这个请求,Server端的MessageChannelHandler的messageReceived方法就会被触发(因为收到了客户端的数据发送),
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// .........省略
// 如果是请求, 则处理Request请求, 如果是服务端返回的则走else分支, 执行response的handler
if (TransportStatus.isRequest(status)) { // 是Request请求
handleRequest(ctx.getChannel(), marker, streamIn, requestId, size, version);
} else { // 是Response返回
TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
handlerResponseError(streamIn, handler);
} else {
handleResponse(ctx.getChannel(), streamIn, handler);
}
marker.validateResponse(streamIn, requestId, handler, TransportStatus.isError(status));
}
}
// .....省略
}
因为此处是请求, 所以执行handleRequest方法,
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, int messageLengthBytes,
Version version) throws IOException {
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
final String action = buffer.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
NettyTransportChannel transportChannel = null;
try {
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException(action);
}
if (reg.canTripCircuitBreaker()) {
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
} else {
transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
}
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, messageLengthBytes);
final TransportRequest request = reg.newRequest();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
validateRequest(marker, buffer, requestId, request, action);
if (ThreadPool.Names.SAME.equals(reg.getExecutor())) {
//noinspection unchecked
reg.processMessageReceived(request, transportChannel); //具体处理请求. 我们知道请求有多种种类, 不同的Request会使用不同的Handler进行处理
} else {
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
}
} catch (Throwable e) {
// the circuit breaker tripped
if (transportChannel == null) {
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, 0);
}
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
}
return action;
}
---------------------------------------------------------------------------------------------
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
final Task task = taskManager.register(channel.getChannelType(), action, request);
if (task == null) {
handler.messageReceived(request, channel);
} else {
boolean success = false;
try {
handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
taskManager.unregister(task);
}
}
}
}
上面一大串代码, 关键的代码是如下2个:
(1)final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);根据Action获取具体的Handler, 还记得2 注册Handler(registerRequestHandler)里面Get操作我们注册的transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());吗? 此处我们的具体Handler就是ShardTransportHandler.
(2)reg.processMessageReceived(request, transportChannel); 就是调用具体的Handler的messageReceived方法.
- 5 具体Handler处理
第4步分析到实际上执行的就是ShardTransportHandler的messageReceived方法,
// 读取数据组织成 Response, 给客户端 channel 返回。
private class ShardTransportHandler extends TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
// shardOperation主要处理请求中是否有 refresh 选项,然后调用indexShard.getService().get() 读取数据
Response response = shardOperation(request, request.internalShardId);
channel.sendResponse(response);
}
}
---------------------------------------------------------------------------------------------
shardOperation 实际上是TransportGetAction的shardOperation方法.
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());// shard 的id 号, 哪个shard
if (request.refresh() && !request.realtime()) {
indexShard.refresh("refresh_flag_get");
}
GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(),
request.realtime(), request.version(), request.versionType(), request.fetchSourceContext(), request.ignoreErrorsOnGeneratedFields());
return new GetResponse(result);
}
- 6 经过第5步之后, Get请求就会通过操作转向成具体的Lucene的Get操作. 此处细节不在说明.
以上6步就是一个操作的大致流程. 此处只分析了Get操作, 但是ES其他操作的流程和这个流程基本是一样的.
网友评论