美文网首页晓我课堂
netty粘包和拆包

netty粘包和拆包

作者: wavefreely | 来源:发表于2021-11-19 09:29 被阅读0次

    粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。

    拆包与粘包同时发生在数据的发送方与接收方两方。

    产生原因

    发送方通过网络每发送一批二进制数据包,那么这次所发送的数据包就称为一帧,即

    Frame。在进行基于 TCP 的网络传输时,TCP 协议会将用户真正要发送的数据根据当前缓存

    的实际情况对其进行拆分或重组,变为用于网络传输的 Frame。在 Netty 中就是将 ByteBuf

    中的数据拆分或重组为二进制的 Frame。而接收方则需要将接收到的 Frame 中的数据进行重

    组或拆分,重新恢复为发送方发送时的 ByteBuf 数据。

    具体场景描述:

    • 发送方发送的 ByteBuf 较大,在传输之前会被 TCP 底层拆分为多个 Frame 进行发送,这

    个过程称为发送拆包;接收方在接收到需要将这些 Frame 进行合并,这个合并的过程称

    为接收方粘包。

    • 发送方发送的 ByteBuf 较小,无法形成一个 Frame,此时 TCP 底层会将很多的这样的小

    的 ByteBuf 合并为一个 Frame 进行传输,这个合并的过程称为发送方的粘包;接收方在

    接收到这个 Frame 后需要进行拆包,拆分出多个原来的小的 ByteBuf,这个拆分的过程

    称为接收方拆包。

    • 当一个 Frame 无法放入整数倍个 ByteBuf 时,最后一个 ByteBuf 会会发生拆包。这个

    ByteBuf 中的一部分入入到了一个 Frame 中,另一部分被放入到了另一个 Frame 中。这

    个过程就是发送方拆包。但对于将这些 ByteBuf 放入到一个 Frame 的过程,就是发送方

    粘包;当接收方在接收到两个 Frame 后,对于第一个 Frame 的最后部分,与第二个 Frame

    的最前部分会进行合并,这个合并的过程就是接收方粘包。但在将 Frame 中的各个

    ByteBuf 拆分出来的过程,就是接收方拆包。

    具体情况如下图所示:

    netty粘包拆包.png

    解决方案

    固定长度

    对于使用固定长度的粘包和拆包场景,可以使用:

    FixedLengthFrameDecoder:每次读取固定长度的消息,如果当前读取到的消息不足指定长度,那么就会等待下一个消息到达后进行补足。其使用也比较简单,只需要在构造函数中指定每个消息的长度即可。

    bootstrap.group(parentGroup, childGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
            //接收套接字缓冲区大小
            .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
            //发送套接字缓冲区大小
            .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为100
                    pipeline.addLast(new FixedLengthFrameDecoder(100));
                    // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
                    pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
                    // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
                    pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
                    //绑定处理器(可绑定多个)
                    pipeline.addLast(new ServerHandler()); //处理业务
                }
            });
    
    行拆包

    LineBasedFrameDecoder:每个应用层数据包,都以换行符作为分隔符,进行分割拆分,LineBasedFrameDecoder依次遍历ByteBuf中的可读字节,判断是否有"\n"或者"\r\n",如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行,它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度,如果连续读到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。这个使用也比较简单:

    childHandler(new ChannelInitializer<SocketChannel>() {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为100
            //pipeline.addLast(new FixedLengthFrameDecoder(100));
            //这里将LineBasedFrameDecoder添加到pipeline中,设置最大长度为1024
            pipeline.addLast(new LineBasedFrameDecoder(1024));
            // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
            pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
            // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
            pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
            //绑定处理器(可绑定多个)
            pipeline.addLast(new ServerHandler()); //处理业务
        }
    });
    
    指定分隔符

    对于通过分隔符进行粘包和拆包问题的处理,Netty提供了

    DelimiterBasedFrameDecoder:通过用户指定的分隔符对数据进行粘包和拆包处理,用法如下:

    childHandler(new ChannelInitializer<SocketChannel>() {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为100
            // pipeline.addLast(new FixedLengthFrameDecoder(100));
            //这里将LineBasedFrameDecoder添加到pipeline中,设置最大长度为1024
            // pipeline.addLast(new LineBasedFrameDecoder(1024));
    
            //被按照$_$进行分隔,这里1024指的是分隔的最大长度,即当读取到1024个字节的数据之后,
            // 若还是未读取到分隔符,则舍弃当前数据段,因为其很有可能是由于码流紊乱造成的
            ByteBuf delimiter = copiedBuffer(Constants.MESSAGE_DELIMITER.getBytes(Charset.forName("UTF-8")));
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
    
            // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
            pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
            // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
            pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
            //绑定处理器(可绑定多个)
            pipeline.addLast(new ServerHandler()); //处理业务
        }
    });
    
    基于数据包长度的拆包

    LengthFieldBasedFrameDecoder:将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度,应用如下:

    childHandler(new ChannelInitializer<SocketChannel>() {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据
            // 进行长度字段解码,这里也会对数据进行粘包和拆包处理
            pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
            // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
            pipeline.addLast(new LengthFieldPrepender(2));
            // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
            pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
            // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
            pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
            //绑定处理器(可绑定多个)
            pipeline.addLast(new ServerHandler()); //处理业务
        }
    });
    
    自定义粘包拆包器

    可以通过实现MessageToByteEncoderByteToMessageDecoder来实现自定义粘包和拆包处理的目的。

    • MessageToByteEncoder:作用是将响应数据编码为一个ByteBuf对象

    • ByteToMessageDecoder:将接收到的ByteBuf数据转换为某个对象数据

    最后我们也可以自定义编码器MessageToMessageEncoder和自定义解码器MessageToMessageDecoder,来实现消息内容的转换,比如序列化成某个对象,处理器里面我们就可以不用再去转换对象,具体实现如下:

    自定义解码器
    /**
     * @Description:  自定义解码器
     * @author: dy
     */
    public class CustomDecoder extends MessageToMessageDecoder<ByteBuf> {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            System.out.println("====1111111111===="+msg.toString(Charset.forName("UTF-8")));
            out.add(JSON.parseObject(msg.toString(Charset.forName("UTF-8")), Message.class));
        }
    }
    

    使用只需要在构造器里面加入我们自定义的编码器就可以了:

    childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                       
                                // 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据
                                // 进行长度字段解码,这里也会对数据进行粘包和拆包处理
                                pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                                // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
                                pipeline.addLast(new LengthFieldPrepender(2));
                                // StringEncoder:字符串编码器,将message对象编码为将要发送到Channel中的ByteBuf
                                pipeline.addLast(new CustomDecoder());
                                 // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
                                pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
                                //绑定处理器(可绑定多个)
                                pipeline.addLast(new ServerHandler()); //处理业务
                            }
                        });
    

    相关文章

      网友评论

        本文标题:netty粘包和拆包

        本文链接:https://www.haomeiwen.com/subject/hipltrtx.html