美文网首页Spark_Flink_Hadoop大数据elasticsearch
Elasticsearch的增删改查源码流程

Elasticsearch的增删改查源码流程

作者: kason_zhang | 来源:发表于2018-06-12 14:51 被阅读10次

    1 ActionModule

    在阅读ES增删改查的源代码之前, 首先要了解TransportClient构建(build)时注入的一个关键模块——ActionModule.
    在ActionModule的configure函数中, 会注册各种各样的Action, 并把每个Action绑定到对应的TransportXxxAction实现类; 这些TransportXxxAction在构造时会向TransportService注册具体的Handler(实际上就是TransportRequestHandler), 这个Handler很关键.

    // Excerpt from ActionModule#configure(): each registerAction call pairs an Action singleton
    // (XxxAction.INSTANCE) with the Transport*Action class that implements it. Some calls pass an
    // additional Transport*Action class as a further argument.
    registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
            registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
            registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
            registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
            registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
    
            registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
            registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
            registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
            registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
            registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
            registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
            registerAction(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
            registerAction(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
            registerAction(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
            registerAction(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
            registerAction(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
            registerAction(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
            registerAction(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
            registerAction(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
            registerAction(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
            registerAction(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
    
            registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
            registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
            registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
            registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
            registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
            registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
            registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
            registerAction(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
            registerAction(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
            registerAction(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
            registerAction(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
            registerAction(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class, TransportGetFieldMappingsIndexAction.class);
            registerAction(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
            registerAction(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
            registerAction(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
            registerAction(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
            registerAction(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
            registerAction(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
            registerAction(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
            registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
            registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class);
            registerAction(FlushAction.INSTANCE, TransportFlushAction.class);
            registerAction(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
            registerAction(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
            registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
            registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
            registerAction(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
            registerAction(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
            registerAction(PutWarmerAction.INSTANCE, TransportPutWarmerAction.class);
            registerAction(DeleteWarmerAction.INSTANCE, TransportDeleteWarmerAction.class);
            registerAction(GetWarmersAction.INSTANCE, TransportGetWarmersAction.class);
            registerAction(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
            registerAction(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
            registerAction(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);
    
            registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
            // GetAction is bound to TransportGetAction: the starting point of the Get flow traced in this article.
            registerAction(GetAction.INSTANCE, TransportGetAction.class);
            registerAction(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class,
                    TransportDfsOnlyAction.class);
            registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
                    TransportShardMultiTermsVectorAction.class);
            registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);
            registerAction(ExistsAction.INSTANCE, TransportExistsAction.class);
            registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class);
            registerAction(UpdateAction.INSTANCE, TransportUpdateAction.class);
            registerAction(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
                    TransportShardMultiGetAction.class);
            registerAction(BulkAction.INSTANCE, TransportBulkAction.class,
                    TransportShardBulkAction.class);
            registerAction(SearchAction.INSTANCE, TransportSearchAction.class);
            registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
            registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
            registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
            registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
            registerAction(ExplainAction.INSTANCE, TransportExplainAction.class);
            registerAction(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
            registerAction(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
            registerAction(RenderSearchTemplateAction.INSTANCE, TransportRenderSearchTemplateAction.class);
    
            //Indexed scripts
            registerAction(PutIndexedScriptAction.INSTANCE, TransportPutIndexedScriptAction.class);
            registerAction(GetIndexedScriptAction.INSTANCE, TransportGetIndexedScriptAction.class);
            registerAction(DeleteIndexedScriptAction.INSTANCE, TransportDeleteIndexedScriptAction.class);
    
            registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsTransportAction.class);
    

    调用的注册Handler的代码如下:

    /**
     * Registers a transport request handler under its action name ({@code reg.getAction()}).
     * While holding requestHandlerMutex, the handler map is copied, updated, and republished as a
     * new immutable map. An existing registration for the same action is replaced, not rejected;
     * the replacement is only reported through a warning log.
     */
    protected <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
            synchronized (requestHandlerMutex) {
                // Capture any previous registration before overwriting it so it can be reported below.
                RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction());
                // Copy the current map, put this action's entry, and publish the result as a new immutable map.
                requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
                if (replaced != null) {
                    logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg, replaced);
                }
            }
        }
    

    TransportService里有一个内部类, Adapter:
    protected class Adapter implements TransportServiceAdapter


    image.png

    这个Adapter里有个方法是根据Action获取RequestHandlerRegistry 对象(关键, 后面会用到)

     /**
      * Returns the handler registry entry for a transport action name.
      *
      * @param action the action name, e.g. as read from an inbound request
      * @return the RequestHandlerRegistry registered under {@code action}, or {@code null} if none is registered
      */
     @Override
            public RequestHandlerRegistry getRequestHandler(String action) {
                final RequestHandlerRegistry registry = requestHandlers.get(action);
                return registry;
            }
    

    2 以Get来解析

    这里以Get操作来解析, 从ActionModule依赖注入到使用Get操作的Handler流程, 中间涉及了Netty的RPC.
    这里通过es 的id 来Get读取es的某条记录:

    /**
     * Fetches the document with id "126tc" (type "Fixedperson") from index "face_fixedperson"
     * through {@code client} using execute().actionGet(), then prints its source as a string,
     * prefixed with "-----".
     * NOTE(review): there is no isExists() check; for a missing document this presumably prints "-----null".
     */
    private static void getInfo(Client client) {
        GetResponse response = client.prepareGet("face_fixedperson", "Fixedperson", "126tc")
                .execute()
                .actionGet();
        System.out.println("-----" + response.getSourceAsString());
    }
    

    其涉及的流程如下:

    • 1 ActionModule里面:
    // In ActionModule: bind the GetAction singleton to its implementation class TransportGetAction.
    registerAction(GetAction.INSTANCE, TransportGetAction.class);
    
    ----------------------------------------------------------------------------------------
    /**
     * Dependency-injected constructor of the Get transport action.
     * Passes GetAction.NAME, GetRequest.class and the ThreadPool.Names.GET executor name to the
     * TransportSingleShardAction super constructor, keeps the IndicesService for shard lookups,
     * and reads the default realtime flag from settings.
     */
    @Inject
        public TransportGetAction(Settings settings, ClusterService clusterService, TransportService transportService,
                                  IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
                                  IndexNameExpressionResolver indexNameExpressionResolver) {
            super(settings, GetAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
                    GetRequest.class, ThreadPool.Names.GET);
            this.indicesService = indicesService;
    
            // "action.get.realtime" setting; defaults to true when not configured.
            this.realtime = settings.getAsBoolean("action.get.realtime", true);
        }
    ---------------------------------------------------------------------------------------------
    /**
     * Shared constructor for single-shard actions such as TransportGetAction.
     *
     * Registers up to two request handlers with the TransportService:
     *  - {@code actionName} -> TransportHandler, executor ThreadPool.Names.SAME
     *    (only when isSubAction() returns false);
     *  - {@code actionName + "[s]"} -> ShardTransportHandler, on the supplied {@code executor}.
     */
    protected TransportSingleShardAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
                                             TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                             Class<Request> request, String executor) {
            super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
            this.clusterService = clusterService;
            this.transportService = transportService;
    
            // Shard-level action name: the action name with a "[s]" suffix.
            this.transportShardAction = actionName + "[s]";
            this.executor = executor;
    
            if (!isSubAction()) {
                // Handler for requests sent under the plain action name.
                transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
            }
            // Handler for shard-level requests sent under actionName + "[s]".
            transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
        }
    
    • 2 注册Handler(registerRequestHandler)
    /**
     * Convenience overload: bundles the handler and its execution settings (executor,
     * forceExecution, canTripCircuitBreaker) with the task manager into a RequestHandlerRegistry,
     * and registers that registry under {@code action}.
     */
    public <Request extends TransportRequest> void registerRequestHandler(String action, Class<Request> request, String executor, boolean forceExecution, boolean canTripCircuitBreaker, TransportRequestHandler<Request> handler) {
            registerRequestHandler(
                    new RequestHandlerRegistry<Request>(action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker));
        }
    
    /**
     * Registers a transport request handler under its action name ({@code reg.getAction()}).
     * While holding requestHandlerMutex, the handler map is copied, updated, and republished as a
     * new immutable map. An existing registration for the same action is replaced, not rejected;
     * the replacement is only reported through a warning log.
     */
        protected <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
            synchronized (requestHandlerMutex) {
                // Capture any previous registration before overwriting it so it can be reported below.
                RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction());
                // Copy the current map, put this action's entry, and publish the result as a new immutable map.
                requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
                if (replaced != null) {
                    logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg, replaced);
                }
            }
        }
    
    • 3 TransportClient端执行prepareGet之后执行execute
      根据之前的文章, 我们知道这个请求最终会由代理类真正执行:
    /**
     * Client-side dispatch: looks up the TransportActionNodeProxy stored in {@code proxies} for
     * {@code action}, then calls nodesService.execute with a callback that forwards the request to
     * the DiscoveryNode it is given through that proxy. How the node is chosen is not shown here.
     */
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener) {
            // e.g. registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class): Delete maps to the TransportDeleteAction class
            final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
            nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
                @Override
                public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
                    proxy.execute(node, request, listener);
                }
            }, listener);
        }
    ---------------------------------------------------------------------------------------------
    /**
     * Sends {@code request} to {@code node} under {@code action.name()}.
     * The request is validated first. If validation fails, the listener receives the
     * ActionRequestValidationException and nothing is sent.
     */
    public void execute(final DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
            ActionRequestValidationException validationException = request.validate();
            if (validationException != null) {
                listener.onFailure(validationException);
                return;
            }
            // The ActionListenerResponseHandler wraps the listener (presumably completing it with the
            // response or failure — TODO confirm); newInstance() supplies empty responses via action.newResponse().
            transportService.sendRequest(node, action.name(), request, transportOptions, new ActionListenerResponseHandler<Response>(listener) {
                @Override
                public Response newInstance() {
                    return action.newResponse();
                }
            });
        }
    

    上面调用了transportService.sendRequest实际上是RPC,会先判断node是本地节点, 还是远程Server节点, 如果是本地节点则sendLocalRequest, 远程节点则transport.sendRequest(node, requestId, action, request, options);

    // Excerpt from TransportService#sendRequest: before sending, check whether the target is the local node.
    // A request addressed to the local node goes to sendLocalRequest; any other node is reached
    // through transport.sendRequest.
    if (node.equals(localNode)) {
        sendLocalRequest(requestId, action, request);
    } else {
        transport.sendRequest(node, requestId, action, request, options);
    }
    

    这里不再说明sendLocalRequest的情况, 这种情况比较简单, 可自行阅读. 这里说明远程Node节点的情况

    • 4 前面的文章分析了ES中NettyTransport的工作方式(https://www.jianshu.com/p/ad89a0ad0424), 这里正好会用到: 调用transport.sendRequest之后, Server端节点的Netty会接收到这个请求; 由于收到了客户端发来的数据, Server端MessageChannelHandler的messageReceived方法就会被触发:
    /**
     * Netty callback for an inbound message (excerpt; parts are elided).
     * Requests are passed to handleRequest. For responses, the pending TransportResponseHandler is
     * obtained via onResponseReceived(requestId); it receives either the error or the response,
     * which is then validated.
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
                // ......... (omitted)
                // If the message is a request, handle it as a Request; if it is a response returned by the server, take the else branch and run the response handler.
                if (TransportStatus.isRequest(status)) { // incoming request
                    handleRequest(ctx.getChannel(), marker, streamIn, requestId, size, version);
                } else { // incoming response
                    TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
                    // ignore if its null, the adapter logs it
                    if (handler != null) {
                        if (TransportStatus.isError(status)) {
                            handlerResponseError(streamIn, handler);
                        } else {
                            handleResponse(ctx.getChannel(), streamIn, handler);
                        }
                        marker.validateResponse(streamIn, requestId, handler, TransportStatus.isError(status));
                    }
                }
               // ..... (omitted)
    }
    

    因为此处是请求, 所以执行handleRequest方法,

    /**
     * Server-side handling of an inbound request.
     *
     * The method:
     *  1. Reads the action name from the stream.
     *  2. Looks up its RequestHandlerRegistry.
     *  3. Accounts the message size on the in-flight requests circuit breaker.
     *  4. Deserializes the request and validates it against the marker.
     *  5. Runs the handler, either inline (executor ThreadPool.Names.SAME) or on the executor named
     *     by the registry.
     *
     * Any Throwable raised along the way is sent back over the transport channel as the response.
     *
     * @return the action name read from the stream
     */
    protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, int messageLengthBytes,
                                       Version version) throws IOException {
            buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
            final String action = buffer.readString();
            transportServiceAdapter.onRequestReceived(requestId, action);
            NettyTransportChannel transportChannel = null;
            try {
                // Find the handler registered for this action name; unknown actions fail with ActionNotFoundTransportException.
                final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
                if (reg == null) {
                    throw new ActionNotFoundTransportException(action);
                }
                // Account the message bytes on the in-flight requests breaker; only registries with
                // canTripCircuitBreaker() use the call that may break.
                if (reg.canTripCircuitBreaker()) {
                    transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
                } else {
                    transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
                }
                transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
                    requestId, version, profileName, messageLengthBytes);
                final TransportRequest request = reg.newRequest();
                request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
                request.readFrom(buffer);
                // in case we throw an exception, i.e. when the limit is hit, we don't want to verify
                validateRequest(marker, buffer, requestId, request, action);
                if (ThreadPool.Names.SAME.equals(reg.getExecutor())) {
                    //noinspection unchecked
                    reg.processMessageReceived(request, transportChannel); // Process the request. Requests come in many kinds, and each kind is handled by its own Handler.
                } else {
                    // Hand the work off to the executor named by the registry.
                    threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
                }
            } catch (Throwable e) {
                // the circuit breaker tripped
                // NOTE(review): this catch also receives other failures, e.g. the ActionNotFoundTransportException thrown above.
                if (transportChannel == null) {
                    // Failure happened before the channel was created; create one (0 bytes) just to send the error.
                    transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
                        requestId, version, profileName, 0);
                }
                try {
                    transportChannel.sendResponse(e);
                } catch (IOException e1) {
                    logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                    logger.warn("Actual Exception", e1);
                }
            }
            return action;
        }
    
    ---------------------------------------------------------------------------------------------
    /**
     * Registers a task for {@code request} with the task manager and invokes the handler.
     * If taskManager.register returns null, the handler is called with the original channel.
     * Otherwise the channel is wrapped in a TransportChannelWrapper together with the task. If
     * messageReceived throws, the task is unregistered here before the exception propagates. On
     * success the task is left registered here; it is presumably unregistered by the wrapper once a
     * response is sent (TODO confirm).
     */
    public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
            final Task task = taskManager.register(channel.getChannelType(), action, request);
            if (task == null) {
                handler.messageReceived(request, channel);
            } else {
                boolean success = false;
                try {
                    handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task);
                    success = true;
                } finally {
                    if (success == false) {
                        taskManager.unregister(task);
                    }
                }
            }
        }
    

    上面一大串代码, 关键的代码是如下2个:
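    (1)final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); 根据Action名称获取具体的Handler. 还记得第1步TransportSingleShardAction构造函数里注册的两个Handler吗? 客户端发送的action是GetAction.NAME, 它对应的是transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler())注册的TransportHandler. TransportHandler会把请求路由到持有目标分片的节点, 并以transportShardAction(即actionName + "[s]")作为action再次发送. 目标节点收到这个带"[s]"后缀的请求时, 查到的才是transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler())注册的ShardTransportHandler.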
    (2)reg.processMessageReceived(request, transportChannel); 就是调用具体的Handler的messageReceived方法.

    • 5 具体Handler处理

    ShardTransportHandler是以transportShardAction(即actionName + "[s]")注册的Handler. 持有目标分片的节点收到这个分片级请求后, 第4步的流程最终执行的就是ShardTransportHandler的messageReceived方法:

    // Reads the data, assembles it into a Response, and sends it back through the requesting channel.
    // Registered as the handler for the shard-level action name (actionName + "[s]").
        private class ShardTransportHandler extends TransportRequestHandler<Request> {
    
            @Override
            public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
                if (logger.isTraceEnabled()) {
                    logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
                }
                // shardOperation mainly handles whether the request carries the refresh option, then calls indexShard.getService().get() to read the data
                Response response = shardOperation(request, request.internalShardId);
                channel.sendResponse(response);
            }
        }
    
    ---------------------------------------------------------------------------------------------
    shardOperation 实际上是TransportGetAction的shardOperation方法.
    /**
     * Executes a Get against one shard.
     * Resolves the IndexShard for {@code shardId}, refreshes it first when the request asks for a
     * refresh but not a realtime get, then reads the document through the shard's get service and
     * wraps the GetResult in a GetResponse.
     */
    @Override
        protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
            IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
            IndexShard indexShard = indexService.shardSafe(shardId.id());// shard id number: which shard of the index
    
            // Refresh only when refresh is requested and realtime is off.
            if (request.refresh() && !request.realtime()) {
                indexShard.refresh("refresh_flag_get");
            }
    
            GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(),
                    request.realtime(), request.version(), request.versionType(), request.fetchSourceContext(), request.ignoreErrorsOnGeneratedFields());
            return new GetResponse(result);
        }
    
    • 6 经过第5步之后, Get请求就转化成了具体的Lucene读取操作, 此处细节不再展开.

    以上6步就是一个操作的大致流程. 此处只分析了Get操作, 但是ES其他操作的流程和这个流程基本是一样的.

    相关文章

      网友评论

        本文标题:Elasticsearch的增删改查源码流程

        本文链接:https://www.haomeiwen.com/subject/ubstsftx.html