美文网首页
netty源码分析(七) - 解码 - 2常用解码器

netty源码分析(七) - 解码 - 2常用解码器

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-26 19:17 被阅读0次

FixedLengthFrameDecoder:固定长度解码器

固定长度3字节为例

原始TCP数据报中字节流

 * +---+----+------+----+
 * | A | BC | DEFG | HI |
 * +---+----+------+----+

解码后字节流

 * +-----+-----+-----+
 * | ABC | DEF | GHI |
 * +-----+-----+-----+

代码分析

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    //可读数据小于固定解码长度,不用处理,等待累加器累加后再解码
    if (in.readableBytes() < frameLength) {
        return null;
    } else {
        //返回的是切片,会增加in引用计数,防止被回收了
        return in.readRetainedSlice(frameLength);
    }
}
  1. 可读数据小于固定解码长度,不用处理,等待累加器累加后再解码
  2. in.readRetainedSlice(frameLength);使用保留切片,切出固定长的缓冲区(这里为什么要保留切片呢,因为切片是共享原缓冲区的数据的,如果源缓冲区用完了可能被释放,所以需要保留一下,增加引用计数,当然在切片释放的时候,也会释放源缓冲区的),引用计数在ByteToMessageDecoder.channelRead # cumulation.release();会释放;

示例代码

Client:send
ByteBuf msg = Unpooled.buffer(SIZE);
msg.writeBytes("A".getBytes());
msg.writeBytes("BC".getBytes());
msg.writeBytes("DEFG".getBytes());
msg.writeBytes("HI".getBytes());

Server:使用解码器
new FixedLengthFrameDecoder(3)

Server:handler接收数据
ByteBuf bf = ((ByteBuf)msg);
byte[] arr = new byte[bf.readableBytes()];
bf.readBytes(arr);
System.out.println("CodecHandler收到数据:"+ new String(arr));

打印接收数据
CodecHandler收到数据:ABC
CodecHandler收到数据:DEF
CodecHandler收到数据:GHI

LineBasedFrameDecoder:行解码器

根据换行符(\r\n 或 \n)进行解码

举例

原始TCP数据报中字节流

 * +--------------+
 * | ABC\nDEF\r\n |
 * +--------------+

解码后字节流

 * +-----+-----+
 * | ABC | DEF |
 * +-----+-----+

代码分析

//可解码最大长度
private final int maxLength;
//一旦超过maxLength,是否立即引发异常,默认false
private final boolean failFast;
//是否跳过分隔符,默认true
private final boolean stripDelimiter;
//是否有丢弃数据
private boolean discarding;
//丢弃字节长度
private int discardedBytes;
//最后扫描位置
private int offset;
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    //找分隔符位置
    final int eol = findEndOfLine(buffer);
    //默认没有丢弃
    if (!discarding) {
        if (eol >= 0) {
            final ByteBuf frame;
            //到readerIndex到分隔符长度
            final int length = eol - buffer.readerIndex();
            //分隔符长度,\r即\r\n:2个字节, 否则'\n':1个字节
            final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
            //length > 最大长度,直接抛出异常
            if (length > maxLength) {
                buffer.readerIndex(eol + delimLength);
                fail(ctx, length);
                return null;
            }
            //忽略分隔符,读取数据不包含分隔符;否则包含分隔符
            if (stripDelimiter) {
                frame = buffer.readRetainedSlice(length);
                //buffer跳过分隔符
                buffer.skipBytes(delimLength);
            } else {
                frame = buffer.readRetainedSlice(length + delimLength);
            }
            return frame;
        } else {
            //没找到分割符,length:可读长度
            final int length = buffer.readableBytes();
            //可读长度 > 最大长度,直接丢弃
            if (length > maxLength) {
                //设置丢弃长度
                discardedBytes = length;
                //readerIndex置为writerIndex,相当于置为不可读
                buffer.readerIndex(buffer.writerIndex());
                //有丢弃置为true
                discarding = true;
                offset = 0;
                //默认false,继续处理
                if (failFast) {
                    fail(ctx, "over " + discardedBytes);
                }
            }
            return null;
        }
    } else {//有丢弃标志为true,即之前已经丢弃过数据
        if (eol >= 0) {
            final int length = discardedBytes + eol - buffer.readerIndex();
            final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
            //直接跳过 上一次丢弃的数据 + 这一次分隔符之前的数据
            buffer.readerIndex(eol + delimLength);
            discardedBytes = 0;
            discarding = false;
            if (!failFast) {
                fail(ctx, length);
            }
        } else {
            //没找到分隔符,累加discardedBytes
            discardedBytes += buffer.readableBytes();
            //readerIndex置为writerIndex,相当于置为不可读
            buffer.readerIndex(buffer.writerIndex());
            // We skip everything in the buffer, we need to set the offset to 0 again.
            offset = 0;
        }
        return null;
    }
}
  1. 上一次没有丢弃数据
    • 有分割符,可读长度超过了最大长度,直接抛异常;否则相应长度的切片
    • 没有分隔符,可读长度超过了最大长度就丢弃;否则不做处理
  2. 上一次有丢弃数据
    • 有分割符,说明这个是上一次丢弃的那部分所属的同一个消息的,直接跳过不做处理
    • 没有分隔符,继续丢弃,直接置为不可读

示例代码

Client:send
ByteBuf msg = Unpooled.buffer(SIZE);
firstMessage.writeBytes("ABC\nDEF\r\n".getBytes());

Server:使用解码器
new LineBasedFrameDecoder(64)

Server:handler接收数据
ByteBuf bf = ((ByteBuf)msg);
byte[] arr = new byte[bf.readableBytes()];
bf.readBytes(arr);
System.out.println("CodecHandler收到数据:"+ new String(arr));

打印接收数据
CodecHandler收到数据:ABC
CodecHandler收到数据:DEF

DelimiterBasedFrameDecoder:分隔符解码器

根据换行符(\r\n 或 \n)进行解码

以666、888分隔符为例

原始TCP数据报中字节流

 * +--------------+
 * | ABC666DEF888 |
 * +--------------+

解码后字节流

 * +-----+-----+
 * | ABC | DEF |
 * +-----+-----+

代码分析

//分隔符,数组
private final ByteBuf[] delimiters;
//每次能允许的最大解码长度
private final int maxFrameLength;
//是否跳过分隔符,默认true
private final boolean stripDelimiter;
//超过最大解码长度时,是否抛出异常,默认true
private final boolean failFast;
//是否丢弃超过最大限度的帧,默认false
private boolean discardingTooLongFrame;
//记录超过最大范围的字节数值
private int tooLongFrameLength;
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    //如果分隔符是 \r\n 或 \n 直接使用行解码器解码
    if (lineBasedDecoder != null) {
        return lineBasedDecoder.decode(ctx, buffer);
    }
    // Try all delimiters and choose the delimiter which yields the shortest frame.
    //找到第一个分隔符位置
    int minFrameLength = Integer.MAX_VALUE;
    ByteBuf minDelim = null;
    for (ByteBuf delim: delimiters) {
        int frameLength = indexOf(buffer, delim);
        if (frameLength >= 0 && frameLength < minFrameLength) {
            minFrameLength = frameLength;
            minDelim = delim;
        }
    }

    if (minDelim != null) { //有分隔符
        //分隔符长度
        int minDelimLength = minDelim.capacity();
        ByteBuf frame;
        //丢弃超过最大限度的字节流,则这次找到的分隔符之前的字节流继续丢弃,然后充值discardingTooLongFrame和tooLongFrameLength
        if (discardingTooLongFrame) {
            // We've just finished discarding a very large frame.
            // Go back to the initial state.
            discardingTooLongFrame = false;
            buffer.skipBytes(minFrameLength + minDelimLength);

            int tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0;
            if (!failFast) {
                fail(tooLongFrameLength);
            }
            return null;
        }

        //分隔符位置超过最大解码长度,直接丢弃并抛出异常
        if (minFrameLength > maxFrameLength) {
            // Discard read frame.
            buffer.skipBytes(minFrameLength + minDelimLength);
            fail(minFrameLength);
            return null;
        }
        //忽略分隔符,读取数据不包含分隔符;否则包含分隔符
        if (stripDelimiter) {
            frame = buffer.readRetainedSlice(minFrameLength);
            buffer.skipBytes(minDelimLength);
        } else {
            frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
        }

        return frame;
    } else { //没有分隔符
        if (!discardingTooLongFrame) {
            //没有丢弃超长字节流,但是这次可读长度超过了最大解码长度,跳过,同时设置tooLongFrameLength和discardingTooLongFrame
            if (buffer.readableBytes() > maxFrameLength) {
                // Discard the content of the buffer until a delimiter is found.
                tooLongFrameLength = buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
                discardingTooLongFrame = true;
                if (failFast) {
                    fail(tooLongFrameLength);
                }
            }
        } else {
            // Still discarding the buffer since a delimiter is not found.
            //没有分隔符并且之前已经有丢弃超长字节流,继续丢弃,同时累加tooLongFrameLength
            tooLongFrameLength += buffer.readableBytes();
            buffer.skipBytes(buffer.readableBytes());
        }
        return null;
    }
}

示例代码

Client:send
ByteBuf msg = Unpooled.buffer(SIZE);
firstMessage.writeBytes("ABC666DEF888".getBytes());

Server:使用解码器
ByteBuf[] delimiter = new ByteBuf[] {Unpooled.wrappedBuffer("666".getBytes()), Unpooled.wrappedBuffer("888".getBytes())};
new DelimiterBasedFrameDecoder(64, delimiter);

Server:handler接收数据
ByteBuf bf = ((ByteBuf)msg);
byte[] arr = new byte[bf.readableBytes()];
bf.readBytes(arr);
System.out.println("CodecHandler收到数据:"+ new String(arr));

打印接收数据
CodecHandler收到数据:ABC
CodecHandler收到数据:DEF

LengthFieldBasedFrameDecoder:可变长度解码器

可变长度解码器几乎所有和长度相关的二进制协议都可以通过TA来实现,非常强大,这里就不分析了,可以参考https://www.jianshu.com/p/a0a51fd79f62

相关文章

网友评论

      本文标题:netty源码分析(七) - 解码 - 2常用解码器

      本文链接:https://www.haomeiwen.com/subject/qslamktx.html