ElasticSearch Rest and (RPC) NodeClie

Author: persisting_ | Published 2018-11-01 01:25

    1 Overview

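    ElasticSearch supports two ways of communicating with a node: Restful and NodeClient. As the deprecation notice below shows, TransportClient has been superseded by the Rest client and will be removed in 8.0.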

    @deprecated {@link TransportClient} is deprecated in favour of the High Level REST client and will be removed in Elasticsearch 8.0.

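    At startup, the bootstrap class org.elasticsearch.bootstrap.Bootstrap instantiates org.elasticsearch.node.Node, which represents a single node. The Node constructor initializes a large number of modules, services, plugins, and so on.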

    One more point worth stating up front: both Rest and NodeClient in ES are built on Netty.

    2 Restful

    2.1 Rest service startup flow

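    Sending and receiving messages in ElasticSearch is handled by org.elasticsearch.transport.TransportService. Tracing the call stack of its constructor shows that the class is instantiated in the Node constructor: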


    [Figure: transportservice initial.png]

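    The TransportService constructor takes a Transport parameter, which is the key to the underlying communication.
    It is instantiated in Node and passed to the TransportService constructor. The instantiation code in Node is: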

    final Transport transport = networkModule.getTransportSupplier().get();
    

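    Following the code into NetworkModule shows that the suppliers are registered by Netty4Plugin (getTransportSupplier() itself yields the node-to-node transport). For the HTTP side, which serves Rest requests, the supplier registered by Netty4Plugin is: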

    () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher)
    

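    TransportService and the transports extend AbstractLifecycleComponent and therefore have lifecycle methods such as start and stop; Node exposes start and stop as well. Once Node has been initialized, Bootstrap calls its start method, and Node.start in turn calls start on its modules, services, and so on. TransportService.doStart looks like this: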

        @Override
        protected void doStart() {
            transport.addMessageListener(this);
            connectionManager.addListener(this);
            transport.start(); // call the transport's start method
            if (transport.boundAddress() != null && logger.isInfoEnabled()) {
                logger.info("{}", transport.boundAddress());
                for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) {
                    logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
                }
            }
            localNode = localNodeFactory.apply(transport.boundAddress());
    
            if (connectToRemoteCluster) {
                // here we start to connect to the remote clusters
                remoteClusterService.initializeRemoteClusters();
            }
        }
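
    The start/doStart split used by these lifecycle components can be sketched in plain Java. This is a minimal illustration only: SimpleLifecycleComponent, DemoTransport, and DemoTransportService are hypothetical names, not Elasticsearch classes, and the real AbstractLifecycleComponent does more (listeners, a close state).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a lifecycle component; not the real Elasticsearch class.
abstract class SimpleLifecycleComponent {
    enum State { INITIALIZED, STARTED, STOPPED }

    private State state = State.INITIALIZED;

    // Template method: guards the state transition, then delegates to doStart().
    public final synchronized void start() {
        if (state != State.INITIALIZED) {
            return; // already started or stopped: nothing to do
        }
        doStart();
        state = State.STARTED;
    }

    public final synchronized void stop() {
        if (state != State.STARTED) {
            return;
        }
        doStop();
        state = State.STOPPED;
    }

    protected abstract void doStart();

    protected abstract void doStop();
}

class DemoTransport extends SimpleLifecycleComponent {
    private final List<String> log;

    DemoTransport(List<String> log) {
        this.log = log;
    }

    @Override
    protected void doStart() {
        log.add("transport started");
    }

    @Override
    protected void doStop() {
        log.add("transport stopped");
    }
}

class DemoTransportService extends SimpleLifecycleComponent {
    private final DemoTransport transport;
    private final List<String> log;

    DemoTransportService(DemoTransport transport, List<String> log) {
        this.transport = transport;
        this.log = log;
    }

    @Override
    protected void doStart() {
        transport.start(); // the service starts the transport it wraps
        log.add("service started");
    }

    @Override
    protected void doStop() {
        transport.stop();
        log.add("service stopped");
    }
}

class LifecycleDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        DemoTransportService service = new DemoTransportService(new DemoTransport(log), log);
        service.start();
        service.start(); // second call is ignored by the state guard
        service.stop();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    The point of the final start() wrapper is that subclasses only describe what starting means, while the state guard lives in one place.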
    

    Let us go straight to the doStart method of Netty4HttpServerTransport:

        @Override
        protected void doStart() {
            boolean success = false;
            try {
                serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
                    HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
                serverBootstrap.channel(NioServerSocketChannel.class);
                // The next line installs the ChannelInitializer used when a client connects. It mainly sets up the pipeline that processes client messages; see the next subsection.
                serverBootstrap.childHandler(configureServerChannelHandler());
                serverBootstrap.handler(new ServerChannelExceptionHandler(this));
    
                serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
                serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
    
                final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
                if (tcpSendBufferSize.getBytes() > 0) {
                    serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
                }
    
                final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
                if (tcpReceiveBufferSize.getBytes() > 0) {
                    serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
                }
    
                serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
                serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
    
                final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
                serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
                serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
    
                bindServer();
                success = true;
            } finally {
                if (success == false) {
                    doStop(); // otherwise we leak threads since we never moved to started
                }
            }
        }
    

    As shown above, this is simply the startup of a Netty server: configure the handlers, bind the port, and so on.
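
    The same shape (set socket options, bind, clean up if anything fails before success) can be tried without Netty. The sketch below uses the JDK's ServerSocketChannel instead of Netty's ServerBootstrap, so it is an analogy rather than the code Elasticsearch runs; BindSketch and its method names are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

// Hypothetical JDK-only analogue of the doStart() shape above; Netty is deliberately not used.
class BindSketch {

    // Opens a listening channel on an ephemeral loopback port and applies the given options.
    static ServerSocketChannel startServer(int receiveBufferBytes) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        boolean success = false;
        try {
            server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            if (receiveBufferBytes > 0) {
                // only override the OS default when a positive size is configured
                server.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferBytes);
            }
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            success = true;
            return server;
        } finally {
            if (success == false) {
                server.close(); // otherwise the channel leaks when an option or the bind fails
            }
        }
    }

    // Starts the server, reads back the port the OS assigned, and shuts down again.
    static int boundPort() {
        try (ServerSocketChannel server = startServer(64 * 1024)) {
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + boundPort());
    }
}
```

    The success flag with a finally block mirrors the original: resources are released on any failure path without catching and rethrowing each exception type.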

    2.2 Rest request handling

    The code comment above singled out configureServerChannelHandler(). Its code is:

        public ChannelHandler configureServerChannelHandler() {
            // pass this transport itself into the constructor
            return new HttpChannelHandler(this, handlingSettings);
        }
    
        protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
    
            private final Netty4HttpServerTransport transport;
            private final Netty4HttpRequestHandler requestHandler;
            private final HttpHandlingSettings handlingSettings;
    
            protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
                this.transport = transport;
                this.handlingSettings = handlingSettings;
                this.requestHandler = new Netty4HttpRequestHandler(transport);
            }
    
            @Override
            protected void initChannel(Channel ch) throws Exception {
                Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
                ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
                ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
                final HttpRequestDecoder decoder = new HttpRequestDecoder(
                    handlingSettings.getMaxInitialLineLength(),
                    handlingSettings.getMaxHeaderSize(),
                    handlingSettings.getMaxChunkSize());
                decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
                // set up the decoder, decompressor, and encoder
                ch.pipeline().addLast("decoder", decoder);
                ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
                ch.pipeline().addLast("encoder", new HttpResponseEncoder());
                final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
                aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
                ch.pipeline().addLast("aggregator", aggregator);
                if (handlingSettings.isCompression()) {
                    ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
                }
                if (handlingSettings.isCorsEnabled()) {
                    ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
                }
                ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
                // Register the handler that processes the request; requestHandler was initialized in the constructor as Netty4HttpRequestHandler(transport)
                ch.pipeline().addLast("handler", requestHandler);
                transport.serverAcceptedChannel(nettyHttpChannel);
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ExceptionsHelper.maybeDieOnAnotherThread(cause);
                super.exceptionCaught(ctx, cause);
            }
        }
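
    The pipeline assembled in initChannel is an ordered chain of named handlers, and an inbound message passes through them in the order they were added. A toy version in plain Java makes that ordering concrete. MiniPipeline is a hypothetical class and is not Netty's ChannelPipeline API; the three stages only mimic the roles of decoder, aggregator, and request handler.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

// Hypothetical miniature of an inbound pipeline; it is not Netty's ChannelPipeline API.
class MiniPipeline {
    // LinkedHashMap keeps the handlers in insertion order, like addLast does.
    private final Map<String, Function<Object, Object>> handlers = new LinkedHashMap<>();

    MiniPipeline addLast(String name, Function<Object, Object> handler) {
        if (handlers.putIfAbsent(name, handler) != null) {
            throw new IllegalArgumentException("duplicate handler name: " + name);
        }
        return this;
    }

    // Passes the message through every handler in order; each output feeds the next handler.
    Object fireRead(Object msg) {
        Object current = msg;
        for (Function<Object, Object> handler : handlers.values()) {
            current = handler.apply(current);
        }
        return current;
    }
}

class MiniPipelineDemo {
    static Object handle(String rawLine) {
        MiniPipeline pipeline = new MiniPipeline()
            // "decoder": raw text -> [method, path]
            .addLast("decoder", raw -> ((String) raw).trim().split(" ", 2))
            // "aggregator": normalize the parts into one request string
            .addLast("aggregator", parts -> {
                String[] tokens = (String[]) parts;
                return tokens[0].toUpperCase(Locale.ROOT) + " " + tokens[1];
            })
            // "handler": the last stage acts on the fully assembled request
            .addLast("handler", request -> "handled: " + request);
        return pipeline.fireRead(rawLine);
    }

    public static void main(String[] args) {
        System.out.println(handle("  get /_search "));
    }
}
```

    Because the final handler only ever sees what the earlier stages produce, the request handler in the real pipeline can assume a fully decoded and aggregated HTTP request.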
    

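    On the Rest server side (the node that handles client requests), the logic for processing a received message lives mainly in the Netty4HttpRequestHandler(transport) above. The class extends Netty's SimpleChannelInboundHandler: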

    class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> 
    

    Its channelRead0 method handles the request message. The key line is:

    serverTransport.incomingRequest(httpRequest, channel);
    

    serverTransport is the Netty4HttpServerTransport instance passed in through the constructor. Its inheritance hierarchy is shown below:


    [Figure: Netty4HttpServerTransport class graph.png]

    The call chain is:
    Netty4HttpRequestHandler.channelRead0 ->
    AbstractHttpServerTransport.incomingRequest ->
    handleIncomingRequest -> dispatchRequest
    The final dispatchRequest is implemented as follows:

        // Visible for testing
        void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) {
            final ThreadContext threadContext = threadPool.getThreadContext();
            try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
                if (badRequestCause != null) {
                    dispatcher.dispatchBadRequest(restRequest, channel, threadContext, badRequestCause);
                } else {
                    dispatcher.dispatchRequest(restRequest, channel, threadContext);
                }
            }
        }
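
    The try-with-resources around stashContext() is what keeps one request's headers from leaking into whatever the thread does next. A minimal sketch of that stash-and-restore idea follows, assuming a much simpler context than the real one: MiniThreadContext and its methods are hypothetical and only model string headers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a per-thread header context; not the real ThreadContext.
class MiniThreadContext {
    private final ThreadLocal<Map<String, String>> headers = ThreadLocal.withInitial(HashMap::new);

    // Handle returned by stashContext(); closing it restores the headers that were stashed.
    interface StoredContext extends AutoCloseable {
        @Override
        void close(); // narrowed: no checked exception
    }

    // Swaps in an empty header map and remembers the previous one for later restoration.
    StoredContext stashContext() {
        final Map<String, String> previous = headers.get();
        headers.set(new HashMap<>());
        return () -> headers.set(previous);
    }

    void putHeader(String key, String value) {
        headers.get().put(key, value);
    }

    String getHeader(String key) {
        return headers.get().get(key);
    }
}

class StashDemo {
    static String run() {
        MiniThreadContext context = new MiniThreadContext();
        context.putHeader("caller", "outer");
        String inside;
        try (MiniThreadContext.StoredContext ignore = context.stashContext()) {
            // inside the block the context starts empty, so request headers do not mix with outer ones
            context.putHeader("caller", "request-42");
            inside = context.getHeader("caller");
        }
        // after the block the outer headers are back
        return inside + "|" + context.getHeader("caller");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    Returning an AutoCloseable from the stash call means the restore step cannot be forgotten, even when dispatching throws.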
    

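    Following the call trail shows that dispatcher is a RestController instance. This class is similar to a Controller in SpringMVC: it owns the (method, path) -> RestHandler mapping, and it uses the method (the HTTP method: GET, POST, PUT, DELETE, and so on) together with the path to find the registered RestHandler, which then handles the request.
    First, the definition of RestController.dispatchRequest: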

        @Override
        public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
            ...
                tryAllHandlers(request, channel, threadContext);
            ...
        }
    
        void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
            for (String key : headersToCopy) {
                String httpHeader = request.header(key);
                if (httpHeader != null) {
                    threadContext.putHeader(key, httpHeader);
                }
            }
            // Request execution flag
            boolean requestHandled = false;
    
            if (checkErrorTraceParameter(request, channel) == false) {
                channel.sendResponse(
                        BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
                return;
            }
    
            // Loop through all possible handlers, attempting to dispatch the request
            // iterate in turn over every handler that matches the method and path
            Iterator<MethodHandlers> allHandlers = getAllHandlers(request);
            for (Iterator<MethodHandlers> it = allHandlers; it.hasNext(); ) {
                final Optional<RestHandler> mHandler = Optional.ofNullable(it.next()).flatMap(mh -> mh.getHandler(request.method()));
                requestHandled = dispatchRequest(request, channel, client, mHandler);
                if (requestHandled) {
                    break;
                }
            }
    
            // If request has not been handled, fallback to a bad request error.
            if (requestHandled == false) {
                handleBadRequest(request, channel);
            }
        }
    
        boolean dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client,
                                final Optional<RestHandler> mHandler) throws Exception {
            ...
                    // for the handler that was found, call its handleRequest method
                    wrappedHandler.handleRequest(request, responseChannel, client);
                    ...
        }
    

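    Next, how handlers are registered in RestController. The RestController is instantiated along this path:
    the Node constructor instantiates ActionModule
    -> the ActionModule constructor calls new RestController
    Finally, Node calls actionModule.initRestHandlers to register the handlers in RestController. Every handler subclass registers itself with the controller in its own constructor, for example: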

        public RestMainAction(Settings settings, RestController controller) {
            super(settings);
            controller.registerHandler(GET, "/", this);
            controller.registerHandler(HEAD, "/", this);
        }
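
    Registration by (method, path) and lookup through a path trie can be sketched as below. This is an assumption-laden miniature: MiniRouter, its Node class, and the "{name}" wildcard handling are illustrative and do not reproduce the real RestController or its trie implementation (for instance, there is no backtracking from a literal match to a wildcard).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical miniature router; names do not match the real RestController or its trie.
class MiniRouter {
    private static final class Node {
        final Map<String, Node> children = new HashMap<>();
        Node wildcard; // child used for a "{param}" path segment
        final Map<String, Function<String, String>> handlersByMethod = new HashMap<>();
    }

    private final Node root = new Node();

    // Walks (and creates) one trie node per path segment, then stores the handler under its method.
    void registerHandler(String method, String path, Function<String, String> handler) {
        Node node = root;
        for (String segment : segments(path)) {
            if (segment.startsWith("{") && segment.endsWith("}")) {
                if (node.wildcard == null) {
                    node.wildcard = new Node();
                }
                node = node.wildcard;
            } else {
                node = node.children.computeIfAbsent(segment, s -> new Node());
            }
        }
        node.handlersByMethod.put(method, handler);
    }

    // Literal segments win over the wildcard; this sketch does not backtrack.
    Optional<Function<String, String>> find(String method, String path) {
        Node node = root;
        for (String segment : segments(path)) {
            Node next = node.children.get(segment);
            if (next == null) {
                next = node.wildcard;
            }
            if (next == null) {
                return Optional.empty();
            }
            node = next;
        }
        return Optional.ofNullable(node.handlersByMethod.get(method));
    }

    // Falls back to an error string when no handler matches, like a bad-request response.
    String dispatch(String method, String path) {
        return find(method, path)
            .map(handler -> handler.apply(path))
            .orElse("no handler for " + method + " " + path);
    }

    private static String[] segments(String path) {
        String trimmed = path.replaceAll("^/+|/+$", "");
        return trimmed.isEmpty() ? new String[0] : trimmed.split("/");
    }
}

class MiniRouterDemo {
    static MiniRouter build() {
        MiniRouter router = new MiniRouter();
        router.registerHandler("GET", "/", path -> "main");
        router.registerHandler("GET", "/{index}/_search", path -> "search " + path);
        return router;
    }

    public static void main(String[] args) {
        MiniRouter router = build();
        System.out.println(router.dispatch("GET", "/"));
        System.out.println(router.dispatch("GET", "/logs/_search"));
        System.out.println(router.dispatch("DELETE", "/"));
    }
}
```

    Keeping the method map at the leaf means one path can carry several handlers (GET and HEAD on "/" in the example above), which matches how the same path is registered twice with different methods.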
    

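    Internally the controller maintains its handlers in a trie (prefix tree).
    Once a request has been received and a matching handler found, the handler's handleRequest method is called. It is defined in the parent class BaseRestHandler: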

        @Override
        public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
            // prepare the request for execution; has the side effect of touching the request parameters
            // prepareRequest is implemented by each concrete subclass and returns a different action depending on the subclass, as in the RestSearchAction implementation below
            final RestChannelConsumer action = prepareRequest(request, client);
    
            // validate unconsumed params, but we must exclude params used to format the response
            // use a sorted set so the unconsumed parameters appear in a reliable sorted order
            final SortedSet<String> unconsumedParams =
                request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
    
            // validate the non-response params
            if (!unconsumedParams.isEmpty()) {
                final Set<String> candidateParams = new HashSet<>();
                candidateParams.addAll(request.consumedParams());
                candidateParams.addAll(responseParams());
                throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
            }
    
            usageCount.increment();
            // execute the action
            action.accept(channel);
        }
        // For example, RestSearchAction implements it as follows:
        @Override
        public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
            SearchRequest searchRequest = new SearchRequest();
            /*
             * We have to pull out the call to `source().size(size)` because
             * _update_by_query and _delete_by_query uses this same parsing
             * path but sets a different variable when it sees the `size`
             * url parameter.
             *
             * Note that we can't use `searchRequest.source()::size` because
             * `searchRequest.source()` is null right now. We don't have to
             * guard against it being null in the IntConsumer because it can't
             * be null later. If that is confusing to you then you are in good
             * company.
             */
            IntConsumer setSize = size -> searchRequest.source().size(size);
            request.withContentOrSourceParamParserOrNull(parser ->
                parseSearchRequest(searchRequest, request, parser, setSize));
    
            return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
        }
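
    The two-phase shape of handleRequest (prepare an action while consuming parameters, reject leftovers, then run the action) can be reduced to a few lines. The classes below are hypothetical simplifications: MiniRequest only tracks which keys were read, and a StringBuilder stands in for the channel.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical miniature request that records which parameters were read.
class MiniRequest {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    MiniRequest(Map<String, String> params) {
        this.params = new HashMap<>(params);
    }

    // Reading a parameter marks it as consumed.
    String param(String key) {
        consumed.add(key);
        return params.get(key);
    }

    // Sorted so that error messages list leftovers in a stable order.
    SortedSet<String> unconsumedParams() {
        SortedSet<String> rest = new TreeSet<>(params.keySet());
        rest.removeAll(consumed);
        return rest;
    }
}

abstract class MiniBaseHandler {
    // Subclasses parse the request here and return the deferred action.
    protected abstract Consumer<StringBuilder> prepareRequest(MiniRequest request);

    public final void handleRequest(MiniRequest request, StringBuilder channel) {
        Consumer<StringBuilder> action = prepareRequest(request);
        SortedSet<String> unconsumed = request.unconsumedParams();
        if (!unconsumed.isEmpty()) {
            // nothing has been executed yet, so a typo in a parameter fails fast
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
        action.accept(channel);
    }
}

class MiniSearchHandler extends MiniBaseHandler {
    @Override
    protected Consumer<StringBuilder> prepareRequest(MiniRequest request) {
        String size = request.param("size");
        return channel -> channel.append("search size=").append(size);
    }
}

class MiniHandlerDemo {
    static String run(Map<String, String> params) {
        StringBuilder channel = new StringBuilder();
        try {
            new MiniSearchHandler().handleRequest(new MiniRequest(params), channel);
        } catch (IllegalArgumentException e) {
            return "error: " + e.getMessage();
        }
        return channel.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("size", "10");
        System.out.println(run(params));
        params.put("sizee", "5"); // misspelled parameter is never consumed
        System.out.println(run(params));
    }
}
```

    Deferring execution behind a consumer is what lets the base class validate parameters for every handler before any work is done.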
    

    handleRequest ends by calling action.accept, which in turn calls client.search(). Tracing the calls shows that client is the client member of RestController, the NodeClient object that Node passes in when it instantiates ActionModule.

    Now look at NodeClient's search method, which is defined in its parent class AbstractClient:

        @Override
        public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
            execute(SearchAction.INSTANCE, request, listener);
        }
    

    Tracing further shows that the call eventually reaches this NodeClient method:

        public <    Request extends ActionRequest,
                    Response extends ActionResponse
                > Task executeLocally(Action<Response> action, Request request, ActionListener<Response> listener) {
            return transportAction(action).execute(request, listener);
        }
    

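    SearchAction.INSTANCE in the code above is really a key. The TransportAction that corresponds to each key is likewise registered in ActionModule for NodeClient to look up; see ActionModule.setupActions for details, which are not repeated here.
    The TransportAction subclasses handle the concrete operations: get, index, search, delete, reroute, and so on.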
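
    The "action instance as key" lookup can be pictured as a typed registry. The sketch below is a simplified model under stated assumptions: ActionKey, MiniTransportAction, MiniNodeClient, and MiniActions are hypothetical names, the request and response are plain String and Integer, and the action name string is only illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical typed key, playing the role of a singleton such as SearchAction.INSTANCE.
final class ActionKey<Req, Resp> {
    final String name;

    ActionKey(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for the class that does the real work for one action.
interface MiniTransportAction<Req, Resp> {
    void execute(Req request, Consumer<Resp> listener);
}

// Hypothetical miniature of "action key -> transport action"; not the real NodeClient API.
class MiniNodeClient {
    private final Map<ActionKey<?, ?>, MiniTransportAction<?, ?>> actions = new HashMap<>();

    <Req, Resp> void register(ActionKey<Req, Resp> key, MiniTransportAction<Req, Resp> action) {
        actions.put(key, action);
    }

    // Looks up the action registered for the key and delegates to it.
    @SuppressWarnings("unchecked")
    <Req, Resp> void execute(ActionKey<Req, Resp> key, Req request, Consumer<Resp> listener) {
        // safe as long as register() is the only writer, since it ties key and action types together
        MiniTransportAction<Req, Resp> action = (MiniTransportAction<Req, Resp>) actions.get(key);
        if (action == null) {
            throw new IllegalStateException("no action registered for [" + key.name + "]");
        }
        action.execute(request, listener);
    }

    // Convenience method: a named operation is just execute() with a fixed key.
    void search(String query, Consumer<Integer> listener) {
        execute(MiniActions.SEARCH, query, listener);
    }
}

final class MiniActions {
    // the name string is illustrative only
    static final ActionKey<String, Integer> SEARCH = new ActionKey<>("indices:data/read/search");

    private MiniActions() {
    }
}

class MiniNodeClientDemo {
    static int run() {
        MiniNodeClient client = new MiniNodeClient();
        // the "transport action" here just reports the query length as a fake hit count
        client.register(MiniActions.SEARCH, (query, listener) -> listener.accept(query.length()));
        int[] hits = new int[1];
        client.search("match_all", count -> hits[0] = count);
        return hits[0];
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    With this shape, the Rest layer and any in-process caller end up in the same execute path, which is the point the article makes about Rest handlers delegating to NodeClient.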

    3 RPC NodeClient

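    The analysis is in fact already complete at this point: NodeClient methods such as search, get, and delete all use the key to find the TransportAction subclass that does the work.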

    4 Note

    Note the two-level mapping to Transport*Action described in the article "ElasticSearch Rest/RPC 接口解析" (ElasticSearch Rest/RPC interface analysis).

    Article link: https://www.haomeiwen.com/subject/lagztqtx.html