美文网首页
【网络编程】Netty中的编解码之LineEncoder和Del

【网络编程】Netty中的编解码之LineEncoder和Del

作者: 程就人生 | 来源:发表于2023-03-04 12:43 被阅读0次

    如何在句尾增加特殊符号解决粘包/拆包问题呢?

    在 netty 的类库中还有这么两个类, LineEncoder 编码类 和 DelimiterBasedFrameDecoder 解码类。

    打开 DelimiterBasedFrameDecoder 源码,该类继承自ByteToMessageDecoder,负责根据数据包末尾的特殊符号来对数据包进行拆分。

    在 56 行的示例中,加入了换行符作为每个数据包的分隔符。

    在该类中,提供了 7 个成员变量:

    1. delimiters:一个或多个 ByteBuf 数组类型的分隔符;

    2. maxFrameLength:数据包最大长度;

    3. stripDelimiter:是否跳过分隔符;

    4. failFast:为true时在没有读取完整个数据包时就报错,否则在读完才报错;

    5. discardingTooLongFrame:是否丢弃过长的数据;

    6. tooLongFrameLength:数据包最大长度;

    7. LineBasedFrameDecoder:换行分隔符解码器,只能设置换行符作为分隔符。

    在这 7 个参数中,有 5 个成员可以通过该类的 6 个构造函数分别设置,最终在 166 行的构造函数中完成成员变量的赋值。

    在该构造函数中,在168 行对数据包的最大长度进行了判断,最大长度不能设置为小于 0 的数字。在 169 行,分隔符不能设置为 null。在 172 行,分隔符数组不能为空数组。在 176 行,判断分隔符是否为换行符,如果为换行符则使用 LineBasedFrameDecoder 换行符解码类进行解码。在 182 行,对分隔符数组进行遍历,在 183 行对分隔符所在的 ByteBuf 进行不能为 null 和 必须可读的验证。在 184 行将分隔符加入成员变量delimiters数组中。

    isLineBased 方法是验证分隔符是否为换行符的验证。

    解码的核心逻辑在 decode 方法中。

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        if (lineBasedDecoder != null) {
            return lineBasedDecoder.decode(ctx, buffer);
        }
        // Try all delimiters and choose the delimiter which yields the shortest frame.
        int minFrameLength = Integer.MAX_VALUE;
        ByteBuf minDelim = null;
        for (ByteBuf delim: delimiters) {
            int frameLength = indexOf(buffer, delim);
            if (frameLength >= 0 && frameLength < minFrameLength) {
                minFrameLength = frameLength;
                minDelim = delim;
            }
        }
    
        if (minDelim != null) {
            int minDelimLength = minDelim.capacity();
            ByteBuf frame;
    
            if (discardingTooLongFrame) {
                // We've just finished discarding a very large frame.
                // Go back to the initial state.
                discardingTooLongFrame = false;
                buffer.skipBytes(minFrameLength + minDelimLength);
    
                int tooLongFrameLength = this.tooLongFrameLength;
                this.tooLongFrameLength = 0;
                if (!failFast) {
                    fail(tooLongFrameLength);
                }
                return null;
            }
    
            if (minFrameLength > maxFrameLength) {
                // Discard read frame.
                buffer.skipBytes(minFrameLength + minDelimLength);
                fail(minFrameLength);
                return null;
            }
    
            if (stripDelimiter) {
                frame = buffer.readRetainedSlice(minFrameLength);
                buffer.skipBytes(minDelimLength);
            } else {
                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
            }
    
            return frame;
        } else {
            if (!discardingTooLongFrame) {
                if (buffer.readableBytes() > maxFrameLength) {
                    // Discard the content of the buffer until a delimiter is found.
                    tooLongFrameLength = buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                    discardingTooLongFrame = true;
                    if (failFast) {
                        fail(tooLongFrameLength);
                    }
                }
            } else {
                // Still discarding the buffer since a delimiter is not found.
                tooLongFrameLength += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
            }
            return null;
        }
    }
    
    /**
     *  通过特殊符号获取数据包长度
     */
    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
            int haystackIndex = i;
            int needleIndex;
            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
                    break;
                } else {
                    haystackIndex ++;
                    if (haystackIndex == haystack.writerIndex() &&
                        needleIndex != needle.capacity() - 1) {
                        return -1;
                    }
                }
            }
    
            if (needleIndex == needle.capacity()) {
                // Found the needle from the haystack!
                return i - haystack.readerIndex();
            }
        }
        return -1;
    }
    

    在 2 行代码中,判断成员变量 lineBasedDecoder 是否为 null,如果不为 null 则使用该类对数据包进行解码。

    在 8 行,对分隔符数组进行遍历。在 9 行,通过 indexOf 方法获取分隔符所在buffer 的索引。在 10 行,如果数据包长大于 0 并且小于 最小数据包长。在 11-12行,设置最小数据包长等于当前数据包长,设置最小分隔符等于当前分隔符。

    在 16 行,最小分隔符不为 null 的时候,获取最小分隔符所在 ByteBuf 的所占的空间大小。在 20 行,如果 discardingTooLongFrame 为 true 时,buffer要丢弃 minFrameLength + minDelimLength 个字节数。

    在 34 行,最小包长 minFrameLength 大于最大包长 maxFrameLength时,buffer 要丢弃 minFrameLength + minDelimLength 个字节数。

    在 41 行,如果 stripDelimiter 为true 时,在 42 行从 buffer 中根据最小数据包长度 minFrameLength 获取缓冲区子区域的一个新的ByteBuf,在 43 行跳过 buffer 中的特殊字符长度 minDelimLength 的字节。stripDelimiter 默认值为 true,也就是在不设置的时候会走这个逻辑。

    stripDelimiter 为 false 的时候,在 45 行从buffer 中根据最小数据包长度加最小分隔符长度 minFrameLength + minDelimLength,获取缓冲区子区域的一个新的 frame 并返回。最小包长在 6 行已设置,为 Integer类型的最大值。

    一句话,在 9 行获取数据包中最小索引的分隔符,以及最小索引分隔符的长度。在 42 行,根据最小索引的分隔符分离出一个新的ByteBuf ,并跳过索引,返回新的 ByteBuf,并进行下一轮的分割,直到缓冲区的数据处理完毕。

    重要的源码已经看过一遍了,接下来看服务器端代码:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.LineEncoder;
    import io.netty.handler.codec.string.LineSeparator;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    
    /**
     * Netty服务器端
     * @author 程就人生
     * @date 2023年01月06日
     * @Description 
     *
     */
    public class TestServer {
    
        public void bind(final int port){
            // 配置服务端的Nio线程组,boosGroup负责新客户端接入
            EventLoopGroup boosGroup = new NioEventLoopGroup();
            // workerGroup负责I/O消息处理
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try{            
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(boosGroup, workerGroup)
                // 线程组设置为非阻塞
                .channel(NioServerSocketChannel.class)
                //连接缓冲池的大小
              .option(ChannelOption.SO_BACKLOG, 1024)
              //设置通道Channel的分配器
              .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //设置长连接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 采用匿名内部类的方式,声明hanlder
                .childHandler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                      // 3.1、以特殊符号作为一个包的结束符
                        ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                      // ByteBuf 转 字符串
                      ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                      
                      // 字符串 转 ByteBuf
                      ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                      // 3.2 、每次发出去的数据都增加特殊符号
                      ch.pipeline().addLast(new LineEncoder(new LineSeparator("$_"), CharsetUtil.UTF_8)); 
                        // 事件处理绑定
                        ch.pipeline().addLast(new ServerHandler());
                    }               
                });
                // 绑定端口
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                // 服务端启动监听事件
            channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                    public void operationComplete(Future<? super Void> future) throws Exception {
                      //启动成功后的处理
                        if (future.isSuccess()) {
                           System.out.println("服务器启动成功,Started Successed:" + port);
                        } else {
                          System.out.println("服务器启动失败,Started Failed:" + port);
                        }
                    }
                });
            // 等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                // 优雅退出
                boosGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }        
        }
        
        public static void main(String[] argo){
            new TestServer().bind(8080);
        }
    }
    
    /**
     * 服务器端handler
     * @author 程就人生
     * @date 2023年01月06日
     * @Description 
     *
     */
    class ServerHandler extends ChannelInboundHandlerAdapter{
        // 对接收的消息进行计数
        private static int counter;
        // I/O消息的接收处理
        @Override
        public void channelRead(ChannelHandlerContext ctx,Object msg){
            try{
                // 把接收到的内容输出到控制台
                System.out.println("这里是服务器端控制台:" + msg + ",计数:" + ++counter);
                // 1.2、发送字符串
                String resp = "来自服务器端的消息~!";            
                // 返回信息给客户端
                ctx.writeAndFlush(resp);            
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
           // 遇到异常时关闭ChannelHandlerContext 
           ctx.close();
        }
    }
    

    在 54-55 行,加入了 DelimiterBasedFrameDecoder 解码器,设置 $_ 为每个数据包的解码分隔符。

    在 63 行,加入了 LineEncoder 编码器,通过这个编码类,可以在每个将要发出去数据包的包尾加入特殊分隔符 $_ ,这样就可以全局控制,随时换分隔符,无需在每个需要发送的数据包后面手动添加了。

    客户端代码:

    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.LineEncoder;
    import io.netty.handler.codec.string.LineSeparator;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    /**
     * netty客户端
     * @author 程就人生
     * @date 2023年01月06日
     * @Description 
     *
     */
    public class TestClient {
      
        public void connect(int port, String host){
            // 客户端Nio线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try{   
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                // 线程组设置为非阻塞
                .channel(NioSocketChannel.class)
              .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {                   
                      // 3.1、以特殊符号作为一个包的结束符
                        ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                      // ByteBuf 转 字符串
                      ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                      
                      // 字符串 转 ByteBuf
                      ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                      // 3.2 、每次发出去的数据都增加特殊符号
                      ch.pipeline().addLast(new LineEncoder(new LineSeparator("$_"), CharsetUtil.UTF_8)); 
                        // 事件处理绑定
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });            
                // 建立连接
                ChannelFuture channelFuture = bootstrap.connect(host, port);
                // 等待服务端监听端口关闭
                channelFuture.channel().closeFuture().sync();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                // 优雅退出
                group.shutdownGracefully();
            }
        }
        
        public static void main(String[] argo){
            new TestClient().connect(8080, "localhost");
        }
    }
    
    /**
     * 客户端处理handler
     * @author 程就人生
     * @date 2023年01月06日
     * @Description 
     *
     */
    class ClientHandler extends ChannelInboundHandlerAdapter{
        // 对接收的消息次数进行计数
        private static int counter;
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
          // 1.2、发送字符串
            String req = "来自客户端的消息~!";
            // 连接成功后,发送消息,连续发送100次,模拟数据交互的频繁
            for(int i = 0;i<100;i++){
                ctx.writeAndFlush(req);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx,Object msg){
            try{
                System.out.println("这里是客户端控制台:" + msg + ",计数:" + ++counter);
            }catch(Exception e){
                e.printStackTrace();
            }        
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //释放资源
            ctx.close();
        }
    }
    

    在 41-52 行的编解码,需要和服务端保持一致。接下来分别运行服务器端和客户端代码。

    服务器端控制台输出:

    客户端控制台输出:

    服务器端控制台和客户端控制台,都打印了100次的数据,没有发生粘包/拆包问题。

    如果有两个换行符会是什么情况呢?

    服务器端编码调整第一部分:在解码的时候,增加一个特殊分隔符 @@。

    ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
    ByteBuf delimiter1 = Unpooled.copiedBuffer("@@".getBytes());
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter, delimiter1));
    

    服务器端调整第二部分:在响应客户端的时候,增加特殊字符并增加文字。由于编码器 LineEncoder 只支持一组字符,因此我们只能在发送的文字里加第二组分隔符。

    // 1.2、发送字符串
    String resp = "来自服务器端的消息~!@@来自服务器端的消息~!";
    

    客户端修改同服务器保持一致。

    服务器端运行结果,客户端发了 100 条消息,但是有两个分隔符,每条消息都被拆成了两个数据包,因此最后计数为 200,没问题。

    客户端运行结果:服务器收到了 200 条数据,响应客户端 200 条数据,响应客户端的一条数据里又有两个特殊字符,因此客户端收到的数据计数为 400,也没有问题。

    以上便是 LineEncoder 编码器和 DelimiterBasedFrameDecoder 解码器的固定搭配。

    相关文章

      网友评论

          本文标题:【网络编程】Netty中的编解码之LineEncoder和Del

          本文链接:https://www.haomeiwen.com/subject/qclyldtx.html