Netty通用TCP黏包解决方案--LengthFieldBas

作者: L千年老妖 | 来源:发表于2018-10-07 16:16 被阅读0次

    前言

    TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用如下4种方式。

    1. 消息长度固定:累计读取到固定长度为LENGTH之后就认为读取到了一个完整的消息。然后将计数器复位,重新开始读下一个数据报文。
    2. 回车换行符作为消息结束符:在文本协议中应用比较广泛。
    3. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符。
    4. 通过在消息头中定义长度字段来标示消息的总长度。

    netty中针对这四种场景均有对应的解码器作为解决方案,比如:

    1. 通过FixedLengthFrameDecoder 定长解码器来解决定长消息的黏包问题;
    2. 通过LineBasedFrameDecoder和StringDecoder来解决以回车换行符作为消息结束符的TCP黏包的问题;
    3. 通过DelimiterBasedFrameDecoder 特殊分隔符解码器来解决以特殊符号作为消息结束符的TCP黏包问题;
    4. 最后一种,也是本文的重点,通过LengthFieldBasedFrameDecoder 自定义长度解码器解决TCP黏包问题。

    大多数的协议都会在协议头中携带长度字段,用于标识消息体或则整包消息的长度。LengthFieldBasedFrameDecoder通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息,只要传入正确的参数,就可以轻松解决“黏包”的问题。

    LengthFieldBasedFrameDecoder配置

    1. lengthFieldOffset: 长度字段的偏差
    2. lengthFieldLength: 长度字段占的字节数
    3. lengthAdjustment: 添加到长度字段的补偿值
    4. initialBytesToStrip: 从解码帧中第一次去除的字节数

    范例1:2bytes长度字段+消息体,保留长度字段

    ex1.png

    在解码前字节缓冲区占了14个字节,其中前两个字节是标识长度的字节,length=12,表示后面12个字节的消息体长度。

    lengthFieldOffset = 0  // 第一个字段就是长度字段,偏移为0
    lengthFieldLength = 2  // 长度字段length为2bytes
    lengthAdjustment = 0   // 不需要进行调整
    initialBytesToStrip = 0  // 不忽略长度字段
    

    范例2:2bytes长度字段+消息体,去除长度字段

    ex2.png
    因为我们可以通过调用ByteBuf#readableBytes()来获取内容的长度,因此便可以指定initialBytesToStrip去除长度字段。
    lengthFieldOffset = 0  // 第一个字段就是长度字段,偏移为0
    lengthFieldLength = 2  // 长度字段length为2bytes
    lengthAdjustment = 0   // 不需要进行调整
    initialBytesToStrip = 2  // 忽略长度字段(the length of the Length field)
    

    范例3:2bytes长度字段(标识整个消息的长度)+消息体,保留长度字段

    在大多数的应用场景中,长度字段仅用来标识消息体的长度,这类协议通常由消息长度字段+消息体组成,如上图所示的几个例子。但是,对于某些协议,长度字段还包含了消息头的长度。在这种应用场景中,往往需要使用lengthAdjustment进行修正。由于整个消息(包含消息头)的长度往往大于消息体的长度,所以,lengthAdjustment为负数。下图展示了通过指定lengthAdjustment字段来包含消息头的长度:

    lengthFieldOffset = 0
    lengthFieldLength = 2
    lengthAdjustment = -2
    initialBytesToStrip = 0
    
    ex3.png

    范例4:两个请求头,长度字段偏移1byte+消息体,保留长度字段

    由于协议的种类繁多,并不是所有的协议都将长度字段放在消息头的首位,当标识消息长度的字段位于消息头的中间或者尾部时,需要使用lengthFieldOffset字段进行标识,下面的参数组合给出了如何解决消息长度字段不在首位的问题:

    lengthFieldOffset = 2
    lengthFieldLength = 3
    lengthAdjustment = 0
    initialBytesToStrip = 0
    

    由于消息头1的长度为2,所以长度字段的偏移量为2;消息长度字段Length为3,所以lengthFieldLength值为3。由于长度字段仅仅标识消息体的长度,所以lengthAdjustment和initialBytesToStrip都为0。


    ex4.png

    范例5:两个请求头,长度字段+第二个请求头+消息体,保留长度字段

    长度字段和消息体之前还存在其他请求头,可以指定lengthAdjustment将额外的头长度计入帧长度计算:

    lengthFieldOffset = 0
    lengthFieldLength = 3
    lengthAdjustment = 2(the length of Header 1)
    initialBytesToStrip = 0
    
    ex5.png

    范例6:2byte长度字段位于两个2byte字段中间,去除第一个字段和长度字段

    长度字段夹在两个消息头之间或者长度字段位于消息头的中间,前后都有其它消息头字段,在这种场景下如果想忽略长度字段以及其前面的其它消息头字段,通过initialBytesToStrip参数来跳过要忽略的字节长度,它的组合配置示意如下:
    lengthFieldOffset = 1 (the length of HDR1)
    lengthFieldLength = 2
    lengthAdjustment = 1 (the length of HDR2)
    initialBytesToStrip = 3 (the length of HDR1 + LEN)

    image.png

    范例7:2byte长度字段(标识整个消息的长度)位于两个2byte字段中间,去除第一个字段和长度字段

    和范例6相似,只不过长度字段为整个消息的长度,因此我们在去除前两个字段时不需要考虑HDR2的长度,只需要调整HDR1 + LEN的字节长度。

    lengthFieldOffset = 1
    lengthFieldLength = 2
    lengthAdjustment = -3(the length of HDR1 + LEN)
    initialBytesToStrip = 3
    
    ex7.png

    事实上,通过4个参数的不同组合,可以达到不同的解码效果,用户在使用过程中可以根据业务的实际情况进行灵活调整。

    实战[nodejs客户端,netty服务端]自定义编解码

    自定义消息 AccessReq:

    /*--- 消息头 ---*/
    // 消息类型,用来区分协议
    private byte flag;
    // 消息主体总长度
    private int len;
    
    /*--- 消息主体 ---*/
    // 消息名长度
    private int msgNameLen;
    // 消息名
    private String msgName;
    // 消息体
    private String body;
    

    长度字段前有一个1字节falg,lengthFieldOffset=1,长度字段4字节,lengthFieldLength=4,要保留整个消息,lengthAdjustment,initialBytesToStrip都为0

    服务端解码并回消息:

    ByteBuf buf = (ByteBuf) msg;
    flag = buf.readByte();
    len = buf.readInt();
    msgNameLen = buf.readInt();
    msgName = buf.readCharSequence(msgNameLen, CharsetUtil.UTF_8).toString();
    body = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8).toString();
    
    ```
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
        AccessReq req = new AccessReq(msg);
    
        log.info("[SOCKET REQ-{}]:{}", req.getMsgName(), req.getBody());
    
        String obj = deelMsg(req);
        ByteBuf buf = Unpooled.copiedBuffer(obj, CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }
    private String deelMsg(AccessReq req) {
        return "hello netty";
    }
    

    nodejs编码并发送

    const net = require("net");
    const socket = new net.Socket();
    const client = socket.connect(
       4200,
       "127.0.0.1",
       function() {
           console.log("connect success");
           const name = "req";
           const body = "json";
           const nl = Buffer.byteLength(name);
           const bl = Buffer.byteLength(body);
           const buf = Buffer.alloc(bl + nl + 9);
    
           // 消息头
           buf.write("G"); // 消息类型  默认G,scoket连接
           buf.writeInt32BE(bl + nl + 4, 1); // 消息体总长度
           // 消息体
           buf.writeInt32BE(nl, 5); // 消息名长度
           buf.write(name, 9); // 消息名
           buf.write(body, nl + 9); // 消息体
           client.write(buf);
           console.log(buf);
        
       }
    );
    client.on("data", data => {
        console.log("[RECV] " + data);
    });
    

    打印结果

    server:

    2018-08-01 11:25:12.525  INFO 3884 --- [ntLoopGroup-5-1] c.c.g.a.h.n.NativeSocketHandler          : - U: [SOCKET REQ-req]:json
    

    client:

    chaoyer:netty-client apple$ node client.js
    connect success
    <Buffer 47 00 00 00 0b 00 00 00 03 72 65 71 6a 73 6f 6e>
    [RECV] hello netty
    

    LengthFieldBasedFrameDecoder源码解析

    下面我们就来看看基于消息长度的半包解码器,首先看看入口方法:

    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
          
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }
    

    内部调用decode(ChannelHandlerContext ctx, ByteBuf in) 如果解码成功,就将其加入到输出的List out列表中。该函数较长我们还是分几部分来分析:
    (1)判断discardingTooLongFrame标识,看是否需要丢弃当前可读的字节缓冲区,如果为真,则执行求其操作。

    if (discardingTooLongFrame) {
        //获取需要丢弃的长度
        long bytesToDiscard = this.bytesToDiscard;
        //丢弃的长度不能超过当前缓冲区可读的字节数
        int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
        //跳过需要忽略的字节长度
        in.skipBytes(localBytesToDiscard);
        //bytesToDiscard减去已经忽略的字节长度
        bytesToDiscard -= localBytesToDiscard;
        this.bytesToDiscard = bytesToDiscard;
        failIfNecessary(false);
    }
    

    (2)对当前缓冲区中可读字节数和长度偏移量进行对比,如果小于偏移量,谁明缓冲区数据报不够,直接返回null.

    //数据报内数据不够,返回null,由IO线程继续读取数据。  
    if (in.readableBytes() < lengthFieldEndOffset) {
        return null;
    }
    
    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
    long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
    
    if (frameLength < 0) {
        in.skipBytes(lengthFieldEndOffset);
        throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
    }
    
    frameLength += lengthAdjustment + lengthFieldEndOffset;
    
    if (frameLength < lengthFieldEndOffset) {
        in.skipBytes(lengthFieldEndOffset);
        throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less than lengthFieldEndOffset: " + lengthFieldEndOffset);
    }
    

    其实核心就是:对消息进行解码,解码之后将解码后的字节数据放到一个新的ByteBuf中返回,并更新原来的消息msg对象的读写索引值。

    相关文章

      网友评论

        本文标题:Netty通用TCP黏包解决方案--LengthFieldBas

        本文链接:https://www.haomeiwen.com/subject/heblaftx.html