美文网首页
netty源码分析(七) - 解码 - 1ByteToMessa

netty源码分析(七) - 解码 - 1ByteToMessa

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-24 17:13 被阅读0次

概述

socket流程.png

先来回顾下epoll模式下socket的大致传输流程,其中TCP报文摘除TCP头以后从内核缓冲区拷贝到socket接收缓冲区时有可能会发生粘包半包的问题。针对粘包半包问题netty提供了很多拆箱即用的编解码器,本节先来分析下netty提供的解码器是如何实现的。

解码器体系

Decoder.png
  1. 熟悉的套路netty提供的抽象类ByteToMessageDecoder继承ChannelInboundHandlerAdapter作为一个hanlder,这样就可以在pipeline的事件传播中灵活的使用;
  2. ByteToMessageDecoder提供模板方法并暴露抽象的decode方法,netty内置的几个解码器或用户自定义解码器只需要重写decode方法;
  3. netty内置了几个比较常用的解码器
    • FixedLengthFrameDecoder:固定长度解码器
    • LineBasedFrameDecoder:行解码器
    • DelimiterBasedFrameDecoder:分隔符解码器
    • LengthFieldBasedFrameDecoder:可变长度解码器

ByteToMessageDecoder

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //只处理字节缓冲区类型
    if (msg instanceof ByteBuf) {
        //就是带有回收器的ArrayList
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            first = cumulation == null;
            //新ByteBuf通过累加器进行累加,累加之后一起处理
            cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
            //解码
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                //不为空且不可读,释放累加器
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                //读取数据的次数大于阈值,则尝试丢弃已读的数据
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            //是否要去调用ChannelHandlerContext的read()来设置监听读事件,可能没读完; channelReadComplete事件中使用
            firedChannelRead |= out.insertSinceRecycled();
            //向下传播channelRead事件
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        //其他类型不处理,继续传递
        ctx.fireChannelRead(msg);
    }
}
  1. channelRead入口在AbstractNioByteChannel.read中,每次从接收缓冲区读取一段ByteBuf通过pipeline.fireChannelRead(byteBuf);传播过来;
  2. cumulator.cumulate新ByteBuf通过累加器进行累加,累加之后一起处理,主要针对,有不能解码成一个消息的数据,需要等待下一次读取到数据拼在一起进行解码;
  3. callDecode累加器中字节流进行解码
  4. cumulation != null && !cumulation.isReadable()不为空且不可读(即累加器中ByteBuf已经全部被解码),则释放累加器
  5. fireChannelRead向下传播channelRead事件

public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        //累加器是中是空的,并且新来的ByteBuf(in)又是连续的(说明不是复合缓冲区),则直接返回新ByteBuf
        if (!cumulation.isReadable() && in.isContiguous()) {
            // If cumulation is empty and input buffer is contiguous, use it directly
            cumulation.release();
            return in;
        }
        try {
            final int required = in.readableBytes();
            // 判断累加器是否需要扩容
            // 新的可读大于累加器的最大可写   或新可读大于最大可重新(即可以通过重新分配来增加可写)并且又是共享的  或累加器是只读的
            if (required > cumulation.maxWritableBytes() ||
                    (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                    cumulation.isReadOnly()) {
                // Expand cumulation (by replacing it) under the following conditions:
                // - cumulation cannot be resized to accommodate the additional data
                // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
                //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
                // 扩容也很简单,就是new一个新的更大的累加器,把原累加器以及新ByteBuf中数据copy过来,之后释放原累加器
                return expandCumulation(alloc, cumulation, in);
            }
            // 追加写入累加器
            cumulation.writeBytes(in, in.readerIndex(), required);
            // 把in的readerIndex置为writeIndex,相当于把in置为不可读
            in.readerIndex(in.writerIndex());
            return cumulation;
        } finally {
            // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
            // for whatever release (for example because of OutOfMemoryError)
            //返回之前释放当前ByteBuf
            in.release();
        }
    }
};
  1. expandCumulation累加器扩容
  2. cumulation.writeBytes追加写入累加器
  3. in.readerIndex把in的readerIndex置为writeIndex,相当于把in置为不可读

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        //累加器中有数据可读
        while (in.isReadable()) {
            int outSize = out.size();
            //有数据已经被解码
            if (outSize > 0) {
                //已经解码的数据往下传递
                fireChannelRead(ctx, out, outSize);
                //传递后清空out
                out.clear();

                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                //在继续解码之前,检查是否已删除此context
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            //记录累加器内还有多少字节可读
            int oldInputLength = in.readableBytes();
            //解码
            decodeRemovalReentryProtection(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }

            //没有解码出数据
            if (outSize == out.size()) {
                //如果解码后累加器中可读数据 = 之前可读数据,说明没有读取到累加器中内容(说明不符合子类解码器的要求),则直接跳出循环;否则继续解码
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }
            //解码器解码出了数据,但是累加器中可读数据量并没变,说明解码器有问题,报错
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
            }
            //是否只解码一次
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}
  1. outSize > 0遍历过程中有数据已经被解码,则fireChannelRead已经解码的数据往下传递,然后out.clear();清空已解码数据
  2. decodeRemovalReentryProtection解码
  3. outSize == out.size()没有解码出数据,如果解码后累加器中可读数据 = 之前可读数据,说明没有读取到累加器中内容(说明不符合子类解码器的要求),则直接跳出循环;否则继续解码

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
    //解码状态:0:初始  1:正在调用子类解码   2:解码器待删除
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        //调用子类进行解码
        decode(ctx, in, out);
    } finally {
        //handleRemoved时decodeState置为2
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        decodeState = STATE_INIT;
        if (removePending) {
            //如果有被设置待删除状态,就马上传播下去处理
            fireChannelRead(ctx, out, out.size());
            out.clear();
            //解码器从pipeline中移除
            handlerRemoved(ctx);
        }
    }
}
  1. decode调用子类进行解码
    其他分支逻辑比较简单,下一节分析下几个常用的内置解码器

相关文章

网友评论

      本文标题:netty源码分析(七) - 解码 - 1ByteToMessa

      本文链接:https://www.haomeiwen.com/subject/rnoumktx.html