美文网首页
Netty源码分析(五) ByteToMessageDecode

Netty源码分析(五) ByteToMessageDecode

作者: skyguard | 来源:发表于2018-11-07 15:03 被阅读0次

下面来分析Netty的解码器ByteToMessageDecoder,同样,ByteToMessageDecoder也是继承自ChannelInboundHandlerAdapter,内部有2个累加器:MERGE_CUMULATOR和COMPOSITE_CUMULATOR,MERGE_CUMULATOR是基于内存的复制,COMPOSITE_CUMULATOR则是维护了一个复杂的索引,可以减少内存的复制。

MERGE_CUMULATOR

 public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        final ByteBuf buffer;
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() // 超过空间大小,需要扩容
                || cumulation.refCnt() > 1 // 引用大于 1 ,说明用户使用了 slice().retain() 或 duplicate().retain() 使refCnt增加并且大于 1 ,
                                           // 此时扩容返回一个新的累积区ByteBuf,方便用户对老的累积区ByteBuf进行后续处理。
                || cumulation.isReadOnly()) { // 只读,不可累加,所以需要改成可写
            // Expand cumulation (by replace it) when either there is not more room in the buffer
            // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
            // duplicate().retain() or if its read-only.
           
            // 扩容,返回新的 buffer
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            // 使用老的 buffer
            buffer = cumulation;
        }
        // 写入 in 到 buffer 中
        buffer.writeBytes(in);
        // 释放输入 in
        in.release();
        // 返回 buffer
        return buffer;
    }

};

COMPOSITE_CUMULATOR

public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {

    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        // 和 MERGE_CUMULATOR 的情况类似
        if (cumulation.refCnt() > 1) {
            // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
            // use slice().retain() or duplicate().retain().

            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            buffer.writeBytes(in);
            in.release();
        } else {
            CompositeByteBuf composite;
            // 原来是 CompositeByteBuf 类型,直接使用
            if (cumulation instanceof CompositeByteBuf) {
                composite = (CompositeByteBuf) cumulation;
            // 原来不是 CompositeByteBuf 类型,创建,并添加到其中
            } else {
                composite = alloc.compositeBuffer(Integer.MAX_VALUE);
                composite.addComponent(true, cumulation);
            }
            // 添加 in 到 composite 中
            composite.addComponent(true, in);
            // 赋值给 buffer
            buffer = composite;
        }
        // 返回 buffer
        return buffer;
    }

};

然后在channelRead方法里进行解码

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        // 创建 CodecOutputList 对象
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            // 判断是否首次
            first = cumulation == null;
            // 若首次,直接使用读取的 data
            if (first) {
                cumulation = data;
            // 若非首次,将读取的 data ,累积到 cumulation 中
            } else {
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 执行解码
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e; // 抛出异常
        } catch (Exception e) {
            throw new DecoderException(e); // 封装成 DecoderException 异常,抛出
        } finally {
            // cumulation 中所有数据被读取完,直接释放全部
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0; // 重置 numReads 次数
                cumulation.release(); // 释放 cumulation
                cumulation = null; // 置空 cumulation
            // 读取次数到达 discardAfterReads 上限,释放部分的已读
            } else if (++ numReads >= discardAfterReads) {
                numReads = 0; // 重置 numReads 次数
                discardSomeReadBytes(); // 释放部分的已读
            }

            // 解码消息的数量
            int size = out.size();
            // 是否解码到消息
            decodeWasNull = !out.insertSinceRecycled();

            // 触发 Channel Read 事件。可能是多条消息
            fireChannelRead(ctx, out, size);

            // 回收 CodecOutputList 对象
            out.recycle();
        }
    } else {
        // 触发 Channel Read 事件
        ctx.fireChannelRead(msg);
    }
}

执行解码业务逻辑

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // 循环读取,直到不可读
        while (in.isReadable()) {
            // 记录
            int outSize = out.size();
            // out 非空,说明上一次解码有解码到消息
            if (outSize > 0) {
                // 触发 Channel Read 事件。可能是多条消息
                fireChannelRead(ctx, out, outSize);
                // 清空
                out.clear();

               
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }

            // 记录当前可读字节数
            int oldInputLength = in.readableBytes();

            // 执行解码。如果 Handler 准备移除,在解码完成后,进行移除。
            decodeRemovalReentryProtection(ctx, in, out);

            // 用户主动删除该 Handler ,继续操作 in 是不安全的
            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
           
            if (ctx.isRemoved()) {
                break;
            }

            // 整列判断 `out.size() == 0` 比较合适。因为,如果 `outSize > 0` 那段,已经清理了 out 。
            if (outSize == out.size()) {
                // 如果未读取任何字节,结束循环
                if (oldInputLength == in.readableBytes()) {
                    break;
                // 如果可读字节发生变化,继续读取
                } else {
                    continue;
                }
            }

            // 如果解码了消息,但是可读字节数未变,抛出 DecoderException 异常。说明,有问题。
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message.");
            }

            // 如果开启 singleDecode ,表示只解析一次,结束循环
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}

ByteToMessageDecoder的分析就到这里了。

相关文章

网友评论

      本文标题:Netty源码分析(五) ByteToMessageDecode

      本文链接:https://www.haomeiwen.com/subject/vnlvxqtx.html