美文网首页netty
使用LengthFieldBasedFrameDecoder解码

使用LengthFieldBasedFrameDecoder解码

作者: 老鼠AI大米_Java全栈 | 来源:发表于2019-05-13 14:02 被阅读0次

    最近需要做一个长连接的设备管理,使用netty可以方便的做到,还可以配置心跳及解码器

    自定义长度解码器

    LengthFieldBasedFrameDecoder解码器自定义长度解决TCP粘包黏包问题。所以又称为: 自定义长度解码器

    TCP粘包和黏包现象

    1. TCP粘包是指发送方发送的若干个数据包到接收方时粘成一个包。从接收缓冲区来看,后一个包数据的头紧接着前一个数据的尾。

    2. 当TCP连接建立后,Client发送多个报文给Server,TCP协议保证数据可靠性,但无法保证Client发了n个包,服务端也按照n个包接收。Client端发送n个数据包,Server端可能收到n-1或n+1个包。

    为什么出现粘包现象?

    1. 发送方原因: TCP默认会使用Nagle算法。而Nagle算法主要做两件事:1)只有上一个分组得到确认,才会发送下一个分组;2)收集多个小分组,在一个确认到来时一起发送。所以,正是Nagle算法造成了发送方有可能造成粘包现象。

    2. 接收方原因: TCP接收方采用缓存方式读取数据包,一次性读取多个缓存中的数据包。自然出现前一个数据包的尾和后一个收据包的头粘到一起。

    如何解决粘包现象

    就是要选择相应的解码器

    • 添加特殊符号,接收方通过这个特殊符号将接收到的数据包拆分开 - DelimiterBasedFrameDecoder特殊分隔符解码器
    • 每次发送固定长度的数据包 - FixedLengthFrameDecoder定长编码器
    • 在消息头中定义长度字段,来标识消息的总长度 - LengthFieldBasedFrameDecoder自定义长度解码器

    LengthFieldBasedFrameDecoder参数

    自定义长度解码器,所以构造函数中6个参数,基本都围绕那个定义长度域,进行的描述。

    1. maxFrameLength - 发送的数据帧最大长度
    2. lengthFieldOffset - 定义长度域位于发送的字节数组中的下标。换句话说:发送的字节数组中下标为${lengthFieldOffset}的地方是长度域的开始地方
    3. lengthFieldLength - 用于描述定义的长度域的长度。换句话说:发送字节数组bytes时, 字节数组bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength]域对应于的定义长度域部分
    4. lengthAdjustment - 满足公式: 发送的字节数组bytes.length - lengthFieldLength = bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] + lengthFieldOffset + lengthAdjustment
    5. initialBytesToStrip - 接收到的发送数据包,去除前initialBytesToStrip位
    6. failFast - true: 读取到长度域超过maxFrameLength,就抛出一个 TooLongFrameException。false: 只有真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,默认情况下设置为true,建议不要修改,否则可能会造成内存溢出
    7. ByteOrder - 数据存储采用大端模式或小端模式

    举例解释参数如何写

    客户端多次发送"HELLO, WORLD"字符串给服务端。"HELLO, WORLD"共12字节(12B)。长度域中的内容是16进制的值,如下:

    1. 0x000c -----> 12

    2. 0x000e -----> 14

    场景1

    数据包大小: 14B = 长度域2B + "HELLO, WORLD"


    1.png

    解释:

    如上图,长度域的值为12B(0x000c)。希望解码后保持一样,根据上面的公式,参数应该为:

    1. lengthFieldOffset = 0

    2. lengthFieldLength = 2

    3. lengthAdjustment = 0 = 数据包长度(14) - lengthFieldOffset - lengthFieldLength - 长度域的值(12)

    4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

    场景2

    数据包大小: 14B = 长度域2B + "HELLO, WORLD"


    2.png

    解释:

    上图中,解码后,希望丢弃长度域2B字段,所以,只要initialBytesToStrip = 2即可。其他与场景1相同

    1. lengthFieldOffset = 0

    2. lengthFieldLength = 2

    3. lengthAdjustment = 0 = 数据包长度(14) - lengthFieldOffset - lengthFieldLength - 长度域的值(12)

    4. initialBytesToStrip = 2 解码过程中,丢弃2个字节的数据

    场景3

    数据包大小: 14B = 长度域2B + "HELLO, WORLD"。与场景1不同的是:场景3中长度域的值为14(0x000E)


    3.png

    解释:

    如上图,长度域的值为14(0x000E)。希望解码后保持一样,根据上面的公式,参数应该为:

    1. lengthFieldOffset = 0

    2. lengthFieldLength = 2

    3. lengthAdjustment = -2 = 数据包长度(14) - lengthFieldOffset - lengthFieldLength - 长度域的值(14)

    4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

    场景4

    场景4在长度域前添加2个字节的Header。长度域的值(0x00000C) = 12。总数据包长度: 17=Header(2B) + 长度域(3B) + "HELLO, WORLD"


    4.png

    解释

    如上图。编码解码后,长度保持一致,所以initialBytesToStrip = 0。参数应该为:

    1. lengthFieldOffset = 2

    2. lengthFieldLength = 3

    3. lengthAdjustment = 0 = 数据包长度(17) - lengthFieldOffset(2) - lengthFieldLength(3) - 长度域的值(12)

    4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

    场景5

    与场景4不同的地方是: Header与长度域的位置换了。总数据包长度: 17=长度域(3B) + Header(2B) + "HELLO, WORLD"


    5.png

    解释

    如上图。编码解码后,长度保持一致,所以initialBytesToStrip = 0。参数应该为:

    1. lengthFieldOffset = 0

    2. lengthFieldLength = 3

    3. lengthAdjustment = 2 = 数据包长度(17) - lengthFieldOffset(0) - lengthFieldLength(3) - 长度域的值(12)

    4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

    场景6

    如下图,"HELLO, WORLD"域前有多个字段。总数据长度: 16 = HEADER1(1) + 长度域(2) + HEADER2(1) + "HELLO, WORLD"


    6.png
    1. lengthFieldOffset = 1

    2. lengthFieldLength = 2

    3. lengthAdjustment = 1 = 数据包长度(16) - lengthFieldOffset(1) - lengthFieldLength(2) - 长度域的值(12)

    4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

    自定义协议

    很多时候并不能按照以上参数的方式去解析数据,所以需要自定义协议,LengthFieldBasedFrameDecoder解码器自定义协议.通常,协议的格式如下:

    协议格式.png
    通常来说,使用ByteToMessageDocoder这个编码器,我们要分别解析出Header,length,body这几个字段.而使用LengthFieldBasedFrameDecoder,我们就可以直接接收想要的一部分,相当于在原来的基础上包上了一层,有了这层之后,我们可以控制我们每次只要读想读的字段,这对于自定义协议来说十分方便.
    1. MyProtocolDecoder的定义
    public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {
        private static final int HEADER_SIZE = 6;
        /**
         *
         * @param maxFrameLength  帧的最大长度
         * @param lengthFieldOffset length字段偏移的地址
         * @param lengthFieldLength length字段所占的字节长
         * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
         * @param initialBytesToStrip 解析时候跳过多少个长度
         * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
         */
        public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
            super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
        }
    
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            //在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分
            in = (ByteBuf) super.decode(ctx,in);  
    
            if(in == null){
                return null;
            }
            if(in.readableBytes()<HEADER_SIZE){
                throw new Exception("字节数不足");
            }
            //读取type字段
            byte type = in.readByte();
            //读取flag字段
            byte flag = in.readByte();
            //读取length字段
            int length = in.readInt();
            
            if(in.readableBytes()!=length){
                throw new Exception("标记的长度不符合实际长度");
            }
            //读取body
            byte []bytes = new byte[in.readableBytes()];
            in.readBytes(bytes);
            return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8"));
        }
    }
    

    在上述的代码中,调用父类的方法,实现截取到自己想要的字段,如可以判断数据必须以xx开头。

    1. 协议实体的定义
    public class MyProtocolBean {
        //类型  系统编号 0xA 表示A系统,0xB 表示B系统
        private byte type;
        //信息标志  0xA 表示心跳包    0xC 表示超时包  0xC 业务信息包
        private byte flag;
        //内容长度
        private int length;
        //内容
        private String content;
    
        public MyProtocolBean(byte flag, byte type, int length, String content) {
            this.flag = flag;
            this.type = type;
            this.length = length;
            this.content = content;
        }
    }
    

    3.服务端的实现

    public class Server {
    
        private static final int MAX_FRAME_LENGTH = 1024 * 1024;  //最大长度
        private static final int LENGTH_FIELD_LENGTH = 4;  //长度字段所占的字节数
        private static final int LENGTH_FIELD_OFFSET = 2;  //长度偏移
        private static final int LENGTH_ADJUSTMENT = 0;
        private static final int INITIAL_BYTES_TO_STRIP = 0;
    
        private int port;
    
        public Server(int port) {
            this.port = port;
        }
    
        public void start(){
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
                                ch.pipeline().addLast(new ServerHandler());
                            };
    
                        }).option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                // 绑定端口,开始接收进来的连接
                ChannelFuture future = sbs.bind(port).sync();
    
                System.out.println("Server start listen at " + port );
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port;
            if (args.length > 0) {
                port = Integer.parseInt(args[0]);
            } else {
                port = 8080;
            }
            new Server(port).start();
        }
    }
    
    1. 服务端Hanlder
    public class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            MyProtocolBean myProtocolBean = (MyProtocolBean)msg;  //直接转化成协议消息实体
            System.out.println(myProtocolBean.getContent());
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
        }
    }
    

    服务端Handler没什么特别的地方,只是输出接收到的消息

    1. 客户端
    public class Client {
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
        static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    
        public static void main(String[] args) throws Exception {
            // Configure the client.
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new MyProtocolEncoder());
                                ch.pipeline().addLast(new ClientHandler());
                            }
                        });
    
                ChannelFuture future = b.connect(HOST, PORT).sync();
                future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
                future.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    
    }
    
    1. 客户端Handler
    public class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            super.channelRead(ctx, msg);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty");
            ctx.writeAndFlush(myProtocolBean);
        }
    }
    

    客户端Handler实现发送消息.

    1. 客户端编码器
    public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception {
            if(msg == null){
                throw new Exception("msg is null");
            }
            out.writeByte(msg.getType());
            out.writeByte(msg.getFlag());
            out.writeInt(msg.getLength());
            out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8")));
        }
    }
    

    编码的时候,只需要按照定义的顺序依次写入到ByteBuf中.

    小结
    若是上面的参数直接可以满足要求,可以直接使用参数,若不可以则通过自定义的方式去实现,更加灵活。

    相关文章

      网友评论

        本文标题:使用LengthFieldBasedFrameDecoder解码

        本文链接:https://www.haomeiwen.com/subject/goplaqtx.html