美文网首页netty
Netty(二) LengthFieldBasedFrameDe

Netty(二) LengthFieldBasedFrameDecoder

作者: 小左伯爵 | 来源:发表于2020-11-08 18:18 被阅读0次

    1.作用

    • LengthFieldBasedFrameDecoder 解码器根据消息中的length字段的值动态拆分接收到的ByteBuf,在解码二进制报文(包含报文头:报文头存储报文的长度)时,此解码器特别有用

    2.参数解析

    2.1搭建简单的netty通讯

    2.1.1client

    package cn.itbin.decode.client;
    
    import cn.itbin.client.TelnetClientInitializer;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    
    /**
     * Demo client: connects to the server and sends a single frame made of a
     * 2-byte length prefix followed by the request body.
     *
     * <p>Host and port come from the {@code host} / {@code port} system
     * properties and default to {@code 127.0.0.1:8023}.
     *
     * @author chenxiaogao
     */
    public class Client {
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8023"));

        /**
         * Connects, writes one length-prefixed frame and shuts the event loop down.
         *
         * @param args unused
         * @throws Exception if connecting or writing fails, or the thread is interrupted
         */
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO));

                // Block until the connection attempt has finished.
                Channel ch = b.connect(HOST, PORT).sync().channel();

                String request = "HELLO, WORLD";
                // Encode once so the length prefix counts encoded bytes, not chars;
                // the two differ as soon as the body contains non-ASCII text.
                // NOTE(review): getBytes() uses the platform default charset while
                // ServerHandler decodes as GBK - identical for this ASCII payload.
                byte[] body = request.getBytes();

                ByteBuf buffer = Unpooled.buffer();
                buffer.writeShort(body.length);
                buffer.writeBytes(body);

                // writeAndFlush is asynchronous: wait for it to complete (and surface
                // any write failure) before the event loop group is shut down.
                ch.writeAndFlush(buffer).sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    

    2.1.2server

    • server
    package cn.itbin.decode.server;
    
    import cn.itbin.server.TelnetServerInitializer;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    import java.util.Arrays;
    
    /**
     * Demo server: binds to the port given by the {@code port} system property
     * (default 8023) and blocks until the listening channel is closed.
     *
     * @author chenxiaogao
     */
    public class Server {
        static final int PORT = Integer.parseInt(System.getProperty("port", "8023"));

        /**
         * Starts the server and waits for the server channel to close.
         *
         * @param args unused
         * @throws Exception if binding fails or the thread is interrupted
         */
        public static void main(String[] args) throws Exception {
            // One thread for the first group, default sizing for the second.
            EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
            EventLoopGroup ioGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(acceptorGroup, ioGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.handler(new LoggingHandler(LogLevel.INFO));
                bootstrap.childHandler(new ServerInitializer());

                // Bind, then park the main thread until the server channel closes.
                bootstrap.bind(PORT).sync().channel().closeFuture().sync();
            } finally {
                acceptorGroup.shutdownGracefully();
                ioGroup.shutdownGracefully();
            }
        }
    }
    
    
    • ServerInitializer
    package cn.itbin.decode.server;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    
    /**
     * Builds the pipeline of every accepted connection: a length-field frame
     * decoder (2-byte length field at offset 0) followed by {@link ServerHandler}.
     *
     * @author chenxiaogao
     */
    public class ServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Order matters: frames are cut first, then handed to the handler.
            ch.pipeline()
                    .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2))
                    .addLast(new ServerHandler());
        }
    }
    
    
    • ServerHandler
    package cn.itbin.decode.server;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    import java.nio.charset.Charset;
    
    /**
     * Prints each decoded frame: total size, the 2-byte length prefix and the
     * body decoded as GBK text.
     *
     * <p>Expects frames that still carry their 2-byte length prefix, i.e. a
     * decoder configured with {@code initialBytesToStrip = 0}.
     *
     * @author chenxiaogao
     */
    public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
        /** Charset of the request body; resolved once instead of on every message. */
        private static final Charset BODY_CHARSET = Charset.forName("GBK");

        /**
         * Reads the length prefix and the body from one frame and prints them.
         *
         * @param ctx handler context (unused)
         * @param msg one complete frame: 2-byte length prefix followed by the body
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            int length = msg.readableBytes();
            System.out.println("请求总长度 = " + length);

            // Read the 2-byte length prefix as unsigned: a signed short would turn
            // any body of 32768 bytes or more into a negative array size below.
            int len = msg.readUnsignedShort();
            System.out.println("请求报文体的长度 = " + len);
            System.out.println("读取两个字节后的ByteBuf = " + msg);

            // Read the body.
            byte[] bytes = new byte[len];
            msg.readBytes(bytes);
            String request = new String(bytes, BODY_CHARSET);
            System.out.println("请求报文体 = " + request);
        }
    }
    
    

    2.2参数详解

    2.2.1 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2)

    lengthFieldOffset = 0
    lengthFieldLength = 2
    lengthAdjustment = 0
    initialBytesToStrip = 0 (= do not strip header)

     BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
     +--------+----------------+      +--------+----------------+
     | Length | Actual Content |----->| Length | Actual Content |
     | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
     +--------+----------------+      +--------+----------------+
    假设请求报文的前两个字节为报文体的长度
    并希望服务端能够收到完整的报文
    则加入下面这个解码器
    
    pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2));
    当加入这个解码器时会调用这个构造方法
    /**
     * Three-argument convenience constructor: delegates to the five-argument
     * constructor, passing {@code 0} for the two remaining arguments
     * (lengthAdjustment and initialBytesToStrip, per the parameter list below).
     *
     * @param maxFrameLength    maximum length of a whole frame, header included
     * @param lengthFieldOffset start index of the length field
     * @param lengthFieldLength size of the length field in bytes
     */
    public LengthFieldBasedFrameDecoder(
                int maxFrameLength,
                int lengthFieldOffset, int lengthFieldLength) {
            this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);
        }
    则实际的参数为
     maxFrameLength:Integer.MAX_VALUE(整体报文的最大长度,包括报文头和报文体)
     lengthFieldOffset:0(长度字段的起始索引)
     lengthFieldLength :2(长度字段的长度)
     lengthAdjustment : 0    
     initialBytesToStrip :0
    
    1.组装请求体:
    String request = "HELLO, WORLD";
    // Encode once so the length prefix counts encoded bytes rather than chars
    // (the two differ as soon as the body contains non-ASCII text).
    byte[] body = request.getBytes();
    // Create a byte buffer.
    ByteBuf buffer = Unpooled.buffer();
    // Write the 2-byte body length (12 for this request).
    buffer.writeShort(body.length);
    // Write the body.
    buffer.writeBytes(body);
    // Send.
    ch.writeAndFlush(buffer);
    实际请求体为:
    信息: [id: 0x87221f3a, L:/127.0.0.1:52524 - R:/127.0.0.1:8023] WRITE: 14B
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 00 0c 48 45 4c 4c 4f 2c 20 57 4f 52 4c 44       |..HELLO, WORLD  |
    +--------+-------------------------------------------------+----------------+
    
    2.在服务端解析
    lengthFieldOffset:0(长度字段的起始索引)
     lengthFieldLength :2(长度字段的长度)
    从0索引开始读两个字节为报文体的长度,则长度为 00 0c 即长度为12
        /**
         * Prints the total frame size, the 2-byte length prefix and the body
         * decoded as GBK text.
         *
         * @param ctx handler context (unused)
         * @param msg one complete frame: 2-byte length prefix followed by the body
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            int length = msg.readableBytes();
            System.out.println("请求总长度 = " + length);
            // Read the 2-byte length prefix as unsigned: a signed short would turn
            // any body of 32768 bytes or more into a negative array size below.
            int len = msg.readUnsignedShort();
            System.out.println("请求报文体的长度 = " + len);
            System.out.println("读取两个字节后的ByteBuf = " + msg);
            // Read the body.
            byte[] bytes = new byte[len];
            msg.readBytes(bytes);
            String request = new String(bytes, Charset.forName("GBK"));
            System.out.println("请求报文体 = " + request);
        }
    请求总长度 = 14
    请求报文体的长度 = 12
    读取两个字节后的ByteBuf = PooledSlicedByteBuf(ridx: 2, widx: 14, cap: 14/14, unwrapped: PooledUnsafeDirectByteBuf(ridx: 14, widx: 14, cap: 1024))
    请求报文体 = HELLO, WORLD
    

    2.2.2new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2,0,2)

    lengthFieldOffset = 0
    lengthFieldLength = 2
    lengthAdjustment = 0
    initialBytesToStrip = 2 (= the length of the Length field)

     BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
     +--------+----------------+      +----------------+
     | Length | Actual Content |----->| Actual Content |
     | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
     +--------+----------------+      +----------------+
    
    假设请求报文的前两个字节为报文体的长度
    并希望服务端只接收报文体,丢弃掉长度字节
    则加入下面这个解码器
    new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2,0,2)
    请求报文与2.2.1相同
       @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // The decoder has already stripped the 2-byte length field, so every
            // readable byte belongs to the body.
            int length = msg.readableBytes();
            System.out.println("请求总长度 = " + length);
            // Decode the readable region relative to readerIndex instead of
            // assuming the frame starts at absolute index 0.
            String s = msg.toString(Charset.defaultCharset());
            System.out.println("请求报文体 = " + s);
        }
    请求总长度 = 12
    请求报文体 = HELLO, WORLD
    

    2.2.3new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2,-2,0);

    lengthFieldOffset = 0
    lengthFieldLength = 2
    lengthAdjustment = -2 (= the length of the Length field)
    initialBytesToStrip = 0

     BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
     +--------+----------------+      +--------+----------------+
     | Length | Actual Content |----->| Length | Actual Content |
     | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
     +--------+----------------+      +--------+----------------+
    
    假设请求报文的前两个字节为总体的长度 
    并希望服务端接收全部报文
    则加入下面这个解码器
    new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2,-2,0)
    
    组装请求体
    String request = "HELLO, WORLD";
    // Encode once so the length field counts encoded bytes rather than chars.
    byte[] body = request.getBytes();
    ByteBuf buffer = Unpooled.buffer();
    // The length field holds the size of the whole message: the body plus the
    // 2 bytes of the length field itself.
    buffer.writeShort(body.length + 2);
    buffer.writeBytes(body);
    信息: [id: 0xa51d3b17, L:/127.0.0.1:56298 - R:/127.0.0.1:8023] WRITE: 14B
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 00 0e 48 45 4c 4c 4f 2c 20 57 4f 52 4c 44       |..HELLO, WORLD  |
    +--------+-------------------------------------------------+----------------+
    
    /**
     * Prints the frame size, the total length stored in the 2-byte length
     * field, and the body that follows it.
     *
     * @param ctx handler context (unused)
     * @param msg one complete frame whose length field counts the whole frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        int length = msg.readableBytes();
        System.out.println("请求总长度 = " + length);
        // Read the 2-byte length field as unsigned so totals of 32768 or more
        // do not turn negative.
        int len = msg.readUnsignedShort();
        System.out.println("报文总长度为 = " + len);
        // The body is everything after the length field: start at the current
        // readerIndex (just past the field) rather than a hard-coded absolute 2.
        String content = msg.toString(msg.readerIndex(), len - 2, Charset.defaultCharset());
        System.out.println("报文请求体为 = " + content);
    }
    请求总长度 = 14
    报文总长度为 = 14
    报文请求体为 = HELLO, WORLD
    

    其他几个参数如下

     lengthFieldOffset   = 2 (= the length of Header 1)
     lengthFieldLength   = 3
     lengthAdjustment    = 0
     initialBytesToStrip = 0
    
     BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
     +----------+----------+----------------+      +----------+----------+----------------+
     | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
     |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
     +----------+----------+----------------+      +----------+----------+----------------+
    
    lengthFieldOffset   = 0
     lengthFieldLength   = 3
     lengthAdjustment    = 2 (= the length of Header 1)
     initialBytesToStrip = 0
    
     BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
     +----------+----------+----------------+      +----------+----------+----------------+
     |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
     | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
     +----------+----------+----------------+      +----------+----------+----------------+
    
    lengthFieldOffset   = 1 (= the length of HDR1)
     lengthFieldLength   = 2
     lengthAdjustment    = 1 (= the length of HDR2)
     initialBytesToStrip = 3 (= the length of HDR1 + LEN)
    
     BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
     +------+--------+------+----------------+      +------+----------------+
     | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
     | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
     +------+--------+------+----------------+      +------+----------------+
     
    
    lengthFieldOffset   =  1
     lengthFieldLength   =  2
     lengthAdjustment    = -3 (= the length of HDR1 + LEN, negative)
     initialBytesToStrip =  3
    
     BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
     +------+--------+------+----------------+      +------+----------------+
     | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
     | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
     +------+--------+------+----------------+      +------+----------------+
     
    

    3解析长度位为字符串的报文

    • 还有的报文长度是用字符串表示的,例如:0002AD,前四个字节"0002"表示长度为2,AD为报文内容。
      这种情况需要自定义解码器,重写getUnadjustedFrameLength
    • 将上述代码做如下修改

    3.1组装请求报文

    String request = "HELLO, WORLD";
    // Encode once so the length header counts encoded bytes rather than chars.
    byte[] body = request.getBytes();
    // A 4-digit decimal header cannot describe more than 9999 bytes.
    if (body.length > 9999) {
        throw new IllegalArgumentException("body too long for a 4-digit length header: " + body.length);
    }
    ByteBuf buffer = Unpooled.buffer();
    // Left-pad the decimal length with zeros to exactly 4 characters.
    // A fixed "00" prefix is only 4 characters wide for two-digit lengths.
    String digits = Integer.toString(body.length);
    String strLen = "0000".substring(digits.length()) + digits;
    buffer.writeBytes(strLen.getBytes());
    buffer.writeBytes(body);
    ch.writeAndFlush(buffer);
    
    信息: [id: 0x82110268, L:/127.0.0.1:57369 - R:/127.0.0.1:8023] WRITE: 16B
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 30 30 31 32 48 45 4c 4c 4f 2c 20 57 4f 52 4c 44 |0012HELLO, WORLD|
    +--------+-------------------------------------------------+----------------+
    

    3.2自定义解码器

    package cn.itbin.decode.server;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.handler.codec.DecoderException;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    
    import java.nio.ByteOrder;
    
    /**
     * Frame decoder whose 4-byte length field is written as ASCII decimal digits
     * (for example {@code "0012"}) instead of a binary integer. Length fields of
     * 1, 2, 3 or 8 bytes are still read as binary numbers.
     *
     * @author chenxiaogao
     */
    public class MyLFBFD extends LengthFieldBasedFrameDecoder {
        public MyLFBFD(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
            super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        }

        /**
         * Reads the raw (unadjusted) frame length from the length field.
         *
         * @param buf    buffer holding the frame; its reader index is not moved
         * @param offset absolute index of the first byte of the length field
         * @param length size of the length field in bytes
         * @param order  byte order used for the binary length fields
         * @return the value of the length field
         * @throws DecoderException if {@code length} is unsupported, or the 4-byte
         *                          field contains a non-digit byte
         */
        @Override
        protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
            buf = buf.order(order);
            long frameLength;
            switch (length) {
                case 1:
                    frameLength = buf.getUnsignedByte(offset);
                    break;
                case 2:
                    frameLength = buf.getUnsignedShort(offset);
                    break;
                case 3:
                    frameLength = buf.getUnsignedMedium(offset);
                    break;
                case 4:
                    // The 4-byte field is decimal text, not a binary int.
                    frameLength = parseDecimalLength(buf, offset, length);
                    break;
                case 8:
                    frameLength = buf.getLong(offset);
                    break;
                default:
                    throw new DecoderException(
                            "unsupported lengthFieldLength: " + length + " (expected: 1, 2, 3, 4, or 8)");
            }
            return frameLength;
        }

        /**
         * Parses {@code length} ASCII digits starting at {@code offset}. Uses
         * absolute getters so the buffer's reader index stays untouched, and reads
         * from {@code offset} rather than index 0, so it also works when the frame
         * does not start at the beginning of the buffer.
         */
        private static long parseDecimalLength(ByteBuf buf, int offset, int length) {
            long value = 0;
            for (int i = 0; i < length; i++) {
                byte b = buf.getByte(offset + i);
                if (b < '0' || b > '9') {
                    throw new DecoderException(
                            "length field at index " + offset + " is not a " + length
                                    + "-digit decimal number");
                }
                value = value * 10 + (b - '0');
            }
            return value;
        }
    }
    
    

    3.3将自定义解码器加入到pipeline中

    /**
     * Builds the pipeline of every accepted connection: the custom decoder
     * (4-byte length field at offset 0) followed by {@link ServerHandler}.
     */
    public class ServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Order matters: frames are cut first, then handed to the handler.
            ch.pipeline()
                    .addLast(new MyLFBFD(Integer.MAX_VALUE, 0, 4))
                    .addLast(new ServerHandler());
        }
    }
    

    3.4在服务端解析

    /**
     * Prints each decoded frame: its total size and the body that follows the
     * 4-byte length header, decoded as GBK text.
     */
    public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
        /**
         * Prints the frame size and the body after the 4-byte header.
         *
         * @param ctx handler context (unused)
         * @param msg one complete frame: 4-byte length header followed by the body
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            int length = msg.readableBytes();
            System.out.println("请求总长度 = " + length);
            // Skip the 4-byte header relative to readerIndex instead of assuming
            // the frame starts at absolute index 0.
            String request = msg.toString(msg.readerIndex() + 4, length - 4, Charset.forName("GBK"));
            System.out.println("request = " + request);
        }
    }
    
    

    相关文章

      网友评论

        本文标题:Netty(二) LengthFieldBasedFrameDe

        本文链接:https://www.haomeiwen.com/subject/ofthbktx.html