美文网首页
【网络编程】Netty编解码之 字头 + 数据长度 + 消息体的

【网络编程】Netty编解码之 字头 + 数据长度 + 消息体的

作者: 程就人生 | 来源:发表于2023-03-08 20:42 被阅读0次

    在上一篇文章中介绍了 LengthFieldPrepender + StringEncoder 长度前置的文本编码器 和 LengthFieldBasedFrameDecoder + StringDecoder 基本长度解析的文本解码器组合的使用。

    LengthFieldBasedFrameDecoder 解码器的功能还是比较强大的,可以解析 字头 + 数据长度 + 消息体 的协议格式,不止 6 种,还可以有很多变种。

    在编码的时候,LengthFieldPrepender 长度前置编码类,就有些局限,只能添加数据长度,再添加一个字头就实现不了,今天就在 LengthFieldPrepender 的基础上改造一下,实现增加字头编码的功能。

    做过物联网的朋友都知道,硬件那边使用的都是字节数组,很少有文本类型的信息,所以文本编码器 StringEncoder 会换成 ByteArrayEncoder 字节数组编码器,文本解码器 StringDecoder 会换成 ByteArrayDecoder 字节数组解码器。

    在处理业务逻辑的时候,我们处理的是一组字节数组,在给客户端发送指令的时候,也发送一组字节数组,客户端发给服务器的时候,服务器接收到的也是一组字节数组。

    在项目的历史版本中,服务器端使用拼接的方式,拼接字头,拼接数据长度,然后发给客户端。在这个版本中,我要把字头做成公共的,数据长度也做成公共的,在业务处理器中接收完整的数据包,去掉字头,根据数据长度获取消息体,毕竟消息体是不固定的,至于解析处理的字节代表什么意思,在此可以忽略不计。

    还是老规矩,客户端给服务器端发送100条数据,服务器端反馈给客户端100条数据,查看运行结果是否会发生粘包/拆包问题。

    服务器端代码:

    package com.test.nio.stickyBag;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufUtil;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.bytes.ByteArrayDecoder;
    import io.netty.handler.codec.bytes.ByteArrayEncoder;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    
    /**
     * Netty服务器端
     * @author 程就人生
     * @date
     * @Description 
     *
     */
    public class TestServer {
    
        public void bind(final int port){
            // 配置服务端的Nio线程组,boosGroup负责新客户端接入
            EventLoopGroup boosGroup = new NioEventLoopGroup();
            // workerGroup负责I/O消息处理
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try{            
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(boosGroup, workerGroup)
                // 线程组设置为非阻塞
                .channel(NioServerSocketChannel.class)
                //连接缓冲池的大小
              .option(ChannelOption.SO_BACKLOG, 1024)
              //设置通道Channel的分配器
              .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //设置长连接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 采用匿名内部类的方式,声明hanlder
                .childHandler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                      /**
                       * 包长最大长度不能超过65535
                       * 长度偏移量 1 个字节
                       * 长度占用2个字节
                       * 长度调整-3
                       * 剥离初始字节0
                       */
                      ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,1,2,-3,0));                  
                        // Bytebuf 转 byte数组
                        ch.pipeline().addLast(new ByteArrayDecoder());
                        
                        // 对要发出去的数据进行编码
                        /**
                         * 根据 LengthFieldPrepender解码类改写
                         * 长度占用2个字节
                         * 长度调整0
                         * 长度字段中包含长度本身占用的字节数,
                         * 字头为0XFE
                         * 字头占用1个字节
                         */
                        ch.pipeline().addLast(new LengthHeaderFieldPrepender(2, 0, true, 0XFE, 1));
                        // byte数组  转 Bytebuf
                      ch.pipeline().addLast(new ByteArrayEncoder());   
                      
                        // 事件处理绑定,具体的业务逻辑处理
                        ch.pipeline().addLast(new ServerHandler());
                    }               
                });
                // 绑定端口
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                // 服务端启动监听事件
            channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                    public void operationComplete(Future<? super Void> future) throws Exception {
                      //启动成功后的处理
                        if (future.isSuccess()) {
                           System.out.println("服务器启动成功,Started Successed:" + port);
                        } else {
                          System.out.println("服务器启动失败,Started Failed:" + port);
                        }
                    }
                });
            // 等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                // 优雅退出
                boosGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }        
        }
        
        public static void main(String[] argo){
            new TestServer().bind(8080);
        }
    }
    
    /**
     * 服务器端handler
     * @author 程就人生
     * @date
     * @Description 
     *
     */
    class ServerHandler extends ChannelInboundHandlerAdapter{
        // 对接收的消息进行计数
        private static int counter;
        // I/O消息的接收处理
        @Override
        public void channelRead(ChannelHandlerContext ctx,Object msg){
            try{
               // 把接收到的内容输出到控制台
                byte[] data = (byte[]) msg;
               int dataLength = data.length;
               ByteBuf buf = Unpooled.buffer(dataLength);
               buf.writeBytes(data);
               System.out.println("这里是服务器端控制台:" + ByteBufUtil.hexDump(buf).toUpperCase() + "计数:" + ++counter);
               
               buf = Unpooled.buffer(15);
               buf.writeByte(0X08);
               buf.writeByte(0XD4);
               buf.writeByte(0X9B);
               buf.writeByte(0X06);
               buf.writeByte(0XB6);
               buf.writeByte(0X01);
               buf.writeByte(0X11);  
               buf.writeByte(0X01);
               buf.writeByte(0X01);
               buf.writeByte(0X00);
               buf.writeByte(0X12);
               buf.writeByte(0X02);
               buf.writeByte(0XF4);
               buf.writeByte(0X01);
               buf.writeByte(0X60);
               // 返回信息给客户端
               ctx.writeAndFlush(buf.array());
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
           // 遇到异常时关闭ChannelHandlerContext 
           ctx.close();
        }
    }
    

    在 59 行,我们对进来的数据先使用 LengthFieldBasedFrameDecoder 类进行解码,数据包长最大不能超过 65535,长度偏移量 1 个字节(字头的长度),长度占用2个字节,长度调整 -3 (字头占用的字节 + 数据长度占用的字节),剥离初始字节 0 个字节 (解析后还需要完整的数据包)。

    经过 LengthFieldBasedFrameDecoder 解码后,在 61 行再经过 ByteArrayDecoder 字节数组解码器解码成字节数组,最后传递给 ServerHandler 进行具体的业务逻辑处理。

    在业务处理类 ServerHandler 中的 channelRead 方法中,我们将接收到的字节数组在控制台打印,并给客户端发送消息体长度为 15 个字节的字节数组,加上消息头和数据长度,客户端收到的应该是 18 个字节的字节数组。在 128 行,使用工具类 ByteBufUtil.hexDump(buf).toUpperCase() 打印客户端发过来的数据到控制台。

    在 72 行,使用 LengthHeaderFieldPrepender 编码类对将要发出的字节数组进行加工,加上字头和数据长度,最后交给 ByteArrayEncoder 编码类将数据写入到 Channel 通道中,发送给客户端。

    根据 LengthFieldPrepender 改写的 LengthHeaderFieldPrepender 代码:

    import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
    import java.nio.ByteOrder;
    import java.util.List;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageEncoder;
    import io.netty.util.internal.ObjectUtil;
    /**
     * 根据 LengthFieldPrepender 类改写
     * @author 程就人生
     * @Date
     * 
     * 解码前:                                                                  解码后:                     
     * +--------+--------+----------------+        +--------+--------+----------------+
     * |  Header| Length | Actual Content |  ----->| Header | Length | Actual Content | 
     * |  0xFE  | 0x0006 | OX00 0X01 0X11 |        |  0xFE  | 0x0006 | OX00 0X01 0X11 |  
     * +------ -+--------+----------------+        +--------+--------+----------------+
     */
    @Sharable
    public class LengthHeaderFieldPrepender extends MessageToMessageEncoder<ByteBuf> {
    
        private final ByteOrder byteOrder;
        private final int lengthFieldLength;
        private final boolean lengthIncludesLengthFieldLength;
        private final int lengthAdjustment;    
        private final int header;
        private final int headerLength;
    
        /**
         * Creates a new instance.
         *
         * @param lengthFieldLength the length of the prepended length field.
         *                          Only 1, 2, 3, 4, and 8 are allowed.
         *
         * @throws IllegalArgumentException
         *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
         */
        public LengthHeaderFieldPrepender(int lengthFieldLength) {
            this(lengthFieldLength, false);
        }
    
        /**
         * Creates a new instance.
         *
         * @param lengthFieldLength the length of the prepended length field.
         *                          Only 1, 2, 3, 4, and 8 are allowed.
         * @param lengthIncludesLengthFieldLength
         *                          if {@code true}, the length of the prepended
         *                          length field is added to the value of the
         *                          prepended length field.
         *
         * @throws IllegalArgumentException
         *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
         */
        public LengthHeaderFieldPrepender(int lengthFieldLength, boolean lengthIncludesLengthFieldLength) {
            this(lengthFieldLength, 0, lengthIncludesLengthFieldLength);
        }
    
        /**
         * Creates a new instance.
         *
         * @param lengthFieldLength the length of the prepended length field.
         *                          Only 1, 2, 3, 4, and 8 are allowed.
         * @param lengthAdjustment  the compensation value to add to the value
         *                          of the length field
         *
         * @throws IllegalArgumentException
         *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
         */
        public LengthHeaderFieldPrepender(int lengthFieldLength, int lengthAdjustment) {
            this(lengthFieldLength, lengthAdjustment, false);
        }
    
        /**
         * Creates a new instance.
         *
         * @param lengthFieldLength the length of the prepended length field.
         *                          Only 1, 2, 3, 4, and 8 are allowed.
         * @param lengthAdjustment  the compensation value to add to the value
         *                          of the length field
         * @param lengthIncludesLengthFieldLength
         *                          if {@code true}, the length of the prepended
         *                          length field is added to the value of the
         *                          prepended length field.
         *
         * @throws IllegalArgumentException
         *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
         */
        public LengthHeaderFieldPrepender(int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength) {
            this(ByteOrder.BIG_ENDIAN, lengthFieldLength, lengthAdjustment, lengthIncludesLengthFieldLength, 0, 0);
        }
        
        public LengthHeaderFieldPrepender(int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength, int header, int headerLength) {
            this(ByteOrder.BIG_ENDIAN, lengthFieldLength, lengthAdjustment, lengthIncludesLengthFieldLength, header, headerLength);
        }
    
        /**
         * Creates a new instance.
         *
         * @param byteOrder         the {@link ByteOrder} of the length field
         * @param lengthFieldLength the length of the prepended length field.
         *                          Only 1, 2, 3, 4, and 8 are allowed.
         * @param lengthAdjustment  the compensation value to add to the value
         *                          of the length field
         * @param lengthIncludesLengthFieldLength
         *                          if {@code true}, the length of the prepended
         *                          length field is added to the value of the
         *                          prepended length field.
         *
         * @throws IllegalArgumentException
         *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
         */
        public LengthHeaderFieldPrepender(
                ByteOrder byteOrder, int lengthFieldLength,
                int lengthAdjustment, boolean lengthIncludesLengthFieldLength, int header, int headerLength) {
            if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
                lengthFieldLength != 3 && lengthFieldLength != 4 &&
                lengthFieldLength != 8) {
                throw new IllegalArgumentException(
                        "lengthFieldLength must be either 1, 2, 3, 4, or 8: " +
                        lengthFieldLength);
            }
            ObjectUtil.checkNotNull(byteOrder, "byteOrder");
    
            this.byteOrder = byteOrder;
            this.lengthFieldLength = lengthFieldLength;
            this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
            this.lengthAdjustment = lengthAdjustment;
            this.header = header;
            this.headerLength = headerLength;
        }
    
        @Override
        protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            int length = msg.readableBytes() + lengthAdjustment + headerLength;
            if (lengthIncludesLengthFieldLength) {
                length += lengthFieldLength;
            }
            // 写字头(新加)
            out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) header));
            
            checkPositiveOrZero(length, "length");
            
            switch (lengthFieldLength) {
            case 1:
                if (length >= 256) {
                    throw new IllegalArgumentException(
                            "length does not fit into a byte: " + length);
                }
                out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
                break;
            case 2:
                if (length >= 65536) {
                    throw new IllegalArgumentException(
                            "length does not fit into a short integer: " + length);
                }
                out.add(ctx.alloc().buffer(2).order(byteOrder).writeShort((short) length));
                break;
            case 3:
                if (length >= 16777216) {
                    throw new IllegalArgumentException(
                            "length does not fit into a medium integer: " + length);
                }
                out.add(ctx.alloc().buffer(3).order(byteOrder).writeMedium(length));
                break;
            case 4:
                out.add(ctx.alloc().buffer(4).order(byteOrder).writeInt(length));
                break;
            case 8:
                out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
                break;
            default:
                throw new Error("should not reach here");
            }        
            out.add(msg.retain());
        }
    }
    

    在该编码类中,增加了两个成员变量,一个是字头 header,另一个是字头长度 headerLength,分别用在了 137 行和 142 行,计算消息体长度的时候用到了字头长度 headerLength,写入字头的时候用到了字头 header。其实在写入字头的时候也应该像 146 行一样,根据字头长度,写入不同数据类型的字头,这里先免了吧,严谨一点还是要加的。

    客户端代码:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufUtil;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.bytes.ByteArrayDecoder;
    import io.netty.handler.codec.bytes.ByteArrayEncoder;
    
    /**
     * netty客户端
     * @author 程就人生
     * @date
     * @Description 
     *
     */
    public class TestClient {
      
        public void connect(int port, String host){
            // 客户端Nio线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try{   
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                // 线程组设置为非阻塞
                .channel(NioSocketChannel.class)
              .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {    
                      
                      /**
                       * 包长最大长度不能超过65535
                       * 长度偏移量 1 个字节
                       * 长度占用2个字节
                       * 长度调整-3
                       * 剥离初始字节0
                       */
                      ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,1,2,-3,0));                  
                        // Bytebuf 转 byte数组
                        ch.pipeline().addLast(new ByteArrayDecoder());
                        
                        // 对要发出去的数据进行编码
                        /**
                         * 根据 LengthFieldPrepender解码类改写
                         * 长度占用2个字节
                         * 长度调整0
                         * 长度字段中包含长度本身占用的字节数,
                         * 字头为0XFE
                         * 字头占用1个字节
                         */
                        ch.pipeline().addLast(new LengthHeaderFieldPrepender(2, 0, true, 0XFE, 1));
                        // byte数组  转 Bytebuf
                      ch.pipeline().addLast(new ByteArrayEncoder());    
                      
                        // 事件处理绑定,具体的业务逻辑处理
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });            
                // 建立连接
                ChannelFuture channelFuture = bootstrap.connect(host, port);
                // 等待服务端监听端口关闭
                channelFuture.channel().closeFuture().sync();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                // 优雅退出
                group.shutdownGracefully();
            }
        }
        
        public static void main(String[] argo){
            new TestClient().connect(8080, "localhost");
        }
    }
    
    /**
     * 客户端处理handler
     * @author 程就人生
     * @date 
     * @Description 
     *
     */
    class ClientHandler extends ChannelInboundHandlerAdapter{
        // 对接收的消息次数进行计数
        private static int counter;
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            // 连接成功后,发送消息,连续发送100次,模拟数据交互的频繁
            ByteBuf firstMessage = null;
            for(int i = 0;i<100;i++){
                firstMessage = Unpooled.buffer(7);
                firstMessage.writeByte(0X08);
                firstMessage.writeByte(0X87);
                firstMessage.writeByte(0X9A);
                firstMessage.writeByte(0X01);
                firstMessage.writeByte(0X5D);
                firstMessage.writeByte(0X01);
                firstMessage.writeByte(0X90);     
                ctx.writeAndFlush(firstMessage.array());
            }        
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx,Object msg){
            try{
              byte[] data = (byte[]) msg;
                int dataLength = data.length;
                ByteBuf buf = Unpooled.buffer(dataLength);
                buf.writeBytes(data);
                System.out.println("这里是客户端控制台:" + ByteBufUtil.hexDump(buf).toUpperCase() + ";计数: " + ++counter);
                // 释放资源
                ReferenceCountUtil.release(buf);
            }catch(Exception e){
                e.printStackTrace();
            }        
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //释放资源
            ctx.close();
        }
    }
    

    客户端在使用匿名内部类设置编解码规则时,需要和服务端保持一致。

    在 97 行的 channelActive 方法中,客户端向服务器发送 100 次数据,每条数据不包含字头数据长度占用7个字节,加上字头数据长度,要占用10个字节。服务器端输出的也应该是10个字节的数据。

    在客户端业务逻辑处理器 ClientHandler 中的 channelRead 方法中,使用工具类 ByteBufUtil.hexDump(buf).toUpperCase() 打印服务器端发过来的数据到控制台,并使用 ReferenceCountUtil.release(buf) 方法对资源进行释放。

    服务器端运行结果:

    客户端运行结果:

    以上便是 LengthFieldBasedFrameDecoder + ByteArrayDecoder 解码类和 LengthHeaderFieldPrepender + ByteArrayEncoder 编码类组合使用的示例。

    相关文章

      网友评论

          本文标题:【网络编程】Netty编解码之 字头 + 数据长度 + 消息体的

          本文链接:https://www.haomeiwen.com/subject/qeioldtx.html