美文网首页
Java NIO(九)tcp粘包拆包及ByteToMessage

Java NIO(九)tcp粘包拆包及ByteToMessage

作者: 清雨季 | 来源:发表于2019-08-01 12:44 被阅读0次

    一 ByteToMessageDecoder

    1.1 实例

    ByteToMessageDecoder,用于把一个byte流转换成一个对象,实例:

    public class StringDecoder extends ByteToMessageDecoder {
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            byte[] bytes = new byte[in.readableBytes()];
            in.readBytes(bytes);
            out.add(new String(bytes));
        }
    }
    

    它有一个抽象方法decode,我们实现了这个方法,这个方法的第三个参数是一个List<Object>,所有加入这个List的对象都会被逐一的调用fireChannelRead方法映射事件。

    使用方法:ByteToMessageDecoder其实就是一个ChannelInboundHandler,直接加入到Pipeline即可:

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override 
                        protected void initChannel(SocketChannel socketChannel) throws Exception { 
                            socketChannel.pipeline().addLast(new StringDecoder());
                            //...
                        }
                        });
    

    这样,ByteBuf数据到达这个Handler之后,会被转成String,然后继续传递数据。

    1.2 实现

    ByteToMessageDecoder是个抽象类,它继承了ChannelInboundHandler,做了以下逻辑:

    • 重写父类的channelRead方法,在这个方法中,把ByteBuf数据交给子类decode方法处理,decode的方法定义如下:
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    

    它的第二个参数为需要处理的ByteBuf,第三个参数为一个List,用于记录处理后的数据。

    • 子类decode方法返回会,它会遍历参数中的List,把里面的对象依次取出来调用fireChannelRead方法传递事件。
    • 如果decode方法没有把ByteBuf读取完,则会记录这次的ByteBuf对象,然后下一次处理消息时,会把下一次的ByteBuf和这次的ByteBuf合并,然后再交给子类decode处理。

    关于第三点,这个逻辑的目的是为了方便处理TCP的粘包和拆包

    1.3 源码

    源码从ByteToMessageDecoder的channelRead方法开始

    步骤一:把当前的ByteBuf与上次未处理的ByteBuf合并:

    ByteBuf data = (ByteBuf) msg;
    first = cumulation == null;
    if (first) {
        cumulation = data;
    } else {
        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
    }
    

    例如:上次未处理的ByteBuf是[1,2,3,4,5,0,0],这一次的ByteBuf是[6,7,8],处理完之后结果是[1,2,3,4,5,6,7,8]
    注:可以会有一个疑问是:为什么ByteBuf会扩容成了8,而不是64?因为这里没有使用ByteBuf的扩容逻辑,而是自己实现了一套。

    步骤二:处理子类decode后添加到List中的数据:

                    int outSize = out.size();
                    if (outSize > 0) {
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
                    }
    

    为啥还没调用子类的decode方法就要处理List了?因为这个逻辑是在循环里的,简化代码表示即:

    while (in.isReadable()) { 
                 fireChannelRead(ctx, out, outSize);
                 decode(ctx, in, out);
    }
    

    所以每次循环时处理的都是上一次循环后子类添加到List中的数据。

    步骤三:处理子类decode后的ByteBuf

                    //这个if条件能进去,说明已经读完了
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++ numReads >= discardAfterReads) {
                        numReads = 0;
                        discardSomeReadBytes();
                    }
    

    这一步还有一个逻辑:如果连续16次都没处理ByteBuf,则会把ByteBuf中的数据压缩一次。

    二 tcp 拆包粘包问题

    2.1 问题描述

    tcp粘包,即tcp在发送数据时,可能会把两个tcp包合并成一个发送
    tcp拆包,即tcp在发送数据时,可能会把一个tcp包拆成多个来发送

    例如:客户端分两次给服务端发送了两个消息"ABCD" 和 "EFG"

    • 服务端可能收到三个数据包,分别是"AB", "CD", "EFG",即第一个数据包被拆包成了两个
    • 服务端可能只会收到一个数据包:"ABCDEFG",即两个数据包被合并成了一个包
    • 服务端甚至可能会收到"ABC", "DEFG",会拆包再粘包

    2.2 产生的原因

    分为以下三个原因:

    1. socket缓冲区造成的粘包:
      每个socket都有一个发送缓存区与接收缓冲区,客户端向服务端写数据时,实际上是写到了服务端socket的接收缓冲区中。
      服务端调用read方法时,其实只是把接收缓冲区的内容读取到内存中了。因此,服务端调用read方法时,可能客户端已经写了两个包到接收缓冲区中了,因此read到的数据其实是两个包粘包后的数据。

    2. MSS/MTU限制导致的拆包
      MSS是指TCP每次发送数据允许的最大长度,一般是1500字节,如果某个数据包超过了这个长度,就要分多次发送,这就是拆包。

    3. Nagle算法导致的粘包
      网络数据包都是要带有数据头部的,通常是40字节,假如我们发送一个字节的数据,也要加上这40个字节的头部再发送,显然这样是非常不划算的。
      所以tcp希望尽可能的一次发送大块的数据包,Nagle算法就是做这个事的,它会收集多个小数据包,合并为一个大数据包后再发送,这就是粘包。

    2.3 解决办法

    通常,解决tcp粘包拆包问题,是通过定义通信协议来实现的:

    1. 定长协议
      即规定每个数据包的长度,假如我们规定每个数据包的长度为3,假如服务端收到客户端的数据为:"ABCD", "EF",那么也可以解析出实际的数据包为"ABC", "DEF"。

    2. 特殊分隔符协议
      即规定每个数据包以什么样的字符结尾,如规定以$符号结尾,假如服务端收到的数据包为:"ABCD$EF", "G$",那么可以解析出实际数据包为:"ABCD", "EFG"。这种方式要确保消息体中可能会出现分隔符的情况。

    3. 长度编码协议
      即把消息分为消息头和消息体,在消息头中包含消息的长度

    关于tcp粘包拆包的内容,这有篇文章讲得非常好,强推:TCP粘包、拆包与通信协议

    三 Netty中解决tcp粘包拆包问题的方法

    3.1 自定义一个tcp粘包拆包处理器

    基于ByteToMessageDecoder,我们可以很容易的实现处理tcp粘包拆包问题的Handler,以定长协议为例,我们来实现一个定长协议的tcp粘包拆包处理器:

    public class LengthDecoder extends ByteToMessageDecoder {
        private int length;
    
        public LengthDecoder(int length) {
            this.length = length;
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            while (in.readableBytes() >= length) {
                byte[] buff = new byte[length];
                in.readBytes(buff);
                out.add(new String(buff));
            }
        }
    }
    

    在decode方法中,我们循环判断,如果ByteBuf中未读的数据量大于指定的长度length,我们就读到lenght个数据,然后转成字符串加入到List中。

    后续ByteToMessageDecoder依次把List中的数据fireChannelRead传递事件。

    如果ByteBuf中的未读数据不够length,说明发生了拆包,后续还有数据,这里直接不处理即可,ByteToMessageDecoder会帮我们记住这次的ByteBuf,下一次数据来了之后,会跟这次的数据合并后再处理。

    3.2 Netty中自带的tcp粘包拆包处理器

    Netty中实现了很多种粘包拆包处理器:

    • FixedLengthFrameDecoder:与我们上面自定义的一样,定长协议处理器
    • DelimiterBasedFrameDecoder:特殊分隔符协议的处理器
    • LineBasedFrameDecoder:特殊分隔符协议处理器的一种特殊情况,行分隔符协议处理器。
    • JsonObjectDecoder:json协议格式处理器
    • HttpRequestDecoder:http请求体协议处理器
    • HttpResponseDecoder:http响应体处理器,很明显这个是用于客户端的

    相关文章

      网友评论

          本文标题:Java NIO(九)tcp粘包拆包及ByteToMessage

          本文链接:https://www.haomeiwen.com/subject/cifqdctx.html