美文网首页
Netty学习之内置处理器以及编解码器

Netty学习之内置处理器以及编解码器

作者: 颜洛滨 | 来源:发表于2018-10-15 16:08 被阅读40次

    Netty学习之内置处理器以及编解码器

    前言

    SSL/TLS

    SSL/TLS是目前广泛使用的加密,位于TCP之上,其他的应用层协议之下,当应用层将数据交给SSL/TLS之后,数据会被进行加密,关于SSL/TLS更多的内容,可以参考:SSL/TLS协议运行机制的概述OpenSSL 与 SSL 数字证书概念贴

    javax.net.ssl中提供了原生的SSL/TLS支持,通过SSLContextSSLEngine可以方便地进行数据的加密及解密。

    在Netty中,为了方便开发者使用SSL/TLS,Netty提供了SSlHandler(本质是一个ChannelHandler),只要为其配置一个SSLEngine即可进行加密数据传输。

    class SslEngineInitializer extends ChannelInitializer<Channel> {
    
        private final SslContext context;
        private final boolean startTls;
    
        public SslEngineInitializer(SslContext context, boolean startTls) {
            this.context = context;
            this.startTls = startTls;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            SSLEngine engine = context.newEngine(ch.alloc());
            ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
        }
    }
    

    HTTP/HTTPS

    一个HTTP请求或者相应可能由多个部分组成,一个完整的Http请求由以下内容组成

    • 一个HttpRequest,表示请求头部
    • 一个或者多个HttpContent表示Http的内容
    • 一个LastHttpContent标志Http内容的结束

    由于一个Http请求包含请求部分以及相应部分,而对于客户端及服务端来说,这两者是不相同的,客户端发送请求,服务端接收请求,服务端发送响应,客户端接收响应,所以,需要有不同的处理器来处理不同的内容

    常用编解码器

    HttpRequestEncoder
    HttpResponseEncoder
    HttpRequestDecoder
    HttpResponseDecoder
    
    class HttpPipelineInitializer extends ChannelInitializer<Channel> {
    
        private final boolean client;
    
        public HttpPipelineInitializer(boolean client) {
            this.client = client;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            if (client) {
                // 解码响应
                pipeline.addLast("decoder", new HttpResponseEncoder());
                // 编码请求
                pipeline.addLast("encoder", new HttpRequestEncoder());
            }else {
                // 解码请求
                pipeline.addLast("decoder", new HttpRequestDecoder());
                // 编码响应
                pipeline.addLast("encoder", new HttpResponseEncoder());
            }
        }
    }
    

    当一个字节流被解码成Http内容之后,就可以操作具体的HttpObject消息了,但是由于一个完整的请求/响应可能会被拆分成几个部分,所以,直接使用其实不是很合适,更好地方式是使用聚合器。

    class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
    
        private final boolean client;
    
        public HttpAggregatorInitializer(boolean client) {
            this.client = client;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            if (client) {
                // 客户端编解码器
                // 等同于上面的两者
                pipeline.addLast("codec", new HttpClientCodec());
            }else {
                // 服务端编解码器
                pipeline.addLast("codec", new HttpServerCodec());
            }
            // 聚合器,允许最大大小为 512 * 1024
            pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
        }
    }
    

    聚合之后我们可以直接使用FullHttpRequestFullHttpResponse消息来处理,这两个对象表示的是完整的请求/响应了。

    当使用HTTP的时候,如果内容大部分是文本数据,我们一般会使用压缩技术,虽然会增加CPU开销,但是可以有效地节省网络带宽,Netty同样提供了对应的handler并且提供gzip和deflate技术。

    class HttpCompressionInitializer extends ChannelInitializer<Channel> {
    
        private final boolean client;
    
        public HttpCompressionInitializer(boolean client) {
            this.client = client;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            if (client) {
                pipeline.addLast("codec", new HttpClientCodec());
                // 客户端解压
                pipeline.addLast("decompressor", new HttpContentDecompressor());
            }else {
                pipeline.addLast("codec", new HttpServerCodec());
                // 服务端加压
                pipeline.addLast("compressor", new HttpContentCompressor());
            }
        }
    }
    

    同时需要注意,如果是JDK6及以前的版本,需要引入JZlib依赖。

    <dependency>
        <groupId>com.jcraft</groupId>
        <artifactId>jzlib</artifactId>
        <version>1.1.3</version>
    </dependency>
    

    如果我们需要使用HTTPS,只需要将SslHanlder配置在所有handler的最前面即可。

    空闲检测及超时

    空闲检测及超时,也可以称为心跳检测,目的就是确保连接的另一端依旧在线,如果不在线,则断开连接,节省资源。

    Netty中提供了几个常用的handler

    • IdleStateHandler,如果连接空闲时间过长,则触发一个IdleStateEvent,可以通过在ChannelInboundHandler覆盖userEventTriggered()来处理该事件
    • ReadTimeoutHandler,当channel中一段时间没有inbound数据的时候,抛出readTimeoutException并且关闭channel,可以通过exceptionCaught()捕获该异常。
    • WriteTimeoutHandler,当channel中有一段时间没有outbound数据时,抛出writeTimeoutException并且关闭channel,可以通过exceptionCaught()捕获异常。

    需要注意的是,IdleStateHandler的作用是用于检测channel在指定时间内是否有数据流通,如果没有的话,则触发一个IdleStateEvent,该Event是用于通知本channel的,而不是用于通知对方,所以,我们可以根据收到的Event来决定处理逻辑,比如

    • 对于服务端:收到超过3个对应的事件,表示超过3 * time时间内没有交互,因此决定断开连接。
    • 对于客户端:收到事件后,发送一个心跳包(内容其实是随意的,主要是由数据流动),表明自己还活着(注意该事件同样是给自己的,不是给对方的,所以都需要增加对应的逻辑)

    下面举一个具体的例子

    服务端

    public class Server {
        public static void main(String[] args) {
            ServerBootstrap bootstrap = new ServerBootstrap();
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new IdleStateHandlerInitializer());
            try {
                ChannelFuture future = bootstrap.bind(8080).sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                worker.shutdownGracefully();
                boss.shutdownGracefully();
            }
        }
    }
    
    /**
    *  服务端空闲检测
    */
    class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 0, 60));
            pipeline.addLast(new HeartbeatHandler());
        }
    
        /**
        *  服务端的空闲处理逻辑
        */
        private class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    IdleStateEvent event = (IdleStateEvent) evt;
                    // 如果超过,则断开连接
                    if (event.state() == IdleState.ALL_IDLE) {
                        ctx.writeAndFlush(Unpooled.copiedBuffer("bybe".getBytes()));
                        ctx.close();
                    }
                }else {
                    super.userEventTriggered(ctx, evt);
                }
            }
        }
    }
    

    客户端

    class Client {
        public static void main(String[] args) {
            Bootstrap bootstrap = new Bootstrap();
            EventLoopGroup group = new NioEventLoopGroup();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 如果这里改成70,则会断开
                            pipeline.addLast(new IdleStateHandler(0, 0, 50, TimeUnit.SECONDS));
                            pipeline.addLast(new Heartbeat());
                        }
                    });
            try {
                ChannelFuture fu = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();
                fu.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                group.shutdownGracefully();
            }
        }
    
        /**
        *  客户端空闲检测
        */
        private static class Heartbeat extends ChannelInboundHandlerAdapter {
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    // 发送心跳包
                    ctx.writeAndFlush(Unpooled.copiedBuffer("heartbeat".getBytes()));
                }
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf data = (ByteBuf) msg;
                System.out.println(data.toString(CharsetUtil.UTF_8));
            }
        }
    }
    

    基于分隔符及长度的协议处理

    在某些协议中,是根据换行符或者指定长度来划分的,Netty中提供了基于这两者的处理器

    基于分隔符

    Netty中主要的基于分隔符的处理器有以下两个

    • DelimeterBasedFrameDecoder,基于指定分隔符
    • LineBasedFrameDecoder,基于换行符(DelimeterBasedFrameDecoder的特例)

    基于长度

    Netty中基于长度的处理器有以下两个

    • FixedLenghtFrameDecoder,固定长度
    • LengthFieldBasedFrameDecoder,通过构造器指定长度字段的偏移及所占字节数

    发送大数据

    为了高效地发送大量数据,Netty中提供了FileRegion接口(默认实现DefaultFileRegion),作为支持zero-copy的传输器,用于在channel中发送文件

    如果需要将数据从文件系统拷贝到用户空间,可以使用ChunkedWriteHandler,它提供了低消耗内存异步将大数据流写出。

    序列化

    Netty提供的JDK序列化相关的处理器

    • CompatibleObjectDecoder,适用于非Netty的并且使用JDK的序列化器
    • CompatibleObjectEncoder,同上
    • ObjectDecoder,适用于在JDK序列化器之上使用自定义序列化
    • ObjectEncoder,同上

    同时,Netty还提供了基于ProtoBuf的处理器,具体的可以参考文件即可,使用上基本差不多

    总结

    本小节我们主要学习了Netty所提供的几个常用的handler,包括了SSL/TLS相关的handler、HTTP相关的handler、空闲处理器(心跳)、协议分割处理器以及序列化处理器等,有了这些常用的处理器,可以不用处理具体协议的相关内容,从而可以更专注于逻辑方面的处理。

    相关文章

      网友评论

          本文标题:Netty学习之内置处理器以及编解码器

          本文链接:https://www.haomeiwen.com/subject/wxgfzftx.html