美文网首页
ByteToMessageDecoder

ByteToMessageDecoder

作者: Pillar_Zhong | 来源:发表于2019-08-01 21:28 被阅读0次

    ByteToMessageDecoder是解码器的基类, 具有最基本的能力, 将字节解码成消息, 以便在pipeline上进行传递.

    1564365188195.png

    关键属性

    // 对入站数据进行临时缓冲, 直到它准备好处理
    ByteBuf cumulation;
    // 缓冲的策略
    private Cumulator cumulator = MERGE_CUMULATOR;
    // 是否只解码一次
    private boolean singleDecode;
    // 意思是解码有没有结果, true为没有
    private boolean decodeWasNull;
    // 这批数据是不是第一次处理
    private boolean first;
    private int discardAfterReads = 16;
    private int numReads;
    

    channelRead

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 如果消息是ByteBuf类型
        if (msg instanceof ByteBuf) {
            // 从对象池取出一个CodecOutputList, 用来收集解码后的消息
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                // 看累积器是不是为空来决定是不是首次处理
                first = cumulation == null;
                // 看是不是第一次处理,如果是,那么直接赋予累积器
                if (first) {
                    cumulation = data;
                } else {
                    // 否则累加到累积器, 见cumulator部分
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 对消息进行解码
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                // 如果累积器的内容已经读取完毕,那么回收掉
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                // 当这批数据已经读了有16次之多后,需要整理下内存
                } else if (++ numReads >= discardAfterReads) {               
                    numReads = 0;
                    // 这里主要是对累积器进行整理,清理discard区域为读写空间腾地方
                    discardSomeReadBytes();
                }
                
                // 将解码后的结果通知下游, 且回收out容器
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            // 如果不是ByteBuf类型,直接传递给下游
            ctx.fireChannelRead(msg);
        }
    }
    
    

    callDecode

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            // 如果缓冲器有内容可读
            while (in.isReadable()) {
                int outSize = out.size();
    
                // 如果out容器有内容,那么说明解码有结果了,那么马上需要通知下游handle
                // 通知完,清理容器out
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    
                    // 如果此时hanler被移除,那么不用继续处理,直接退出
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
    
                int oldInputLength = in.readableBytes();
                // 子类来处理具体解码的工作,最终将解码后的消息放在out里面就好
                decode(ctx, in, out);
    
                // 如果此时hanler被移除,那么不用继续处理,直接退出
                if (ctx.isRemoved()) {
                    break;
                }
                
                // 如果这时,out容器的大小没有变化,说明子类那边解码还没有结果
                if (outSize == out.size()) {
                    // 如果没有读取任何数据, 那么结束循环
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        // 如果有读取,但不足与产生结果,那么需要继续读取
                        continue;
                    }
                }
                
                // 如果有结果,但是根本就没有读取数据,那么不是很诡异么?
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }
                
                // 是否只执行一次
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }
    

    Cumulator

    MERGE_CUMULATOR

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            // 简单的情况是当前的累积器空间不足,需要扩容
            // 因为read的时候,每隔一段时间都需要对累积器的内存空间进行整理,那么整理的过程会导致
            // 读写index变更, 进而导致浅拷贝后的ByteBuf不可用.
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) { 
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }
    };
    

    子缓冲区

    Netty中调用ByteBuf.duplicate(),ByteBuf.slice()和ByteBuf.order(ByteOrder)三个方法, 会创建一个子缓冲区,子缓冲区共享父缓冲区的内存区域。子缓冲区没有自己的引用计数,而是 共享父缓冲区的引用计数。

    当父缓冲区release的时候, 会引用计数清零, 导致该内存区域被回收, 进而影响子缓冲区, 导致读写失败. 那么需要注意的事, 子缓冲区需要显示调用retain来提示Netty有其他人在使用, 防止被错误回收. 这里带来额外的坏处是, 所有子缓冲区在使用完后, 要及时release, 防止内存泄漏.

    在前面的场景中可以看到, 解码器在解码完后视情况来决定要不要做内存整理, 而整理的过程会进行数据移动, 且按照整理后的结果重置read和write索引, 这样会影响到子缓冲区的读写. 下面是个简单的例子, 自己体会. 另外一个问题是, 如果源Buffer提前release, 那么子缓冲区也会读写异常.

    ByteBuf source = ByteBufAllocator.DEFAULT.buffer(20, 20);
    
    source.writeInt(1);
    source.readInt();
    
    source.writeInt(2);
    
    ByteBuf duplicate = source.duplicate();
    System.out.println("source" + source.toString());
    System.out.println("duplicate" + duplicate.toString());
    source.discardReadBytes();
    System.out.println("source" + source.toString());
    System.out.println("duplicate" + duplicate.toString());
    System.out.println("duplicate" + duplicate.readInt());
    duplicate.readerIndex(0);
    System.out.println("duplicate" + duplicate.toString());
    System.out.println("duplicate" + duplicate.readInt());
    
    source:PooledUnsafeDirectByteBuf(ridx: 4, widx: 16, cap: 20/20)
    duplicate:UnpooledDuplicatedByteBuf(ridx: 4, widx: 16, cap: 20/20)
    source:PooledUnsafeDirectByteBuf(ridx: 0, widx: 12, cap: 20/20)
    duplicate:UnpooledDuplicatedByteBuf(ridx: 4, widx: 16, cap: 20/20)
    duplicate:0
    duplicate:UnpooledDuplicatedByteBuf(ridx: 0, widx: 16, cap: 20/20)
    duplicate:2
    

    expandCumulation

    可以看到解决上面问题的方案也是简单粗暴, 直接重建一个ByteBuf, 将数据拷贝过来. 这样, 之前的子缓冲区也不会被之后可能的内存整理给影响.

    static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        ByteBuf oldCumulation = cumulation;
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        oldCumulation.release();
        return cumulation;
    }
    

    相关文章

      网友评论

          本文标题:ByteToMessageDecoder

          本文链接:https://www.haomeiwen.com/subject/qwkhdctx.html