美文网首页
Netty ReplayingDecoder 源码分析与特性解读

Netty ReplayingDecoder 源码分析与特性解读

作者: 嘟嘟碰碰叮叮当当 | 来源:发表于2021-01-12 15:57 被阅读0次

    转自:https://blog.csdn.net/wzq6578702/article/details/78826494

    在介绍ReplayingDecoder之前 想看一下它的用法,构建一个服务端和客户端的模型:

    服务端:

    public class MyServer {
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try{
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                        .childHandler(new MyServerInitializer());
                ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
                channelFuture.channel().closeFuture().sync();
            }finally{
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    

    服务端initializer:

    public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipline = ch.pipeline();
            pipline.addLast(new MyReplayingDecoder());//使用ReplayingDecoder
            pipline.addLast(new MyLongToByteEncoder());
            pipline.addLast(new MyServerHandler());
        }
    }
    

    ServerHandler:

    public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
            System.out.println(ctx.channel().remoteAddress()+" --> "+msg);
            ctx.writeAndFlush(654321L);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    MyReplayingDecoder:

    public class MyReplayingDecoder extends ReplayingDecoder<Void> {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println("MyReplayingDecoder decode invoked!");
            //注意没有判断字节数!!!!
            out.add(in.readLong());
        }
    }
    

    MyLongToByteEncoder

    public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
            System.out.println("encode invoked");
            System.out.println(msg);
            out.writeLong(msg);
        }
    }
    

    客户端:

    public class Myclient {
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientIniatializer());
                ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
                channelFuture.channel().writeAndFlush("hello");
                channelFuture.channel().closeFuture().sync();
            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
    
    

    客户端Iniatializer:

    public class MyClientIniatializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipline = ch.pipeline();
    
            pipline.addLast(new MyReplayingDecoder());
            pipline.addLast(new MyLongToByteEncoder());
            pipline.addLast(new MyClientHandler());
        }
    }
    
    

    客户端Handler:

    public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
            System.out.println(ctx.channel().remoteAddress());
            System.out.println("client output "+msg);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(123456L);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            ctx.close();
        }
    }
    

    运行服务端,之后运行客户端:
    服务端输出结果:

    MyReplayingDecoder decode invoked!
    /127.0.0.1:4448 --> 123456
    encode invoked
    654321
    

    客户端输出结果:

    encode invoked
    123456
    MyReplayingDecoder decode invoked!
    localhost/127.0.0.1:8899
    client output 654321
    

    加了下,数据传输流程分析

    /**
     * describe:  1. client channelActive 发送12345
     *            2. client 触发encode方法发送到server端
     *            3. server 触发decode方法
     *            4. server channelRead0
     *            5. server write flush 发送出去
     *            6. server 触发encode方法发送到客户端
     *            7. clinet 触发decode方法
     *            8. clinet 触发channelRead0
     */
    

    ReplayingDecoder java doc

    ByteToMessageDecoder一种特殊变体,它可以在阻塞I / O范例中实现非阻塞解码器。
    最大的区别ReplayingDecoder和ByteToMessageDecoder是ReplayingDecoder可以
    让你实现decode()和decodeLast()方法,就像已经获得所有所需的字节,而不是检查
    所需的字节的可用性。

    例如,以下ByteToMessageDecoder实现:
       public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
      
          @Override
         protected void decode(ChannelHandlerContext ctx,
                                 ByteBuf buf, List<Object> out) throws Exception {
      
           if (buf.readableBytes() < 4) {
              return;
           }
      
           buf.markReaderIndex();
           int length = buf.readInt();
      
           if (buf.readableBytes() < length) {
              buf.resetReaderIndex();
              return;
           }
      
           out.add(buf.readBytes(length));
         }
       }
    
    使用ReplayingDecoder了如下简化:
       public class IntegerHeaderFrameDecoder
            extends ReplayingDecoder<Void> {
      
         protected void decode(ChannelHandlerContext ctx,
                                 ByteBuf buf) throws Exception {
      
           out.add(buf.readBytes(buf.readInt()));
         }
       }
    
    这是如何运作的?

    ReplayingDecoder通过一个专门的ByteBuf实现其抛出一个Error的某些类型的时候有没有在缓冲区足够的数据。 在上面的IntegerHeaderFrameDecoder ,您仅假设调用buf.readInt()时缓冲区中将有4个或更多字节。 如果缓冲区中确实有4个字节,它将按预期返回整数标头。 否则,将引发Error并将控件返回给ReplayingDecoder 。 如果ReplayingDecoder捕获到Error ,则它将把缓冲区的readerIndex倒回到“初始”位置(即缓冲区的开头),并在缓冲区中收到更多数据时再次调用readerIndex decode(..)方法。
    请注意, ReplayingDecoder始终会抛出相同的缓存Error实例,以避免创建新Error并为每次抛出填充其堆栈跟踪的开销。

    局限性

    以简单为代价, ReplayingDecoder强制您执行一些限制:
    禁止某些缓冲区操作。
    如果网络速度慢且消息格式复杂,则性能可能会变差,这与上面的示例不同。 在这种情况下,您的解码器可能不得不一遍又一遍地解码消息的相同部分。
    您必须记住,可以多次调用decode(..)方法来解码一条消息。 例如,以下代码将不起作用:

     public class MyDecoder extends ReplayingDecoder<Void> {
      
         private final Queue<Integer> values = new LinkedList<Integer>();
      
          @Override
         public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
      
           // A message contains 2 integers.
           values.offer(buf.readInt());
           values.offer(buf.readInt());
      
           // This assertion will fail intermittently since values.offer()
           // can be called more than two times!
           assert values.size() == 2;
           out.add(values.poll() + values.poll());
         }
       }
    

    正确的实现如下所示,您还可以利用“检查点”功能,下一部分将对此进行详细说明。

     public class MyDecoder extends ReplayingDecoder<Void> {
      
         private final Queue<Integer> values = new LinkedList<Integer>();
      
          @Override
         public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
      
           // Revert the state of the variable that might have been changed
           // since the last partial decode.
           values.clear();
      
           // A message contains 2 integers.
           values.offer(buf.readInt());
           values.offer(buf.readInt());
      
           // Now we know this assertion will never fail.
           assert values.size() == 2;
           out.add(values.poll() + values.poll());
         }
       }
    
    改善表现

    幸运的是,使用checkpoint()方法可以显着提高复杂解码器实现的性能。
    该checkpoint()这样方法更新缓冲区的“初始”位置ReplayingDecoder倒回readerIndex缓冲剂与在那里你叫的最后一个位置checkpoint()方法。

    用Enum调用checkpoint(T)

    尽管您可以只使用checkpoint()方法并自己管理解码器的状态,
    但是管理解码器状态的最简单方法是
    创建一个代表解码器当前状态的Enum类型并调用checkpoint(T)状态改变时的方法。
    根据要解码的消息的复杂性,可以根据需要设置任意多个状态:

       public enum MyDecoderState {
         READ_LENGTH,
         READ_CONTENT;
       }
      
       public class IntegerHeaderFrameDecoder
            extends ReplayingDecoder<MyDecoderState> {
      
         private int length;
      
         public IntegerHeaderFrameDecoder() {
           // Set the initial state.
           super(MyDecoderState.READ_LENGTH);
         }
      
          @Override
         protected void decode(ChannelHandlerContext ctx,
                                 ByteBuf buf, List<Object> out) throws Exception {
           switch (state()) {
           case READ_LENGTH:
             length = buf.readInt();
             checkpoint(MyDecoderState.READ_CONTENT);
           case READ_CONTENT:
             ByteBuf frame = buf.readBytes(length);
             checkpoint(MyDecoderState.READ_LENGTH);
             out.add(frame);
             break;
           default:
             throw new Error("Shouldn't reach here.");
           }
         }
       }
    
    没有参数调用checkpoint()

    管理解码器状态的另一种方法是自己管理。

       public class IntegerHeaderFrameDecoder
            extends ReplayingDecoder<Void> {
      
         private boolean readLength;
         private int length;
      
          @Override
         protected void decode(ChannelHandlerContext ctx,
                                 ByteBuf buf, List<Object> out) throws Exception {
           if (!readLength) {
             length = buf.readInt();
             readLength = true;
             checkpoint();
           }
      
           if (readLength) {
             ByteBuf frame = buf.readBytes(length);
             readLength = false;
             checkpoint();
             out.add(frame);
           }
         }
       }
    
    用流水线中的另一个解码器替换一个解码器

    如果你打算写一个协议复用器,你可能会想更换ReplayingDecoder与另一个(协议检测) ReplayingDecoder , ByteToMessageDecoder或MessageToMessageDecoder (实际协议解码器)。 不能仅通过调用ChannelPipeline.replace(ChannelHandler, String, ChannelHandler)来实现此目的,但是需要一些其他步骤:

    
       public class FirstDecoder extends ReplayingDecoder<Void> {
      
            @Override
           protected void decode(ChannelHandlerContext ctx,
                                   ByteBuf buf, List<Object> out) {
               ...
               // Decode the first message
               Object firstMessage = ...;
      
               // Add the second decoder
               ctx.pipeline().addLast("second", new SecondDecoder());
      
               if (buf.isReadable()) {
                   // Hand off the remaining data to the second decoder
                   out.add(firstMessage);
                   out.add(buf.readBytes(super.actualReadableBytes()));
               } else {
                   // Nothing to hand off
                   out.add(firstMessage);
               }
               // Remove the first decoder (me)
               ctx.pipeline().remove(this);
           }
    

    几种常见的编解码器:

    LineBasedFrameDecoder

    解码器,将接收到的ByteBuf在行尾拆分。
    "\n"和"\r\n"都被处理。 有关基于分隔符的更通用解码器,请参见DelimiterBasedFrameDecoder 
    

    FixedLengthFrameDecoder

    解码器,将接收到的ByteBuf为固定的字节数。 例如,如果您收到以下四个分段的数据包:
       +---+----+------+----+
       | A | BC | DEFG | HI |
       +---+----+------+----+
       
    FixedLengthFrameDecoder (3)会将它们解码为以下三个具有固定长度的数据包:
       +-----+-----+-----+
       | ABC | DEF | GHI |
       +-----+-----+-----+
       
    

    DelimiterBasedFrameDecoder

    一种解码器, ByteBuf通过一个或多个定界符对接收到的ByteBuf拆分。 这对于解码以分隔符(例如NUL或换行符)结尾的帧特别有用。
    预定义的分隔符
    为了方便起见, Delimiters定义了常用的定界符。
    指定多个定界符
    DelimiterBasedFrameDecoder允许您指定多个定界符。 如果在缓冲区中找到多个定界符,它将选择产生`最短帧`的定界符。 例如,如果缓冲区中包含以下数据:
       +--------------+
       | ABC\nDEF\r\n |
       +--------------+
       
    DelimiterBasedFrameDecoder ( Delimiters.lineDelimiter() )将选择'\n'作为第一个定界符并产生两个帧:
       +-----+-----+
       | ABC | DEF |
       +-----+-----+
       
    而不是错误地选择'\r\n'作为第一个定界符:
       +----------+
       | ABC\nDEF |
       +----------+
       
    

    LengthFieldBasedFrameDecoder

    解码器按消息中的length字段的值动态拆分接收到的ByteBuf 。 当您解码二进制消息时,此消息特别有用,该二进制消息具有代表消息正文或整个消息长度的整数头字段。
    LengthFieldBasedFrameDecoder具有许多配置参数,因此它可以解码带有长度字段的任何消息,这在专有的客户端-服务器协议中经常出现。 以下是一些示例,可让您基本了解哪个选项可以执行什么操作。
    2字节长度的字段,偏移量为0,不剥离标题
    在此示例中,长度字段的值为12(0x0C) ,代表“ HELLO,WORLD”的长度。 默认情况下,解码器假定length字段表示在length字段之后的字节数。 因此,可以使用简单的参数组合对其进行解码。
       lengthFieldOffset   = 0
       lengthFieldLength   = 2
       lengthAdjustment    = 0
       initialBytesToStrip = 0 (= do not strip header)
      
       BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
       +--------+----------------+      +--------+----------------+
       | Length | Actual Content |----->| Length | Actual Content |
       | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
       +--------+----------------+      +--------+----------------+
       
    2个字节的长度字段,偏移量为0,带头
    因为我们可以通过调用ByteBuf.readableBytes()获得内容的长度,所以您可能希望通过指定initialBytesToStrip来剥离长度字段。 在此示例中,我们指定2 ,它与length字段的长度相同,以剥离前两个字节。
       lengthFieldOffset   = 0
       lengthFieldLength   = 2
       lengthAdjustment    = 0
       initialBytesToStrip = 2 (= the length of the Length field)
      
       BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
       +--------+----------------+      +----------------+
       | Length | Actual Content |----->| Actual Content |
       | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
       +--------+----------------+      +----------------+
       
    2个字节的长度字段,偏移量为0,不剥离标题,该长度字段表示整个消息的长度
    在大多数情况下,长度字段仅表示消息正文的长度,如前面的示例所示。 但是,在某些协议中,长度字段表示整个消息的长度,包括消息头。 在这种情况下,我们指定一个非零的lengthAdjustment 。 因为此示例消息中的length值总是比主体长度大2 ,所以我们将-2指定为lengthAdjustment以进行补偿。
       lengthFieldOffset   =  0
       lengthFieldLength   =  2
       lengthAdjustment    = -2 (= the length of the Length field)
       initialBytesToStrip =  0
      
       BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
       +--------+----------------+      +--------+----------------+
       | Length | Actual Content |----->| Length | Actual Content |
       | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
       +--------+----------------+      +--------+----------------+
       
    3个字节的长度字段位于5个字节的报头末尾,不剥离报头
    以下消息是第一个示例的简单变体。 该消息之前会附加一个额外的标头值。 lengthAdjustment再次为零,因为解码器始终在帧长度计算过程中考虑前置数据的长度。
       lengthFieldOffset   = 2 (= the length of Header 1)
       lengthFieldLength   = 3
       lengthAdjustment    = 0
       initialBytesToStrip = 0
      
       BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
       +----------+----------+----------------+      +----------+----------+----------------+
       | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
       |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
       +----------+----------+----------------+      +----------+----------+----------------+
       
    5字节标题开头的3字节长度字段,不剥离标题
    这是一个高级示例,显示了在length字段和消息正文之间存在一个额外的标头的情况。 您必须指定一个正的lengthAdjustment,以便解码器将额外的标头计入帧长度计算中。
       lengthFieldOffset   = 0
       lengthFieldLength   = 3
       lengthAdjustment    = 2 (= the length of Header 1)
       initialBytesToStrip = 0
      
       BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
       +----------+----------+----------------+      +----------+----------+----------------+
       |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
       | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
       +----------+----------+----------------+      +----------+----------+----------------+
       
    2个字节的长度字段在4字节标题的中间偏移量1处,除去第一个标题字段和长度字段
    这是以上所有示例的组合。 在length字段之前有前置报头,在length字段之后有多余的报头。 前置标头会影响lengthFieldOffset ,额外标头会影响lengthAdjustment 。 我们还指定了一个非零的initialBytesToStrip来从帧中剥离长度字段和前置标头。 如果不想剥离前置标头,则可以将initialBytesToSkip指定为0 。
       lengthFieldOffset   = 1 (= the length of HDR1)
       lengthFieldLength   = 2
       lengthAdjustment    = 1 (= the length of HDR2)
       initialBytesToStrip = 3 (= the length of HDR1 + LEN)
      
       BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
       +------+--------+------+----------------+      +------+----------------+
       | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
       | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
       +------+--------+------+----------------+      +------+----------------+
       
    2个字节的长度字段位于4字节标题的中间偏移量1处,去掉第一个标题字段和length字段,length字段代表整个消息的长度
    让我们再对前面的示例进行一些修改。 与前一个示例的唯一区别是,长度字段表示整个消息的长度,而不是消息主体,就像第三个示例一样。 我们必须将HDR1的长度和Length计入lengthAdjustment中。 请注意,我们不需要考虑HDR2的长度,因为length字段已经包含了整个标头长度。
       lengthFieldOffset   =  1
       lengthFieldLength   =  2
       lengthAdjustment    = -3 (= the length of HDR1 + LEN, negative)
       initialBytesToStrip =  3
      
       BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
       +------+--------+------+----------------+      +------+----------------+
       | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
       | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
       +------+--------+------+----------------+      +------+----------------+
       
    也可以看看:
    LengthFieldPrepender
    

    相关文章

      网友评论

          本文标题:Netty ReplayingDecoder 源码分析与特性解读

          本文链接:https://www.haomeiwen.com/subject/lqceaktx.html