美文网首页
ElasticSearch 基于Netty的通信原理

ElasticSearch 基于Netty的通信原理

作者: persisting_ | 来源:发表于2018-12-09 13:32 被阅读0次

    ElasticSearch由Transport负责通信,基于TCP通信采用Netty实现,采用Plugin构建,具体可参考Netty4Plugin类。

    1 Netty BootstrapServerBootstrap的创建

    TCP通信实现类为TcpTransport,采用Netty的具体实现类则为Netty4Transport。我们知道基于Netty的通信编码中需要为客户端创建Bootstrap,为服务端创建ServerBootstrap,TcpTransport实现了AbstractLifecycleComponentBootstrapServerBootstrap的创建是在AbstractLifecycleComponent.doStart中创建的。

    因为ElasticSearch每个节点都既是服务端(处理请求),也是客户端(发起请求),所有每个节点都会既创建服务端,也创建客户端。

    //Netty4Transport
    @Override
    protected void doStart() {
        boolean success = false;
        try {
            //创建客户端Bootstrap
            clientBootstrap = createClientBootstrap();
            if (NetworkService.NETWORK_SERVER.get(settings)) {
                for (ProfileSettings profileSettings : profileSettings) {
                    //根据配置创建服务端ServerBootstrap
                    createServerBootstrap(profileSettings);
                    //创建完一个服务端ServerBootstrap进行端口绑定
                    bindServer(profileSettings);
                }
            }
            super.doStart();
            success = true;
        } finally {
            if (success == false) {
                doStop();
            }
        }
    }
    

    1.1 客户端Bootstrap创建

    1.1.1 Bootstrap模板的创建

    本小杰标题说是Bootstrap模板的创建,这是因为createClientBootstrap创建的Bootstrap并没有真正作为客户端进行通信,后续在具体发起请求时,会调用这里创建的Bootstrap的clone函数克隆新的Bootstrap使用,克隆之后注册handler等。具体后面会分析。

    //Netty4Transport
    private Bootstrap createClientBootstrap() {
        final Bootstrap bootstrap = new Bootstrap();
        //客户端的创建没有涉及具体的handler注册,因为这里返回的客户端Bootstrap
        //仅仅起到一个模板的作用,后续具体需要发起通信(发送请求)时,会
        //根据此Bootstrap克隆出一个具体的Bootstrap,然后注册handler。
        //下面主要根据配置设置bootstrap的一些属性。
        bootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)));
        //设置Netty客户端NioSocketChannel
        bootstrap.channel(NioSocketChannel.class);
    
        bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
        bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));
    
        final ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.get(settings);
        if (tcpSendBufferSize.getBytes() > 0) {
            bootstrap.option(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
        }
    
        final ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
        if (tcpReceiveBufferSize.getBytes() > 0) {
            bootstrap.option(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
        }
    
        bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
    
        final boolean reuseAddress = TCP_REUSE_ADDRESS.get(settings);
        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
    
        return bootstrap;
    }
    

    1.1.2 获取客户端连接

    上面只是创建了Bootstrap模板,下面是具体打开Connection时的逻辑:克隆模板得到Bootstrap,注册handler等。

    具体看实现代码:

    //TcpTransport
     @Override
    public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
        ...
        try {
            ensureOpen();
            try {
                ...
                for (int i = 0; i < numConnections; ++i) {
                    try {
                       ...
                       //初始化Channel
                        TcpChannel channel = initiateChannel(node, ...
                    } catch (Exception e) {
                        ...
                    }
                }
    
               ...
    }
    

    TcpTransport.initiateChannel在子类Netty4Transport实现:

    //Netty4Transport
    @Override
    protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> listener) throws IOException {
        InetSocketAddress address = node.getAddress().address();
        //根据模板Bootstrap克隆具体使用的对象实例
        Bootstrap bootstrapWithHandler = clientBootstrap.clone();
        //注册handler,这里注册的是一个ChannelInitializer对象
        //当客户端连接服务端成功后向其pipeline设置handler
        //注册的handler则涉及到编码、解码、粘包/拆包解决、报文处理等解决,我们后面和
        //服务端handler一起介绍
        bootstrapWithHandler.handler(getClientChannelInitializer(node));
        bootstrapWithHandler.remoteAddress(address);
        ChannelFuture channelFuture = bootstrapWithHandler.connect();
    
        Channel channel = channelFuture.channel();
        if (channel == null) {
            ExceptionsHelper.maybeDieOnAnotherThread(channelFuture.cause());
            throw new IOException(channelFuture.cause());
        }
        addClosedExceptionLogger(channel);
    
        Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default");
        channel.attr(CHANNEL_KEY).set(nettyChannel);
    
        channelFuture.addListener(...);
    
        return nettyChannel;
    }
    

    1.2 服务端ServerBootstrap创建

    ////Netty4Transport
    private void createServerBootstrap(ProfileSettings profileSettings) {
        String name = profileSettings.profileName;
        ...
        final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name);
    
        final ServerBootstrap serverBootstrap = new ServerBootstrap();
    
        serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory));
        //设置Netty服务端NioServerSocketChannel
        serverBootstrap.channel(NioServerSocketChannel.class);
        //这里是重点,熟悉Netty的应该知道,childHandler一般会设置为ChannelInitializer
        //当客户端连接成功后向其pipeline设置handler
        //注册的handler则涉及到具体的编码、解码、粘包/拆包解决、报文处理等
        serverBootstrap.childHandler(getServerChannelInitializer(name));
        //注册服务端异常处理handler
        serverBootstrap.handler(new ServerChannelExceptionHandler());
    
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
    
        if (profileSettings.sendBufferSize.getBytes() != -1) {
            serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(profileSettings.sendBufferSize.getBytes()));
        }
    
        if (profileSettings.receiveBufferSize.getBytes() != -1) {
            serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(profileSettings.receiveBufferSize.bytesAsInt()));
        }
    
        serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
    
        serverBootstrap.option(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
        serverBootstrap.validate();
    
        serverBootstraps.put(name, serverBootstrap);
    }
    

    2 客户端、服务端Handler介绍

    这里的标题为客户端、服务端Handler介绍,下文具体内容涉及到编码、解码、粘包/拆包解决、报文处理等。

    首先看一下上面服务端、客户端ChannelInitializer实现:

    //Netty4Transport
    protected ChannelHandler getClientChannelInitializer(DiscoveryNode node) {
        return new ClientChannelInitializer();
    }
    
    //Netty4Transport.ClientChannelInitializer
    protected void initChannel(Channel ch) throws Exception {
        //注册负责记录log的handler,但是进入ESLoggingHandler具体实现
        //可以看到其没有做日志记录操作,源码注释说明因为TcpTransport会做日志记录
        ch.pipeline().addLast("logging", new ESLoggingHandler());
        //注册解码器,这里没有注册编码器因为编码是在TcpTransport实现的,
        //需要发送的报文到达Channel已经是编码之后的格式了
        ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
        // using a dot as a prefix means this cannot come from any settings parsed
        //负责对报文进行处理,主要识别是request还是response
        //然后进行相应的处理
        ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
    }
    
    //Netty4Transport
    protected ChannelHandler getServerChannelInitializer(String name) {
        return new ServerChannelInitializer(name);
    }
    
    //Netty4Transport.ServerChannelInitializer
    //这里注册的handler和上面客户端是一样的
    @Override
    protected void initChannel(Channel ch) throws Exception {
        addClosedExceptionLogger(ch);
        Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name);
        ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
        ch.pipeline().addLast("logging", new ESLoggingHandler());
        ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
        ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
        serverAcceptedChannel(nettyTcpChannel);
    }
    

    2.1 报文处理-Netty4MessageChannelHandler

    Netty4MessageChannelHandler负责对报文进行处理,具体看源码:

    //Netty4MessageChannelHandler
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Transports.assertTransportThread();
        assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();
    
        final ByteBuf buffer = (ByteBuf) msg;
        try {
            Channel channel = ctx.channel();
            Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
            //调用TcpTransport.inboundMessage进行消息处理
            transport.inboundMessage(channelAttribute.get(), Netty4Utils.toBytesReference(buffer));
        } finally {
            buffer.release();
        }
    }
    

    TcpTransport会判断该报文是request还是response,然后进行处理:

    //TcpTransport
    public void inboundMessage(TcpChannel channel, BytesReference message) {
        try {
            transportLogger.logInboundMessage(channel, message);
            // Message length of 0 is a ping
            if (message.length() != 0) {
                //判断逻辑所在,不再展开赘述,源码写的非常清楚,可自行参考源码
                messageReceived(message, channel);
            }
        } catch (Exception e) {
            onException(channel, e);
        }
    }
    

    2.2 编码

    ElasticSearch并没有像传统的Netty编程那样,注册Encoder和Decoder,从上面的介绍可以看出,ElasticSearch只注册了Decoder,那么ElasticSearch是如何编码的呢?上面也提到了,ElasticSearch的编码是在TcpTransport实现的,TcpTransport会对报文进行编码,然后直接发送。

    TcpTransport发送请求和响应的函数为sendRequestToChannelsendResponse,二者最后都会调用buildMessage进行Encode:

    //TcpTransport
     /**
        * Serializes the given message into a bytes representation
        */
    private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message,
                                        CompressibleBytesOutputStream stream) throws IOException {
        final BytesReference zeroCopyBuffer;
        //首先向stream里写消息体字节
        if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead
            BytesTransportRequest bRequest = (BytesTransportRequest) message;
            assert nodeVersion.equals(bRequest.version());
            bRequest.writeThin(stream);
            zeroCopyBuffer = bRequest.bytes;
        } else {
            message.writeTo(stream);
            zeroCopyBuffer = BytesArray.EMPTY;
        }
        // we have to call materializeBytes() here before accessing the bytes. A CompressibleBytesOutputStream
        // might be implementing compression. And materializeBytes() ensures that some marker bytes (EOS marker)
        // are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the
        // #validateRequest method. this might be a problem in deflate after all but it's important to write
        // the marker bytes.
        //获取写入的body
        final BytesReference messageBody = stream.materializeBytes();
        //创建header,header里封装了请求ID、status、version、body长度等
        //status里就指明了该请求是request还是response等,具体可参考TransportStatus
        final BytesReference header = buildHeader(requestId, status, stream.getVersion(), messageBody.length() + zeroCopyBuffer.length());
        //组合body、header成一个完整的报文
        return new CompositeBytesReference(header, messageBody, zeroCopyBuffer);
    }
    
    private BytesReference buildHeader(long requestId, byte status, Version protocolVersion, int length) throws IOException {
        try (BytesStreamOutput headerOutput = new BytesStreamOutput(TcpHeader.HEADER_SIZE)) {
            headerOutput.setVersion(protocolVersion);
            TcpHeader.writeHeader(headerOutput, requestId, status, protocolVersion, length);
            final BytesReference bytes = headerOutput.bytes();
            assert bytes.length() == TcpHeader.HEADER_SIZE : "header size mismatch expected: " + TcpHeader.HEADER_SIZE + " but was: "
                + bytes.length();
            return bytes;
        }
    }
    

    以上就是Encoder的逻辑。

    2.3 解码以及粘包/拆包解决

    解码以及粘包/拆包解决的实现其实是一起的,我们知道经典的粘包/拆包解决方案是固定报文长度、指定分隔符、报文头记录报文长度等。

    ElasticSearch使用的方案是报文头记录报文长度,从上面的Encoder可知,在header中记录了body的长度,所以报文接收方可据此处理粘包/拆包问题,并解析出报文,具体实现在Netty4SizeHeaderFrameDecoder中。

    Netty4SizeHeaderFrameDecoder继承自ByteToMessageDecoder,所以具体的decode在其decode函数中:

    //Netty4SizeHeaderFrameDecoder
     @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        try {
            boolean continueDecode = true;
            while (continueDecode) {
                //查看能不能读取至少6个字节的长度,6个字节因为header的长度为6个字节,
                //这里主要是查看能不能读取一个完整的header,因为读取
                //header之后才能知道body的字节数,进而才能读取body
                //如果可以读取一个完整的header,则返回header中记录的body字节数
                int messageLength = TcpTransport.readMessageLength(Netty4Utils.toBytesReference(in));
                //没有6个字节,则等待后续数据流
                //具体如何等待后续数据流,可以看看ByteToMessageDecoder的实现,其实就是有一个cumulation负责积累收到的字节,
                //每次收到新字节放入cumulation中,再调用具体的decode函数尝试进行解码
                if (messageLength == -1) {
                    continueDecode = false;
                } else {
                    //此报文总字节数=heander字节数+body字节数
                    int messageLengthWithHeader = messageLength + HEADER_SIZE;
                    // If the message length is greater than the network bytes available, we have not read a complete frame.
                    //报文总字节数大于当前收到的字节数,表示报文还没有全部传输过来
                    //继续等待后续数据流
                    if (messageLengthWithHeader > in.readableBytes()) {
                        continueDecode = false;
                    } else {
                        //报文总字节数小于收到的字节数,则进行报文读取操作
                        final ByteBuf message = in.retainedSlice(in.readerIndex() + HEADER_SIZE, messageLength);
                        out.add(message);
                        in.readerIndex(in.readerIndex() + messageLengthWithHeader);
                    }
                }
            }
        } catch (IllegalArgumentException ex) {
            throw new TooLongFrameException(ex);
        }
    }
    

    3 安全通信SecurityNetty4Transport

    首先看源码中对SecurityNetty4Transport的注释:

    Implementation of a transport that extends the {@link Netty4Transport} to add SSL and IP Filtering

    可见SecurityNetty4Transport就是在Netty4Transport的基础上增加了SSL安全功能。是Netty4Transport的一个子类。

    在上面我们介绍了Netty4Transport中客户端和服务端是如何注册ChannelInitializer的。Netty4Transport在建立客户端、服务端Bootstrap时会分别调用getClientChannelInitializergetServerChannelInitializer获取ChannelInitializer实例。

    SecurityNetty4Transport实现SSL安全验证的关键就在对父类Netty4Transport方法getClientChannelInitializergetServerChannelInitializer的重写。

    首先看父类Netty4Transport方法getClientChannelInitializergetServerChannelInitializer的定义:

    //Netty4Transport
    protected ChannelHandler getServerChannelInitializer(String name) {
        return new ServerChannelInitializer(name);
    }
    
    protected ChannelHandler getClientChannelInitializer(DiscoveryNode node) {
        return new ClientChannelInitializer();
    }
    

    再看子类SecurityNetty4Transport方法getClientChannelInitializergetServerChannelInitializer的定义:

    //SecurityNetty4Transport
     @Override
    public final ChannelHandler getServerChannelInitializer(String name) {
        //如果设置启用SSL,则返回SSL功能的ChannelInitializer
        //否则返回没有SSL的ChannelInitializer(通过调用父类Netty4Transport.getServerChannelInitializer实现)
        if (sslEnabled) {
            SSLConfiguration configuration = profileConfiguration.get(name);
            if (configuration == null) {
                throw new IllegalStateException("unknown profile: " + name);
            }
            return getSslChannelInitializer(name, configuration);
        } else {
            return getNoSslChannelInitializer(name);
        }
    }
    
    //返回Security的ChannelInitializer,其实这里有个小的问题,上面服务端ChannelInitializer获取时判断了是否启用了SSL,为什么客户端这里没有判断?
    //其实客户端也判断了,不过是在SecurityClientChannelInitializer的实现中判断的,后文会说到
    @Override
    protected ChannelHandler getClientChannelInitializer(DiscoveryNode node) {
        return new SecurityClientChannelInitializer(node);
    }
    
    //返回具有SSL功能处理的ChannelInitializer
     protected ServerChannelInitializer getSslChannelInitializer(final String name, final SSLConfiguration configuration) {
            return new SslChannelInitializer(name, sslConfiguration);
    }
    
    //直接返回父类Server ChannelInitializer实现,没有SSL功能
    protected ChannelHandler getNoSslChannelInitializer(final String name) {
        return super.getServerChannelInitializer(name);
    }
    

    通过上面可以看出,SSL功能实现关键在于SslChannelInitializerSecurityClientChannelInitializer,这两个类分别是SecurityNetty4Transport父类返回的ServerChannelInitializerClientChannelInitializer的子类:

    //SecurityNetty4Transport
    public class SslChannelInitializer extends ServerChannelInitializer {
        private final SSLConfiguration configuration;
    
        public SslChannelInitializer(String name, SSLConfiguration configuration) {
            super(name);
            this.configuration = configuration;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            //先调用父类方法,注册解码器等handler
            super.initChannel(ch);
            SSLEngine serverEngine = sslService.createSSLEngine(configuration, null, -1);
            serverEngine.setUseClientMode(false);
            final SslHandler sslHandler = new SslHandler(serverEngine);
            //额外注册一个sslHandler
            ch.pipeline().addFirst("sslhandler", sslHandler);
        }
    }
    
    private class SecurityClientChannelInitializer extends ClientChannelInitializer {
    
        private final boolean hostnameVerificationEnabled;
        private final SNIHostName serverName;
    
        SecurityClientChannelInitializer(DiscoveryNode node) {
            this.hostnameVerificationEnabled = sslEnabled && sslConfiguration.verificationMode().isHostnameVerificationEnabled();
            String configuredServerName = node.getAttributes().get("server_name");
            if (configuredServerName != null) {
                try {
                    serverName = new SNIHostName(configuredServerName);
                } catch (IllegalArgumentException e) {
                    throw new ConnectTransportException(node, "invalid DiscoveryNode server_name [" + configuredServerName + "]", e);
                }
            } else {
                serverName = null;
            }
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            //先调用父类方法,注册解码器等handler
            super.initChannel(ch);
            //客户端ChannelInitializer在这里判断是否启用了SSL
            if (sslEnabled) {
                //如果启用了SSL则额外注册一个sslHandler
                ch.pipeline().addFirst(new ClientSslHandlerInitializer(sslConfiguration, sslService, hostnameVerificationEnabled,
                    serverName));
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:ElasticSearch 基于Netty的通信原理

          本文链接:https://www.haomeiwen.com/subject/pcnhhqtx.html