美文网首页Netty源码分析系列
Netty源码分析系列--13.ReplayingDecoder

Netty源码分析系列--13.ReplayingDecoder

作者: ted005 | 来源:发表于2018-11-17 23:41 被阅读16次

    ReplayingDecoder的原理

    • ReplayingDecoder继承了ByteToMessageDecoder,但是使用ReplayingDecoder的好处在于:ReplayingDecoder在处理数据时可以认为所有的数据(ByteBuf) 已经接收完毕,而不用判断接收数据的长度。
    
    public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
    
    • ReplayingDecoder使用了特殊的ByteBufReplayingDecoderByteBuf,当数据不够时会抛出一类特殊的错误,然后ReplayingDecoder会重置readerIndex并且再次调用decode方法。
    • 泛型<S>使用 枚举Enum 来表示状态,其内部存在状态管理。如果是无状态的,则使用 Void

    继承基类ByteToMessageDecoder的方式

    下面是一个用来解码带有长整型(Long)数据头head的解码器:

    public class LongHeaderFrameDecoder extends ByteToMessageDecoder {
      
        @Override
        protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
    
            //总字节数<8,不够Long的长度,返回
            if (buf.readableBytes() < 8) {
              return;
            }
            
            buf.markReaderIndex();
            //读取head的值,例如6,说明body的长度是6个字节
            int length = buf.readLong();
            
            //body的总字节数不够6,返回
            if (buf.readableBytes() < length) {
              buf.resetReaderIndex();
              return;
            }
            
            //读取6个长度的body
            out.add(buf.readBytes(length));
        }
    }
    

    从以上代码可以看出,在decode方法中需要对数据的长度做判断,依据ByteBufreaderIndex来获取真实数据,逻辑比较复杂。

    继承基类ReplayingDecoder的方式

    如果以上的例子选择继承ReplayingDecoder,那逻辑会非常简单。由于不存在状态管理,所以泛型使用Void

    public class LongHeaderFrameDecoder extends ReplayingDecoder<Void> {
      
        @Override
        protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
            // 读取head的值,例如6,说明body的长度是6个字节
            int length = buf.readLong();   
            // 读取6个长度的body
            out.add(buf.readBytes(length));
        }
    }
    
    

    状态管理和checkpoint方法

    状态可以使用枚举Enum来表示,如:

    public enum MyDecoderState {
         READ_HEAD,
         READ_BODY;
    }
    
    

    当调用checkpoint(MyDecoderState state)时,ReplayingDecoder会将当前readerIndex赋值给int类型的成员变量checkpoint,在后续数据读取过程中方便重置。

    protected void checkpoint(S state) {
        checkpoint();
        state(state);
    }
    
    protected void checkpoint() {
        checkpoint = internalBuffer().readerIndex();
    }
    
    

    使用状态管理后的LongHeaderFrameDecoder:

    public class LongHeaderFrameDecoder
            extends ReplayingDecoder<MyDecoderState> {
         // HEAD的长度
         private int length;
        
         public LongHeaderFrameDecoder() {
           // 初始状态是读取头部HEAD
           super(MyDecoderState.READ_LENGTH);
         }
        
          @Override
         protected void decode(ChannelHandlerContext ctx,
                                 ByteBuf buf, List<Object> out) throws Exception {
           switch (state()) {
               case READ_HEAD:
                 length = buf.readLong();
                 checkpoint(MyDecoderState.READ_BODY);
               case READ_BODY:
                 ByteBuf frame = buf.readBytes(length);
                 checkpoint(MyDecoderState.READ_BODY);
                 out.add(frame);
                 break;
               default:
                 throw new Error("Shouldn't reach here.");
               }
         }
    }
    
    

    相关文章

      网友评论

        本文标题:Netty源码分析系列--13.ReplayingDecoder

        本文链接:https://www.haomeiwen.com/subject/yflwfqtx.html