美文网首页
Netty原理与基础(五)

Netty原理与基础(五)

作者: smallmartial | 来源:发表于2020-10-25 18:08 被阅读0次

    1.Decoder原理

    1.1什么叫作Netty的解码器呢?

    首先,它是一个InBound入站处理器,解码器负责处理“入站数据”。其次,它能将上一站Inbound入站处理器传过来的输入(Input)数据,进行数据的解码或者格式转换,然后输出(Output)到下一站Inbound入站处理器。一个标准的解码器将输入类型为ByteBuf缓冲区的数据进行解码,输出一个一个的Java POJO对象。Netty内置了这个解码器,叫作ByteToMessageDecoder,位在Netty的io.netty.handler.codec包中。


    image.png
    • ByteToMessageDecoder仅仅提供了一个流程性质的框架:它仅仅将子类的decode方法解码之后的Object结果,放入自己内部的结果列表List<Object>中,最终,父类会负责将List<Object>中的元素,一个一个地传递给下一个站

    1.2代码示例

    //解码
    public class Byte2IntegerDecoder extends ByteToMessageDecoder {
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in,
                           List<Object> out) {
            while (in.readableBytes() >= 4) {
                int i = in.readInt();
                Logger.info("解码出一个整数: " + i);
                out.add(i);
            }
        }
    }
    //处理程序
    public class IntegerProcessHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Integer integer = (Integer) msg;
            Logger.info("打印出一个整数: " + integer);
        }
    }
    //测试类
    public class Byte2IntegerDecoderTester {
        /**
         * 整数解码器的使用实例
         */
        @Test
        public void testByteToIntegerDecoder() {
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline().addLast(new Byte2IntegerDecoder());
                    ch.pipeline().addLast(new IntegerProcessHandler());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);
    
            for (int j = 0; j < 100; j++) {
                ByteBuf buf = Unpooled.buffer();
                buf.writeInt(j);
                channel.writeInbound(buf);
            }
    
            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • ByteBuf缓冲区由谁负责进行引用计数和释放管理

    基类ByteToMessageDecoder负责解码器的ByteBuf缓冲区的释放工作,它会调用ReferenceCountUtil.release(in)方法,将之前的ByteBuf缓冲区的引用数减1。

    1.3 ReplayingDecoder解码器

    image.png

    ReplayingDecoder类是ByteToMessageDecoder的子类。其作用是:
    · 在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节。
    · 若ByteBuf中有足够的字节,则会正常读取;反之,如果没有足够的字节,则会停止解码。

    • ReplayingDecoder进行长度判断的原理,其实很简单:它的内部定义了一个新的二进制缓冲区类,对ByteBuf缓冲区进行了装饰,这个类名为ReplayingDecoderBuffer。该装饰器的特点是:在缓冲区真正读数据之前,首先进行长度的判断:如果长度合格,则读取数据;否则,抛出ReplayError。ReplayingDecoder捕获到ReplayError后,会留着数据,等待下一次IO事件到来时再读取。
    • ReplayingDecoder的作用,远远不止于进行长度判断,它更重要的作用是用于分包传输的应用场景

    1.4整数分包解码器

    • 底层通信协议是分包传输的,一份数据可能分几次达到对端。发送端出去的包在传输过程中会进行多次的拆分和组装。接收端所收到的包和发送端所发送的包不是一模一样的


      image.png

      在Java OIO流式传输中,不会出现这样的问题,因为它的策略是:不读到完整的信息,就一直阻塞程序,不向后执行。但是,在Java的NIO中,由于NIO的非阻塞性,就会出现上述情况

    可以使用ReplayingDecoder来解决
    要完成以上的例子,需要用到ReplayingDecoder一个很重要的属性——state成员属性。该成员属性的作用就是保存当前解码器在解码过程中的当前阶段

    • ReplayingDecoder源码
        protected ReplayingDecoder() {
            this((Object)null);
        }
    
        protected ReplayingDecoder(S initialState) {
            this.replayable = new ReplayingDecoderByteBuf();
            this.checkpoint = -1;
            this.state = initialState;
        }
    
        protected void checkpoint() {
            this.checkpoint = this.internalBuffer().readerIndex();
        }
    
        protected void checkpoint(S state) {
            this.checkpoint();
            this.state(state);
        }
    
    • checkpoint(Status)方法有两个作用
      (1)设置state属性的值,更新一下当前的状态。
      (2)还有一个非常大的作用,就是设置“读断点指针”。
      (3)“读断点指针”是ReplayingDecoder类的另一个重要的成员,它保存着装饰器内部ReplayingDecoderBuffer成员的起始读指针,有点儿类似于mark标记。当读数据时,一旦可读数据不够,ReplayingDecoderBuffer在抛出ReplayError异常之前,ReplayingDecoder会把读指针的值还原到之前的checkpoint(IntegerAddDecoder.Status)方法设置的“读断点指针”(checkpoint)。于是乎,在ReplayingDecoder下一次读取时,还会从之前设置的断点位置开始。

    1.5分包解码器

    在原理上,字符串分包解码和整数分包解码是一样的。有所不同的是:整数的长度是固定的,目前在Java中是4个字节;而字符串的长度不是固定的,是可变长度的,这就是一个小小的难题

    • 如何获取字符串的长度信息呢?
      (1)在协议的Head部分放置字符串的字节长度。Head部分可以用一个整型int来描述即可。
      (2)在协议的Content部分,放置的则是字符串的字节数组。
    public class StringReplayDecoder
            extends ReplayingDecoder<StringReplayDecoder.Status> {
    
        enum Status {
            PARSE_1, PARSE_2
        }
    
        private int length;
        private byte[] inBytes;
    
        public StringReplayDecoder() {
            //构造函数中,需要初始化父类的state 属性,表示当前阶段
            super(Status.PARSE_1);
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                              List<Object> out) throws Exception {
    
    
            switch (state()) {
                case PARSE_1:
                    //第一步,从装饰器ByteBuf 中读取长度
                    length = in.readInt();
                    inBytes = new byte[length];
                    // 进入第二步,读取内容
                    // 并且设置“读指针断点”为当前的读取位置
                    checkpoint(Status.PARSE_2);
                    break;
                case PARSE_2:
                    //第二步,从装饰器ByteBuf 中读取内容数组
                    in.readBytes(inBytes, 0, length);
                    out.add(new String(inBytes, "UTF-8"));
                    // 第二步解析成功,
                    // 进入第一步,读取下一个字符串的长度
                    // 并且设置“读指针断点”为当前的读取位置
                    checkpoint(Status.PARSE_1);
                    break;
                default:
                    break;
            }
    
        }
    
    public class StringProcessHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String s = (String) msg;
            System.out.println("打印: " + s);
        }
    }
    
    public class StringReplayDecoderTester {
        static String content = "smallmartial:Netty知识学习";
    
        /**
         * 字符串解码器的使用实例
         */
        @Test
        public void testStringReplayDecoder() {
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline().addLast(new StringReplayDecoder());
                    ch.pipeline().addLast(new StringProcessHandler());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);
            byte[] bytes = content.getBytes(Charset.forName("utf-8"));
            for (int j = 0; j < 100; j++) {
                //1-3之间的随机数
                int random = RandomUtil.randInMod(3);
                ByteBuf buf = Unpooled.buffer();
                buf.writeInt(bytes.length * random);
                for (int k = 0; k < random; k++) {
                    buf.writeBytes(bytes);
                }
                channel.writeInbound(buf);
            }
            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    image.png
    • ReplayingDecoder解码器不足
      (1)不是所有的ByteBuf操作都被ReplayingDecoderBuffer装饰类所支持,可能有些ByteBuf操作在ReplayingDecoder子类的decode实现方法中被使用时就会抛出ReplayError异常。
      (2)在数据解析逻辑复杂的应用场景,ReplayingDecoder在解析速度上相对较差。

    1.6MessageToMessageDecoder解码器

    MessageToMessageDecoder<I>。在继承它的时候,需要明确的泛型实参<I>。这个实参的作用就是指定入站消息JavaPOJO类型。

    相关文章

      网友评论

          本文标题:Netty原理与基础(五)

          本文链接:https://www.haomeiwen.com/subject/ddsnmktx.html