美文网首页
netty如何处理拆包与粘包

netty如何处理拆包与粘包

作者: hello_kd | 来源:发表于2021-10-05 12:10 被阅读0次

拆包与粘包是网络编程中必会出现的一个问题。因此,本文先介绍下什么是拆包与粘包,然后通过例子演示这个现象,最后再介绍netty中如何处理拆包与粘包

所谓的拆包就是发送方发送一条数据,接收方分为多次接收了该条数据。比如A发送123456789,B接收到了两条消息12345、6789,而实际上这是一条消息的;粘包就是发送方发送了多条数据,接收方读取数小于发送数。比如A发送123、456、789,B收到了两条消息,1234、56789等

造成拆包与粘包的原因主要有两个层面的,分别为应用层和传输层(tcp协议层面的),本文就不讲述传输层导致的,有兴趣的可以自己看下计算机网络tcp协议相关资料,本文主要分析应用层产生的原因。

在netty中产生这两个现象是因为ByteBuf缓冲区引起的,当ByteBuf的容量比较小时,而socket缓冲区数据又较多,那么ByteBuf就需要多次从socket缓冲区读取数据,导致拆包现象。当ByteBuf的容量比较大时,就可以一次性从socket缓冲区读取数据,导致粘包现象。netty中ByteBuf接收缓冲区的初始值为1024个字节,可以看下这篇文章

下面通过实际的例子来演示下

粘包例子

//客户端
ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
                    }
                })
                .connect(new InetSocketAddress("localhost", 8899));

        Channel channel = channelFuture.sync().channel();
        for (int i = 0; i < 10; i++) {
            channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeByte(i));
        }
//服务器端
new ServerBootstrap()
                .group(new NioEventLoopGroup(), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
                    }
                })
                .bind(8899);

客户端分10次发送了10条数据,每次发送一个字节。服务端收到的是一条数据,共10个字节,因为ByteBuf有足够的容量将socket缓冲区的数据一次性读取到应用程序中。

拆包例子
为了更好的演示拆包例子,手动将netty的ByteBuf的接收容量改为64个字节,而不是默认的1024个字节。

//服务端
new ServerBootstrap()
                .group(new NioEventLoopGroup(), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
//这里是修改默认的接收缓冲区初始大小
                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 64, 1024))
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
                    }
                })
                .bind(8899);
//客户端
ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
                    }
                })
                .connect(new InetSocketAddress("localhost", 8899));

        Channel channel = channelFuture.sync().channel();

        channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(getContent()));

//获取要发送的内容
private static byte[] getContent() {
        byte[] content = new byte[65];
        for (int i = 0; i < 65; i++) {
            content[i] = (byte)i;
        }
        return content;
    }

这个例子中,客户端一次性发送了65个字节,服务端的接收ByteBuf容量初始值为64个字节,因此需要分两次读取,第一次读取64个字节,第二次读取1个字节,也就是产生了拆包现象。

那么如何处理这两个现象呢,首先就是接收方要知道发送方发送消息的边界,比如说,每条消息的长度多少、消息以什么结尾的等等。当知道了消息边界后,就可以对消息进行解析,获取到完整的一条消息。

netty中提供了4个handler来处理拆包与粘包,分别为LineBasedFrameDecoder、DelimiterBasedFrameDecoder、FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder

下面再介绍这几种handler的使用时,就不分别编写服务端和客户端了,用netty提供的EmbeddedChannel来测试

  • LineBasedFrameDecoder:当遇到了换行符,就当做是一条完整的消息。
/**LineBasedFrameDecoder构造参数的意义,1000表示这个handler解析的帧的最大长度
第二个参数表示的是解析出来的消息是否不包含换行分隔符、第三个参数,表示当解析超过最大帧长度时还未遇到换行分隔符,是否要报错
*/
EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(LogLevel.DEBUG), new LineBasedFrameDecoder(1000, false, true),
        new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = ((ByteBuf) msg);
                String content = buf.toString(StandardCharsets.UTF_8);
                System.out.println(content);
            }
        });
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello\nworld\nwelcome\n".getBytes(StandardCharsets.UTF_8)));

在这个例子中,channel接收到了这样的一条消息hello\nworld\nwelcome\n,会根据换行符进行消息的解析处理,解析后有3条消息

  • DelimiterBasedFrameDecoder:与LineBasedFrameDecoder的基本一样,就是分隔符可以自定义,且可以定义多种分隔符
ByteBuf delimeter1 = Unpooled.buffer().writeBytes("\n".getBytes(StandardCharsets.UTF_8));
ByteBuf delimeter2 = Unpooled.buffer().writeBytes("\r".getBytes(StandardCharsets.UTF_8));
EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(LogLevel.DEBUG), new DelimiterBasedFrameDecoder(10, true, true, delimeter1, delimeter2),
        new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = ((ByteBuf) msg);
                String content = buf.toString(StandardCharsets.UTF_8);
                System.out.println(content);
            }
        });
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello\nworld\nwelcome\r".getBytes(StandardCharsets.UTF_8)));
  • FixedLengthFrameDecoder:固定长度对消息进行拆分,若消息没有达到这个长度,那么就不是一条完整的消息
//这里每条消息设置的固定长度是5
EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(LogLevel.DEBUG), new FixedLengthFrameDecoder(5),
        new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = ((ByteBuf) msg);
                String content = buf.toString(StandardCharsets.UTF_8);
                System.out.println(content);
            }
        });
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello\nworld\nwelcome\r".getBytes(StandardCharsets.UTF_8)));
  • LengthFieldBasedFrameDecoder:这个比较复杂一些,会将消息分为两部分,一部分为消息头部,一部分为实际的消息体。其中消息头部是固定长度的,消息体是可变的,且消息头部一般会包含一个Length字段,有的自定义消息协议中Length字段值是指的整条消息的长度,包含头部本身,有的Length字段值指的是消息体实际的长度等等,而LengthFieldBasedFrameDecoder就是用来解析这种消息协议的,构造方法中有以下几个重要参数
    lengthFieldOffset: Length字段在帧的起始偏移位置
    lengthFieldLength: Length字段占用的字节数
    lengthAdjustment: 对Length值的调整数
    initialBytesToStrip: 解析后的实际消息需要跳过帧头部的字节数
    其中Length值+lengthAdjustment=帧中length字节后面的字节长度,比如看下这个例子
//   Length字段值为0x0010=16,表示16字节+lengthAdjustment (-3)= 13,
//表示的是Length占用字节后面的字节长度,也就是HDR2 + Actual Content的实际长度为13个字节。
//initialBytesToStrip=3,表示解析后的实际消息需要跳过帧头部的前3个字节,因此最后得到的消息是HDR2 + Actual Content,
//如果initialBytesToStrip=4,那么最后的实际消息是 Actual Content

   lengthFieldOffset   =  1
   lengthFieldLength   =  2
   lengthAdjustment    = -3 (= the length of HDR1 + LEN, negative)
   initialBytesToStrip =  3
  
   BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
   +------+--------+------+----------------+      +------+----------------+
   | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
   | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
   +------+--------+------+----------------+      +------+----------------+

下面通过实际的代码演示下

EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(LogLevel.DEBUG),
        new LengthFieldBasedFrameDecoder(100, 2, 4, -8, 7),
        new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = ((ByteBuf) msg);
                String content = buf.toString(StandardCharsets.UTF_8);
                System.out.println(content);
            }
        });
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes(frame()));

/**
 * 这个例子中有3个头部 H1、Length、H2以及实际的content hello world
 * H1 占用 2个字节
 * Length占用4个字节
 * H2占用1个字节
 * content占用11个字节
 * lengthFieldOffset   = 2
 * lengthFieldLength   = 4
 * lengthAdjustment    = -8
 * initialBytesToStrip = 6
 *
 * @return
 */
private static ByteBuf frame() {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    buffer.writeShort(11);
    buffer.writeInt(20);
    buffer.writeByte(2);
    buffer.writeBytes("hello world".getBytes(StandardCharsets.UTF_8));
    return buffer;
}

这个程序最终输出的是content,hello world。

相关文章

网友评论

      本文标题:netty如何处理拆包与粘包

      本文链接:https://www.haomeiwen.com/subject/krnwnltx.html