美文网首页IO及网络通信java学习快到碗里来Java
netty(十四)Netty提升 - 粘包与半包

netty(十四)Netty提升 - 粘包与半包

作者: 我犟不过你 | 来源:发表于2021-11-15 17:54 被阅读0次

    一、现象分析

    1.1 粘包

    通过代码的方式演示下粘包的现象:

    服务端:

    public class HalfPackageServer {
    
        public static void main(String[] args) {
    
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                //设置服务器接收端缓冲区大小为10
                serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //此处打印输出,看看收到的内容是10次16字节,还是一次160字节
                                printBuf((ByteBuf) msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = serverBootstrap.bind(8080);
                //阻塞等待连接
                channelFuture.sync();
                //阻塞等待释放连接
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                System.out.println("server error:" + e);
            } finally {
                // 释放EventLoopGroup
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    
        static void printBuf(ByteBuf byteBuf){
            StringBuilder stringBuilder = new StringBuilder();
            for (int i = 0; i< byteBuf.writerIndex();i++) {
                stringBuilder.append(byteBuf.getByte(i));
                stringBuilder.append(" ");
            }
    
            stringBuilder.append("| 长度:");
            stringBuilder.append(byteBuf.writerIndex());
            stringBuilder.append("字节");
            System.out.println(stringBuilder);
        }
    }
    

    客户端:

    public class HalfPackageClient {
        public static void main(String[] args) {
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(worker);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            //建立连接成功后,会触发active事件
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                //循环发送,每次16字节,共10次
                                for (int i = 0; i < 10; i++) {
                                    ByteBuf buffer = ctx.alloc().buffer();
                                    buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                                    ctx.writeAndFlush(buffer);
                                }
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
                //释放连接
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e) {
                System.out.println("client error :" + e);
            } finally {
                //释放EventLoopGroup
                worker.shutdownGracefully();
            }
        }
    }
    

    结果:

    0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | 长度:160字节
    

    如上所示,发送了10次的16个字节,接收到了一个160字节,而不是10个16字节。这就是粘包现象。

    1.2 半包

    半包现象我们仍然使用前面的代码,但是服务端需要多设置一个属性,即修改服务端接收缓冲区的buffer大小,我们这里修改为10个字节。

    serverBootstrap.option(ChannelOption.SO_RCVBUF,10);

    这行代码起初运行完我有点不理解,明明设置是10,但是接收到的内容确是40,可见这个设置的值,不是10个字节,通过源码跟踪,我发现这个数值ChannelOption的泛型是个Ingteger,估计是进行了类型的计算,一个Integer是4个字节,所以有了设置是10,最终却接收40个字节。

    如果同学们觉得这个问题说的不对的话,可以帮我指正一下。感谢!!

    public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");

    服务端代码:

    public class HalfPackageServer {
    
        public static void main(String[] args) {
    
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                //设置服务器接收端缓冲区大小为10
                serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //此处打印输出,看看收到的内容是10次16字节,还是一次160字节
                                printBuf((ByteBuf) msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = serverBootstrap.bind(8080);
                //阻塞等待连接
                channelFuture.sync();
                //阻塞等待释放连接
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                System.out.println("server error:" + e);
            } finally {
                // 释放EventLoopGroup
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    
        static void printBuf(ByteBuf byteBuf){
            StringBuilder stringBuilder = new StringBuilder();
            for (int i = 0; i< byteBuf.writerIndex();i++) {
                stringBuilder.append(byteBuf.getByte(i));
                stringBuilder.append(" ");
            }
    
            stringBuilder.append("| 长度:");
            stringBuilder.append(byteBuf.writerIndex());
            stringBuilder.append("字节");
            System.out.println(stringBuilder);
        }
    }
    

    客户端与前面的粘包相同。

    结果:

    0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 | 长度:36字节
    4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 | 长度:40字节
    12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 | 长度:40字节
    4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 | 长度:40字节
    12 13 14 15 | 长度:4字节
    

    如上所示,总共160字节,总共发送了五次,每一次都含有不完整16字节的数据。这就是半包,其实里面也包含的粘包。

    至于为什么第一个只有36,这里没有具体分析,但是我们可以猜想下,每次连接的首次应该是有表示占用了4个字节。

    二、粘包、半包分析

    产生粘包和半包的本质:TCP是流式协议,消息是无边界的

    2.1 滑动窗口

    TCP是一种可靠地传出协议,每发送一个段就需要进行一次确认应答(ack)处理。如何没收到ack,则会再次发送。

    但是如果对于一个客户端来说,每次发送一个请求,都要等到另一个客户端应该的话,才能发送下一个请求,那么整个构成就成了串行化的过程,大大降低了连接间的传输效率。

    TCP如何解决效率地下的问题?

    引入滑动窗口。窗口大小即决定了无需等待应答而可以继续发送的数据最大值。

    简易滑动过程如下所示:

    滑动窗口 (1).png

    窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用。

    • 只有窗口内的数据才允许被发送(绿色),当应答(蓝色)未到达前,窗口必须停止滑动
    • 如果 0-100 这个段的数据 ack 回来了,窗口就可以向下滑动,新的数据段会被加入进来进行发送。
    • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

    2.2 粘包现象分析

    • 现象,发送了10次的16个字节,接收到了一个160字节,而不是10个16字节。

    • 产生原因:

      • 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
      • 滑动窗口(TCP):假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当接收方滑动窗口中缓冲了多个报文就会粘包。
      • Nagle 算法(TCP):会造成粘包

    2.3 半包现象分析

    • 现象,前面的半包示例代码发送了10次的16个字节,接收到的数据有部分是被阶段的,不是完整的16字节。

    • 产生原因

      • 应用层:接收方 ByteBuf 小于实际发送数据量
      • 滑动窗口(TCP):假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
      • MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包

    2.4 引申:Nagle 算法 和 MSS限制

    2.4.1 Nagle算法

    TCP中为了提高网络的利用率,经常使用一个叫做Nagle的算法。

    该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送的一种处理机制。

    如果以下两个条件都不满足时,则进行一段时间延迟后,才进行发送:

    • 已发送的数据都已经收到确认应答时
    • 可以发送最大段长度(MSS)的数据时

    根据这个算法虽然网络利用率可以提高,但是可能会发生某种程度的延迟。对于时间要求准确的系统,往往需要关闭该算法。

    在Netty中如下的条件:

    • 如果 SO_SNDBUF 的数据达到 MSS(maximum segment size),则需要发送。
    • 如果 SO_SNDBUF 中含有 FIN(表示需要连接关闭)这时将剩余数据发送,再关闭。
    • 如果 TCP_NODELAY = true,则直接发送。
    • 已发送的数据都收到 ack 时,则需要发送。
    • 上述条件不满足,但发生超时(一般为 200ms)则需要发送。
    • 除上述情况,延迟发送

    2.4.2 MSS限制

    MSS 限制

    数据链路层对一次能够发送的最大数据有限制,每种数据链路的最大传输单元(MTU)都不尽相同。

    • 以太网的 MTU 是 1500
    • FDDI(光纤分布式数据接口)的 MTU 是 4352
    • 本地回环地址的 MTU 是 65535 - 本地测试不走网卡

    MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数

    • ipv4 tcp 头占用 20 bytes,ip 头占用 20 bytes,因此以太网 MSS 的值为 1500 - 40 = 1460
    • TCP 在传递大量数据时,会按照 MSS 大小将数据进行分割发送
    • MSS 的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为 MSS

    三、粘包和半包的解决方案

    3.1 短连接

    每当客户端。发送一条消息后,就与服务器断开连接。

    我们需要对客户端的代码进行改造,其实就是,将内部发送10次消息的循环抽出来,一次连接只发送一个消息,需要建立10次连接,这个我就不演示了,不用想都能得到结果:

    • 客户端发送一定不会产生粘包问题。
    • 服务端只要接收缓冲区足够大,一定不会产生半包的问题,但是不能完全保证。

    此方案的缺点:
    1)建立大量的链接,效率低。
    2)不能解决半包问题。

    3.2 固定长度消息

    Netty针对固定长度消息提供了一个入站处理器FixedLengthFrameDecoder,能够制定接收消息的固定长度。

    服务端:

    public class FixedLengthServer {
    
        public static void main(String[] args) {
    
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //此处打印输出,看看收到的内容是10次16字节,还是一次160字节
                                printBuf((ByteBuf) msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = serverBootstrap.bind(8080);
                //阻塞等待连接
                channelFuture.sync();
                //阻塞等待释放连接
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                System.out.println("server error:" + e);
            } finally {
                // 释放EventLoopGroup
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    
        static void printBuf(ByteBuf byteBuf) {
            StringBuilder stringBuilder = new StringBuilder();
            for (int i = 0; i < byteBuf.writerIndex(); i++) {
                stringBuilder.append(byteBuf.getByte(i));
                stringBuilder.append(" ");
            }
    
            stringBuilder.append("| 长度:");
            stringBuilder.append(byteBuf.writerIndex());
            stringBuilder.append("字节");
            System.out.println(stringBuilder);
        }
    }
    

    客户端:

    public class FixedLengthClient {
        public static void main(String[] args) {
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(worker);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            //建立连接成功后,会触发active事件
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                // 发送内容随机的数据包
                                Random r = new Random();
                                char c = 1;
                                ByteBuf buffer = ctx.alloc().buffer();
                                for (int i = 0; i < 10; i++) {
                                    byte[] bytes = new byte[8];
                                    for (int j = 0; j < r.nextInt(8); j++) {
                                        bytes[j] = (byte) c;
                                    }
                                    c++;
                                    buffer.writeBytes(bytes);
                                }
                                ctx.writeAndFlush(buffer);
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
                //释放连接
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e) {
                System.out.println("client error :" + e);
            } finally {
                //释放EventLoopGroup
                worker.shutdownGracefully();
            }
        }
    }
    

    结果:

    1 1 0 0 0 0 0 0 | 长度:8字节
    2 2 2 2 0 0 0 0 | 长度:8字节
    3 0 0 0 0 0 0 0 | 长度:8字节
    4 4 4 0 0 0 0 0 | 长度:8字节
    5 5 5 5 0 0 0 0 | 长度:8字节
    6 0 0 0 0 0 0 0 | 长度:8字节
    7 0 0 0 0 0 0 0 | 长度:8字节
    8 8 0 0 0 0 0 0 | 长度:8字节
    9 9 9 0 0 0 0 0 | 长度:8字节
    10 10 10 10 10 0 0 0 | 长度:8字节
    

    使用固定长度也有一定的缺点,消息长度不好确定:
    1)过大,造成空间浪费。
    2)过小,对于某些数据包可能不够。

    3.3 分隔符

    1)Netty提供了LineBasedFrameDecoder(换行符帧解码器)。

    默认以 \n 或 \r\n 作为分隔符,需要指定最大长度,如果超出指定长度仍未出现分隔符,则抛出异常。

    2)Netty提供了DelimiterBasedFrameDecoder(自定义分隔符帧解码器)。
    需要自己指定一个ByteBuf类型的分隔符,且需要指定最大长度。

    LineBasedFrameDecoder示例代码:

    服务端

    public class LineBasedServer {
    
        public static void main(String[] args) {
    
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = serverBootstrap.bind(8080);
                //阻塞等待连接
                channelFuture.sync();
                //阻塞等待释放连接
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                System.out.println("server error:" + e);
            } finally {
                // 释放EventLoopGroup
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    

    客户端:

    public class LineBasedClient {
        public static void main(String[] args) {
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(worker);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            //建立连接成功后,会触发active事件
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                // 发送带有分隔符的数据包
                                ByteBuf buffer = ctx.alloc().buffer();
                                String str = "hello world\nhello world\n\rhello world\nhello world";
                                buffer.writeBytes(str.getBytes(StandardCharsets.UTF_8));
                                ctx.writeAndFlush(buffer);
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
                //释放连接
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e) {
                System.out.println("client error :" + e);
            } finally {
                //释放EventLoopGroup
                worker.shutdownGracefully();
            }
        }
    }
    

    结果:

    hello world
    hello world
    hello world
    

    DelimiterBasedFrameDecoder代码示例,大体与前一种相同,下面只给出不同位置的代码:

    服务端:

    protected void initChannel(SocketChannel ch) throws Exception {
                        ByteBuf buffer = ch.alloc().buffer();
                        buffer.writeBytes("||".getBytes(StandardCharsets.UTF_8));
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buffer));
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
    

    客户端:

    String str = "hello world||hello world||hello world||hello world";
    

    使用分隔符的这种方式,也有其缺点:处理字符数据比较合适,但如果内容本身包含了分隔符,那么就会解析错误。逐个字节去比较,相率也不是很好。

    3.4 预设长度

    LengthFieldBasedFrameDecoder长度字段解码器,允许我们在发送的消息当中,指定消息长度,然后会根据这个长度取读取响应的字节数。

    public LengthFieldBasedFrameDecoder(
                int maxFrameLength,
                int lengthFieldOffset, 
                int lengthFieldLength,
                int lengthAdjustment, 
                int initialBytesToStrip) 
    

    主要看这个有5个字段的构造器,下面我们分别看每个字段的含义是什么。

    字段含义我先列在这:

    maxFrameLength:最大长度。
    lengthFieldOffset:长度字段偏移量。
    lengthFieldLength:长度字段长度。
    lengthAdjustment:以长度字段为基准,还有几个字节是内容。
    initialBytesToStrip:从头剥离几个字节。

    通过Netty提供的几个例子,解释这几个字段的意思。

    例子1:

    • lengthFieldOffset = 0
    • lengthFieldLength = 2
    • lengthAdjustment = 0
    • initialBytesToStrip = 0

    则发送前,总共14个字节,其中lengthFieldLength占据两个字节,0x000C表示消息长度是12:

    Length Actual Content
    0x000C "HELLO, WORLD"

    接收解析后,仍然是14个字节:

    Length Actual Content
    0x000C "HELLO, WORLD"

    例子2:

    • lengthFieldOffset = 0
    • lengthFieldLength = 2
    • lengthAdjustment = 0
    • initialBytesToStrip = 2

    则发送前,总共14个字节,其中lengthFieldLength占据两个字节,0x000C表示消息长度是12,initialBytesToStrip表示从头开始剥离2个字节,则解析后,将长度的字段剥离掉了:

    Length Actual Content
    0x000C "HELLO, WORLD"

    接收解析后,只有12个字节:

    Actual Content
    "HELLO, WORLD"

    例子3:

    • lengthFieldOffset = 2
    • lengthFieldLength = 3
    • lengthAdjustment = 0
    • initialBytesToStrip = 0

    则发送前,总共17个字节,其中lengthFieldLength占据3个字节,长度字段偏移量是2,偏移量用来存放魔数Header 1:

    Header 1 Length Actual Content
    0xCAFE 0x00000C "HELLO, WORLD"

    接收解析后,只有17个字节:

    Header 1 Length Actual Content
    0xCAFE 0x00000C "HELLO, WORLD"

    例子4:

    • lengthFieldOffset = 0
    • lengthFieldLength = 3
    • lengthAdjustment = 2
    • initialBytesToStrip = 0

    则发送前,总共17个字节,其中lengthFieldLength占据3个字节,lengthAdjustment 表示从长度字段开始,还有2个字节是内容:

    Length Header 1 Actual Content
    0x00000C 0xCAFE "HELLO, WORLD"

    接收解析后,只有17个字节:

    Length Header 1 Actual Content
    0x00000C 0xCAFE "HELLO, WORLD"

    例子5:

    • lengthFieldOffset = 1
    • lengthFieldLength = 2
    • lengthAdjustment = 1
    • initialBytesToStrip = 3

    则发送前,总共17个字节,长度字段便宜1个字节,长度字段为2个字节,从长度字段开始,还有一个字节后是内容,忽略前三个字节。

    Header 1 Length Header 2 Actual Content
    0xCA 0x000C 0xFE "HELLO, WORLD"

    接收解析后,只有13个字节:

    Header 2 Actual Content
    0xFE "HELLO, WORLD"

    模拟例子5,写一段实例代码,其中内容略有不同:

    示例代码:

    服务端

    /**
     * @description: TODO
     * @author:weirx
     * @date:2021/11/12 15:34
     * @version:3.0
     */
    public class LineBasedServer {
    
        public static void main(String[] args) {
    
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,1,4,1,5));
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                System.out.println(byteBuf.readByte() + "|" + byteBuf.toString(StandardCharsets.UTF_8));
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = serverBootstrap.bind(8080);
                //阻塞等待连接
                channelFuture.sync();
                //阻塞等待释放连接
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                System.out.println("server error:" + e);
            } finally {
                // 释放EventLoopGroup
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    

    客户端:

    public class LineBasedClient {
        public static void main(String[] args) {
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(worker);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            //建立连接成功后,会触发active事件
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                               send(ctx,"hello, world");
                                send(ctx,"HI!");
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
                //释放连接
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e) {
                System.out.println("client error :" + e);
            } finally {
                //释放EventLoopGroup
                worker.shutdownGracefully();
            }
        }
    
        static void send(ChannelHandlerContext ctx,String msg){
            ByteBuf buffer = ctx.alloc().buffer();
            byte[] bytes = msg.getBytes();
            int length = bytes.length;
            //先写Header 1
            buffer.writeByte(1);
            //再写长度
            buffer.writeInt(length);
            //再写Header 2
            buffer.writeByte(2);
            //最后写内容
            buffer.writeBytes(bytes);
            ctx.writeAndFlush(buffer);
        }
    }
    

    结果:

    2|hello, world
    2|HI!
    

    关于粘包和半包的介绍就这么多了,有用的话,帮忙点个赞吧~~

    相关文章

      网友评论

        本文标题:netty(十四)Netty提升 - 粘包与半包

        本文链接:https://www.haomeiwen.com/subject/jxaftrtx.html