Elasticsearch NettyTransport Communication Mechanism

Author: kason_zhang | Published 2018-06-01 15:40

When ES issues one of its many Action-style requests, in a cluster that request usually has to be forwarded to other nodes, which means RPC. ES performs this forwarding and receiving through the NettyTransport class, which is essentially a communication module built on top of Netty.
A NettyTransport is created both when a Node starts and when a TransportClient is initialized.

When a Node starts

When a Node starts, it obtains the TransportService instance via dependency injection and calls its start method. This eventually reaches NettyTransport's doStart method, which creates the Netty RPC client and server objects; requests and responses are then processed by MessageChannelHandler.

public Node start() {
       
        //... omitted
        injector.getInstance(MappingUpdatedAction.class).setClient(client);
        injector.getInstance(IndicesService.class).start();
        injector.getInstance(IndexingMemoryController.class).start();
        injector.getInstance(IndicesClusterStateService.class).start();
        injector.getInstance(IndicesTTLService.class).start();
        injector.getInstance(SnapshotsService.class).start();
        injector.getInstance(SnapshotShardsService.class).start();
        injector.getInstance(RoutingService.class).start();
        injector.getInstance(SearchService.class).start();
        injector.getInstance(MonitorService.class).start();
        injector.getInstance(RestController.class).start();

        // TODO hack around circular dependencies problems
        injector.getInstance(GatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(RoutingService.class));

        injector.getInstance(ResourceWatcherService.class).start();
        injector.getInstance(GatewayService.class).start();

        // Start the transport service now so the publish address will be added to the local disco node in ClusterService
        TransportService transportService = injector.getInstance(TransportService.class);
        transportService.start();
        injector.getInstance(ClusterService.class).start();

       //... omitted

        return this;
    }

The start call goes through AbstractLifecycleComponent's start method:

public T start() {
        if (!lifecycle.canMoveToStarted()) {
            return (T) this;
        }
        for (LifecycleListener listener : listeners) {
            listener.beforeStart();
        }
        doStart();
        lifecycle.moveToStarted();
        for (LifecycleListener listener : listeners) {
            listener.afterStart();
        }
        return (T) this;
    }
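The start method above is a template method: lifecycle listeners fire before and after the subclass-specific doStart hook, and the lifecycle state guard makes a second start a no-op. A minimal self-contained sketch of that ordering (the class and listener names here are simplified stand-ins, not the actual ES types):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the template-method pattern in
// AbstractLifecycleComponent: beforeStart -> doStart -> afterStart,
// with a state guard so start() only runs once.
abstract class LifecycleComponent {
    interface Listener {
        void beforeStart();
        void afterStart();
    }

    private final List<Listener> listeners = new ArrayList<>();
    private boolean started = false;

    void addListener(Listener l) { listeners.add(l); }

    final LifecycleComponent start() {
        if (started) {            // mirrors lifecycle.canMoveToStarted()
            return this;
        }
        for (Listener l : listeners) l.beforeStart();
        doStart();                // subclass hook, e.g. NettyTransport.doStart()
        started = true;           // mirrors lifecycle.moveToStarted()
        for (Listener l : listeners) l.afterStart();
        return this;
    }

    protected abstract void doStart();
}

class DemoTransport extends LifecycleComponent {
    final List<String> events = new ArrayList<>();
    @Override protected void doStart() { events.add("doStart"); }
}
```

Calling start twice on a DemoTransport runs the listener callbacks and doStart exactly once, in the before/do/after order shown.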

For doStart we only analyze NettyTransport's implementation here (not LocalTransport's).
Two calls in that code are the key ones:

clientBootstrap = createClientBootstrap(); // start the client side
createServerBootstrap(name, mergedSettings); // start the server side

Let's look at how the client side is constructed:

private ClientBootstrap createClientBootstrap() {

        if (blockingClient) {
            clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
        } else {
            int bossCount = settings.getAsInt("transport.netty.boss_count", 1);
            clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
                    bossCount,
                    new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
                    new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
        }
        clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
       //... omitted

        return clientBootstrap;
    }

The line to note in the code above is clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());

public ChannelPipelineFactory configureClientChannelPipelineFactory() {
        return new ClientChannelPipelineFactory(this);
    }
protected static class ClientChannelPipelineFactory implements ChannelPipelineFactory {
        protected final NettyTransport nettyTransport;

        public ClientChannelPipelineFactory(NettyTransport nettyTransport) {
            this.nettyTransport = nettyTransport;
        }

        @Override
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline channelPipeline = Channels.pipeline();
            SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
            if (nettyTransport.maxCumulationBufferCapacity != null) {
                if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
                    sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
                } else {
                    sizeHeader.setMaxCumulationBufferCapacity((int) nettyTransport.maxCumulationBufferCapacity.bytes());
                }
            }
            if (nettyTransport.maxCompositeBufferComponents != -1) {
                sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents);
            }
            channelPipeline.addLast("size", sizeHeader);
            // using a dot as a prefix means, this cannot come from any settings parsed
            channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, ".client"));
            return channelPipeline;
        }
    }

In the code above, channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, ".client")); specifies that MessageChannelHandler is the handler that receives and processes requests and responses.
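Before the dispatcher runs, the SizeHeaderFrameDecoder installed as "size" splits the raw TCP stream into whole messages: each ES transport message begins with the marker bytes 'E' and 'S' followed by a 4-byte big-endian payload length. A self-contained sketch of that framing in plain Java, without Netty (the class and method names are illustrative, not the actual decoder API):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of the framing done by SizeHeaderFrameDecoder: every transport
// message is 'E','S' + 4-byte big-endian payload length + payload.
class FrameCodec {
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(6 + payload.length);
        buf.put((byte) 'E').put((byte) 'S');
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Decodes as many complete frames as the buffer holds, leaving any
    // partial trailing frame untouched -- the cumulative behavior a
    // Netty FrameDecoder provides across TCP reads.
    static List<byte[]> decode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 6) {
            buf.mark();
            byte e = buf.get(), s = buf.get();
            if (e != 'E' || s != 'S') {
                throw new IllegalStateException("invalid internal transport message format");
            }
            int len = buf.getInt();
            if (buf.remaining() < len) { // incomplete frame: wait for more bytes
                buf.reset();
                break;
            }
            byte[] payload = new byte[len];
            buf.get(payload);
            frames.add(payload);
        }
        return frames;
    }
}
```

Two messages concatenated into one buffer, as TCP may deliver them, decode back into two separate payloads.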

createServerBootstrap(name, mergedSettings), which starts the server side, follows essentially the same flow as createClientBootstrap(); in the end both receive and process requests and responses through MessageChannelHandler.

When creating a TransportClient

When the build method is called, the TransportClient likewise obtains the TransportService instance via dependency injection and calls its start method, which again leads to NettyTransport's doStart method, creating the Netty RPC client and server objects and handling requests and responses with MessageChannelHandler.
The concrete flow is basically the same as the Node case above. The overall structure is shown in the figure:

[figure: overall structure of NettyTransport communication]

With this in place, when a Client issues a request, it first reaches a coordinating node, which may forward it to other Nodes through NettyTransport (or LocalTransport, depending on the setup); in the end, MessageChannelHandler processes both the Request and the Response.
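How does MessageChannelHandler tell a request apart from a response? Each message header carries a status byte whose bit flags encode request-vs-response, error, and compression; in ES this logic lives in TransportStatus. A self-contained sketch with the bit layout modeled after the ES 1.x/2.x sources (the dispatch method is a simplified stand-in for the branching in messageReceived):

```java
// Sketch of the status-byte dispatch in MessageChannelHandler, modeled
// after org.elasticsearch.transport.support.TransportStatus: one byte in
// the message header encodes request-vs-response, error, and compression.
class TransportStatusSketch {
    private static final byte STATUS_REQRES   = 1 << 0; // set => response
    private static final byte STATUS_ERROR    = 1 << 1; // set => error response
    private static final byte STATUS_COMPRESS = 1 << 2; // set => compressed payload

    static boolean isRequest(byte status)  { return (status & STATUS_REQRES) == 0; }
    static boolean isError(byte status)    { return (status & STATUS_ERROR) != 0; }
    static boolean isCompress(byte status) { return (status & STATUS_COMPRESS) != 0; }

    static byte setResponse(byte status) { return (byte) (status | STATUS_REQRES); }
    static byte setError(byte status)    { return (byte) (status | STATUS_ERROR); }

    // The shape of the dispatch in messageReceived: requests go to a
    // registered handler, responses back to the waiting caller.
    static String dispatch(byte status) {
        if (isRequest(status)) {
            return "handleRequest";
        }
        return isError(status) ? "handlerResponseError" : "handleResponse";
    }
}
```

A freshly zeroed status byte is a request; setting the response bit (plus optionally the error bit) routes the message down the response paths instead.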
