美文网首页vert.x & netty
Netty系列(7) ByteBuf & zero-copy

Netty系列(7) ByteBuf & zero-copy

作者: suxin1932 | 来源:发表于2020-04-01 22:16 被阅读0次

    1.ByteBuf

    1.1 概述

    1.网络数据传输的基本单位是字节,java NIO提供ByteBuffer作为字节的容器,但是ByteBuffer使用起来过于复杂和繁琐。
    
    2.ByteBuf是netty的Server与Client之间通信的数据传输载体(Netty的数据容器),
    它提供了一个byte数组(byte[])的抽象视图,既解决了JDK API的局限性,又为网络应用程序的开发者提供了更好的API
    
    3.ByteBuffer缺点
    >> ByteBuffer长度固定,一旦分配完成,它的容量不能动态扩展和收缩,
    当需要编码的POJO对象大于ByteBuffer的容量时,会发生索引越界异常;
    >> ByteBuffer只有一个标识位置的指针position,读写的时候需要手工调用flip()和rewind()等,
    使用者必须小心谨慎地处理这些API,否则很容易导致程序处理失败;
    >> ByteBuffer的API功能有限,一些高级和实用的特性它不支持,需要使用者自己编程实现。
    
    4.ByteBuf优点
    >> 容量可以按需增长
    >> 读写模式切换不需要调用flip()
    >> 读写使用了不同的索引
    >> 支持方法的链式调用
    >> 支持引用计数
    >> 支持池化
    >> 可以被用户自定义的缓冲区类型扩展
    >> 通过内置的复合缓冲区类型实现透明的零拷贝
    
    #5.三种缓冲区
    >> HeapByteBuf
    将数据存储到JVM的堆空间中, 并将实际的数据放到 byte array 中来实现
    优点: 可以快速的创建和释放, 并且提供了直接访问内部字节数组的方法
    缺点: 每次读写数据时, 都需要先将数据复制到直接缓冲区中, 然后再进行网络传输
    
    >> DirectByteBuf
    在堆外直接分配内存空间, 不会占用堆的容量空间, 因为它是由操作系统在本地内存进行的数据分配
    优点: 在使用socket进行数据传输时, 性能好, 因为是zero copy, 不需要将数据从JVM中复制到直接缓冲区中
    缺点: 因为直接在OS的内存中, 所以内存空间的分配与释放比堆空间更加复杂, 速度要慢一些.
    netty通过提供内存池来解决'这个缺点', 直接缓冲区不是通过字节数组的方式来访问数据的
    
    >> composite buffer
    
    #6.适用场景:
    1.对于后端的业务消息的编解码来说, 推荐使用 HeapByteBuf
    2.对于IO通信线程在读写缓冲区时, 推荐使用 DirectByteBuf
    

    1.2 API详解

    1.ByteBuf工作机制:
    ByteBuf维护了两个不同的索引,一个用于读取,一个用于写入。
    readerIndex和writerIndex的初始值都是0,当从ByteBuf中读取数据时,
    它的readerIndex将会被递增(它不会超过writerIndex),当向ByteBuf写入数据时,它的writerIndex会递增。
    
    2.名称以readXXX或者writeXXX开头的ByteBuf方法,会推进对应的索引,而以setXXX或getXXX开头的操作不会。
    
    3.在读取之后,0~readerIndex的就被视为discard的,
    调用discardReadBytes方法,可以释放这部分空间,它的作用类似ByteBuffer的compact()方法。
    
    4.readerIndex和writerIndex之间的数据是可读取的,等价于ByteBuffer的position和limit之间的数据。
    writerIndex和capacity之间的空间是可写的,等价于ByteBuffer的limit和capacity之间的可用空间。
    

    1.2.1 readerIndex 与 writerIndex 变化情况

    #初始分配
        +-------------------------------+
        |       writable bytes          |
        +-------------------------------+
        |                               |
        0=readerIndex=writerIndex       capacity
    
    #写入N个字节
        +------------------+-------------------+
        |  readable bytes  |    writable bytes |
        +------------------+-------------------+
        |                  |                   |
        0=readerIndex      N=writerIndex       capacity
    
    #读取M(<N)个字节之后
    
        +-------------------+------------------+------------------+
        | discardable bytes |  readable bytes  |  writable bytes  |
        +-------------------+------------------+------------------+
        |                   |                  |                  |
        0               M=readerIndex    N=writerIndex       capacity
    
    #调用discardReadBytes操作之后
        +------------------+----------------------+
        |  readable bytes  |    writable bytes    |
        +------------------+----------------------+
        |                  |                      |
        0=readerIndex   N-M=writerIndex         capacity
    
    #调用clear操作之后
        +-------------------------------+
        |       writable bytes          |
        +-------------------------------+
        |                               |
        0=readerIndex=writerIndex       capacity
    

    1.2.2 读写操作 API

    #1.setXXX 只会设定指定位置的 value, 不会改变 writerIndex; getXXX 只会获取指定位置的 value, 不会改变 readerIndex
    setBoolean (int , boolean)  设定给定索引处的 Boolean 值
    getBoolean(int)     返回给定索引处的 Boolean 值
    setByte(int index, int value)   设定给定索引处的字节值
    getByte(int)    返回给定索引处的字节
    getUnsignedByte(int )   将给定索引处的无符号字节值作为 short 返回
    setMedium(int index , int value)    设定给定索引处的 24 位的中等 int值
    getMedium(int)  返回给定索引处的 24 位的中等 int 值
    getUnsignedMedium (int)     返回给定索引处的无符号的 24 位的中等 int 值
    setint(int index , int value)   设定给定索引处的 int 值
    getint (int)    返回给定索引处的 int 值
    getUnsignedint(int)     将给定索引处的无符号 int 值作为 long 返回
    setLong(int index, long value)  设定给定索引处的 long 值
    getLong(int)    返回给定索引处的 long 值
    setShort(int index, int value)  设定给定索引处的 short 值
    getShort(int)   返回给定索引处的 short 值
    getUnsignedShort(int)   将给定索引处的无符号 short 值作为 int 返回
    getBytes (int, …)   将该缓冲区中从给定索引开始的数据传送到指定的目的地
    
    #2.writeXXX 设定指定位置的 value, 同时改变 writerIndex; readXXX 获取指定位置的 value, 同时改变 readerIndex
    readBytes(int length)  
    writeBytes(byte[] bytes)
    

    1.2.3 派生 API

    派生缓冲区为ByteBuf提供以专门的方式呈现其内容的视图。
    duplicate()
    slice()
    slice(int, int)
    Unpooled.unmodifiableBuffer ()
    order (ByteOrder)
    readSlice (int)
    
    每个这些方法都将返回一个新的ByteBuf实例,它具有自己的读索引、写索引和标记索引。
    其内部存储和 JDK 的ByteBuffer一样也是共享的。这使得派生缓冲区的创建成本是很低廉的,
    但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所以要小心。
    

    1.2.4 ByteBuf 复制

    copy()
    如果需要一个现有缓冲区的真实副本,请使用copy()或者copy(int,int)。
    不同于派生缓冲区,由这个调用所返回的ByteBuf拥有独立的数据副本 。
    

    1.2.5 其他 API

    isReadable ()   如果至少有一个字节可供读取,则返回 true
    isWritable ()   如果至少有一个字节可被写入,则返回 true
    readableBytes()     返回可被读取的字节数
    writableBytes()     返回可被写入的字节数
    capacity()  返回 ByteBuf 可容纳的字节数 。在此之后,它会尝试再次扩展直到达到maxCapacity ()
    maxCapacity()   返问 ByteBuf 可以容纳的最大字节数
    hasArray()  如果 ByteBuf 由一个字节数组支撑,则返回 true (只有 堆内存 ByteBuf, 才返回 true; 堆外内存 ByteBuf, 返回 false)
    array ()    如果 ByteBuf 由一个字节数组支撑则返问该数组;否则,它将抛出 一个 UnsupportedOperat工onException 异常
    discardReadBytes()    将readerIndex设为0, writerIndex变为 writerIndex-readerIndex。
    clear()    将readerIndex和writerIndex都设为0。
    

    1.5 相关代码实现

    private static final String HELLO = "hello";
    
    /**
     * 1.初始化一个池化的 byteBuf >>>>>>>>>>>>>>>>>>>>
     * readerIndex: {0}, writerIndex: {0}, isWritable: {true}, isReadable: {false}, capacity: {4}, maxCapacity: {1024}.
     * 2.调用 writeBytes 方法后 >>>>>>>>>>>>>>>>>>>>
     * readerIndex: {0}, writerIndex: {5}, isWritable: {true}, isReadable: {true}, capacity: {16}, maxCapacity: {1024}.
     * 3.调用 readBytes(int i) 方法后 >>>>>>>>>>>>>>>>>>>>
     * readerIndex: {2}, writerIndex: {5}, isWritable: {true}, isReadable: {true}, capacity: {16}, maxCapacity: {1024}.
     * 4.调用 discardReadBytes() 方法后 >>>>>>>>>>>>>>>>>>>>
     * readerIndex: {0}, writerIndex: {3}, isWritable: {true}, isReadable: {true}, capacity: {16}, maxCapacity: {1024}.
     * 5.调用 clear 方法后 >>>>>>>>>>>>>>>>>>>>
     * readerIndex: {0}, writerIndex: {0}, isWritable: {true}, isReadable: {false}, capacity: {16}, maxCapacity: {1024}.
     */
    @Test
    public void fn01() {
        // FIXME 初始化一个 directBuffer, 初始容量为4, 最大容量设置为 1024, 当超过4不超过1024时, 可扩容
        ByteBuf directBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(4, 1024);
        System.out.println("1.初始化一个池化的 byteBuf >>>>>>>>>>>>>>>>>>>>");
        System.out.println(String.format("readerIndex: {%s}, writerIndex: {%s}, isWritable: {%s}, isReadable: {%s}, capacity: {%s}, maxCapacity: {%s}.", directBuffer.readerIndex(), directBuffer.writerIndex(), directBuffer.isWritable(), directBuffer.isReadable(), directBuffer.capacity(), directBuffer.maxCapacity()));
    
        System.out.println("2.调用 writeBytes 方法后 >>>>>>>>>>>>>>>>>>>>");
        directBuffer.writeBytes(HELLO.getBytes(StandardCharsets.UTF_8));
        System.out.println(String.format("readerIndex: {%s}, writerIndex: {%s}, isWritable: {%s}, isReadable: {%s}, capacity: {%s}, maxCapacity: {%s}.", directBuffer.readerIndex(), directBuffer.writerIndex(), directBuffer.isWritable(), directBuffer.isReadable(), directBuffer.capacity(), directBuffer.maxCapacity()));
    
        System.out.println("3.调用 readBytes(int i) 方法后 >>>>>>>>>>>>>>>>>>>>");
        directBuffer.readBytes(2);
        System.out.println(String.format("readerIndex: {%s}, writerIndex: {%s}, isWritable: {%s}, isReadable: {%s}, capacity: {%s}, maxCapacity: {%s}.", directBuffer.readerIndex(), directBuffer.writerIndex(), directBuffer.isWritable(), directBuffer.isReadable(), directBuffer.capacity(), directBuffer.maxCapacity()));
    
        System.out.println("4.调用 discardReadBytes() 方法后 >>>>>>>>>>>>>>>>>>>>");
        directBuffer.discardReadBytes();
        System.out.println(String.format("readerIndex: {%s}, writerIndex: {%s}, isWritable: {%s}, isReadable: {%s}, capacity: {%s}, maxCapacity: {%s}.", directBuffer.readerIndex(), directBuffer.writerIndex(), directBuffer.isWritable(), directBuffer.isReadable(), directBuffer.capacity(), directBuffer.maxCapacity()));
    
        System.out.println("5.调用 clear 方法后 >>>>>>>>>>>>>>>>>>>>");
        directBuffer.clear();
        System.out.println(String.format("readerIndex: {%s}, writerIndex: {%s}, isWritable: {%s}, isReadable: {%s}, capacity: {%s}, maxCapacity: {%s}.", directBuffer.readerIndex(), directBuffer.writerIndex(), directBuffer.isWritable(), directBuffer.isReadable(), directBuffer.capacity(), directBuffer.maxCapacity()));
    }
    

    2.zero-copy

    #1.zero-copy
    "Zero-copy" describes computer operations in which the CPU does not 
    perform the task of copying data from one memory area to another.
    This is frequently used to save CPU cycles and memory bandwidth when transmitting a file over a network.
    
    即所谓的 Zero-copy, 就是在操作数据时, 不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域. 
    因为少了一次内存的拷贝, 因此 CPU 的效率就得到的提升.
    
    #2.OS层面上的 zero-copy
    指避免在用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据. 
    例如 Linux 提供的 mmap 系统调用, 它可以将一段用户空间内存映射到内核空间, 
    当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间; 
    同样地, 内核空间对这段区域的修改也直接反映用户空间. 
    正因为有这样的映射关系, 我们就不需要在 用户态(User-space) 与 
    内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率.
    
    #3.Netty 中的 Zero-copy 
    传统的zero-copy是IO传输过程中,数据无需中内核态到用户态、
    用户态到内核态的数据拷贝,减少拷贝次数。
    Netty的 Zero-coyp 完全是在用户态(Java 层面)的, 它的 Zero-copy 的
    更多的是偏向于优化数据操作,或者说传输层的zero-copy机制。
    由于协议传输过程中,通常会有拆包、合并包的过程,一般的做法就是System.arrayCopy了,
    但是Netty通过ByteBuf.slice以及Unpooled.wrappedBuffer等方法拆分、合并Buffer无需拷贝数据。
    
    Netty 的 Zero-copy 体现在如下几个个方面:
    >> Netty 提供了 CompositeByteBuf 类, 它可以将多个 ByteBuf 
    合并为一个逻辑上的 ByteBuf, 避免了各个 ByteBuf 之间的拷贝.
    >> 通过 wrap 操作, 我们可以将 byte[] 数组、ByteBuf、ByteBuffer等
    包装成一个 Netty ByteBuf 对象, 进而避免了拷贝操作.
    >> ByteBuf 支持 slice 操作, 因此可以将 ByteBuf 分解为
    多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝.
    >> 通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输, 
    可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题.
    
    netty-zero-copy.png

    2.1 相关实现

    /**
     *
     * 1. composite 的 zero-copy 操作
     *                            CompositeByteBuf
     *      +-------------------+------------------+------------------+
     *      |               header                 |        body      |
     *      +-------------------+------------------+------------------+
     *                                (composite)
     *      +-------------------+------------------+------------------+
     *      |                          all data                       |
     *      +-------------------+------------------+------------------+
     *
     * 2. slice 的 zero-copy 操作
     *                                  ByteBuf
     *      +-------------------+------------------+------------------+
     *      |                          all data                       |
     *      +-------------------+------------------+------------------+
     *                                (slice)
     *      +-------------------+------------------+------------------+
     *      |               header                 |        body      |
     *      +-------------------+------------------+------------------+
     */
    @Test
    public void fn02() {
        System.out.println("1.copy 数组 >>>>>>>>>>>>>>>>>>>>>> ");
        // 方式1: 希望将一个 byte 数组转换为一个 ByteBuf 对象, 以便于后续的操作, 那么传统的做法是将此 byte 数组拷贝到 ByteBuf 中: array.clone()
        ByteBuf header = Unpooled.copiedBuffer("header".getBytes(StandardCharsets.UTF_8));
    
        System.out.println("2.zero-copy 数组 combine >>>>>>>>>>>>>>>>>>>>>> ");
        // 虽然看起来 CompositeByteBuf 是由两个 ByteBuf 组合而成的, 不过在 CompositeByteBuf 内部, 这两个 ByteBuf 都是单独存在的, CompositeByteBuf 只是逻辑上是一个整体.
        // addComponents(boolean increaseWriterIndex, ByteBuf... buffers) 第一个参数是 true, 表示当添加新的 ByteBuf 时, 自动递增 CompositeByteBuf 的 writeIndex.
        // compositeByteBuf.addComponents(header, body) 方法的 writeIndex 仍然是0, 因此此时我们就不可能从 compositeByteBuf 中读取到数据,
        CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer().addComponents(true, Unpooled.copiedBuffer("a".getBytes(StandardCharsets.UTF_8)), Unpooled.copiedBuffer("b".getBytes(StandardCharsets.UTF_8)));
    
        System.out.println("3.zero-copy 数组 combine >>>>>>>>>>>>>>>>>>>>>> ");
        // 方式2: 也可以使用 Unpooled 的相关方法, 包装这个 byte 数组, 生成一个新的 ByteBuf 实例, 而不需要进行拷贝操作
        // Unpooled.wrappedBuffer 方法来将 bytes 包装成为一个 UnpooledHeapByteBuf 对象, 而在包装的过程中, 是不会有拷贝操作的.
        // 即最后我们生成的生成的 ByteBuf 对象是和 bytes 数组共用了同一个存储空间, 对 bytes 的修改也会反映到 ByteBuf 对象中.
        ByteBuf body = Unpooled.wrappedBuffer("body".getBytes(StandardCharsets.UTF_8));
    
        // 方式3: 合并两个 ByteBuf, zero-copy
        ByteBuf combinedByteBuf = Unpooled.wrappedBuffer(header, body);
    
        System.out.println("4.zero-copy 数组 slice >>>>>>>>>>>>>>>>>>>>>> ");
        // 方式1: 不带参数的 slice 方法等同于 buf.slice(buf.readerIndex(), buf.readableBytes()) 调用, 即返回 buf 中可读部分的切片.
        // ByteBuf slice = combinedByteBuf.slice();
        ByteBuf headerBuf = combinedByteBuf.slice(0, 6);
        // 方式2: slice(int index, int length) 方法相对就比较灵活了, 我们可以设置不同的参数来获取到 buf 的不同区域的切片.
        ByteBuf bodyBuf = combinedByteBuf.slice(6, 10);
    
        // Netty 中使用 FileRegion 实现文件传输的零拷贝, 不过在底层 FileRegion 是依赖于 Java NIO FileChannel.transfer 的零拷贝功能.
        System.out.println("5.zero-copy file --> FileServerHandler 借助 DefaultFileRegion 实现 >>>>>>>>>>>>>>>>>>>>>> ");
    }
    
    
    /**
     * 代码中不断中源文件中读取定长数据到 temp 数组中, 然后再将 temp 中的内容写入目的文件,
     * 这样的拷贝操作对于小文件倒是没有太大的影响, 但是如果我们需要拷贝大文件时, 频繁的内存拷贝操作就消耗大量的系统资源了.
     *
     * @param srcFile
     * @param destFile
     */
    private void bioCopyFile(String srcFile, String destFile) {
        try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(srcFile));
             BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destFile))) {
            byte[] bytes = new byte[1024];
            int length;
            while ((length = bis.read(bytes)) != -1) {
                bos.write(bytes, 0, length);
            }
        } catch (Exception e) {
            System.out.println("bioCopyFile error.");
            e.printStackTrace();
        }
    }
    
    /**
     * FileChannel 可以直接将源文件的内容直接拷贝(transferTo) 到目的文件中,
     * 而不需要额外借助一个临时 buffer, 避免了不必要的内存操作.
     * @param srcFile
     * @param destFile
     */
    private void nioCopyFile(String srcFile, String destFile) {
        try (RandomAccessFile srcRaf = new RandomAccessFile(srcFile, "r");
             RandomAccessFile destRaf = new RandomAccessFile(destFile, "rw")) {
            FileChannel srcFileChannel = srcRaf.getChannel();
            FileChannel destFileChannel = destRaf.getChannel();
            long position = 0;
            long count = srcFileChannel.size();
            srcFileChannel.transferTo(position, count, destFileChannel);
        } catch (Exception e) {
            System.out.println("nioCopyFile error.");
            e.printStackTrace();
        }
    }
    

    2.2 zero-copy 核心原理

    private static final class Component {
            final ByteBuf srcBuf; // the originally added buffer
            final ByteBuf buf; // srcBuf unwrapped zero or more times
            int srcAdjustment; // index of the start of this CompositeByteBuf relative to srcBuf
            int adjustment; // index of the start of this CompositeByteBuf relative to buf
            int offset; // offset of this component within this CompositeByteBuf
            int endOffset; // end offset of this component within this CompositeByteBuf
            private ByteBuf slice; // cached slice, may be null
            
            ...
    }
    
    
    >> CompositeByteBuf内部类Component, Component 数组中的每一个元素都存储了实际的 ByteBuf, wrap 后的 ByteBuf
    >> 每一个元素都包含了 本元素指向的ByteBuf的起始偏移量 offset  以及 结束偏移量 endOffset.
    >> 上一个元素的结束偏移量紧接着下一个元素的起始偏移量
    

    3. ByteBuf 的引用计数机制

    3.1 为什么要有引用计数器

    >> UnpooledHeapByteBuf 底下的byte[]能够依赖JVM GC自然回收;
    >> UnpooledDirectByteBuf底下是DirectByteBuffer,除了等JVM GC,最好也能主动进行回收;
    >> PooledHeapByteBuf 和 PooledDirectByteBuf,
    则必须要主动将用完的byte[]/ByteBuffer放回池里,否则内存就要爆掉。
    
    所以,Netty ByteBuf需要在JVM的GC机制之外,有自己的引用计数器和回收过程。
    

    3.2 引用计数器基本知识

    >> 计数器基于 AtomicIntegerFieldUpdater,为什么不直接用AtomicInteger?
    因为ByteBuf对象很多,如果都把int包一层AtomicInteger花销较大,
    而AtomicIntegerFieldUpdater只需要一个全局的静态变量。
    >> 所有ByteBuf的引用计数器初始值为1。
    >> 调用release(),将计数器减1,等于零时, deallocate()被调用,各种回收。
    >> 调用retain(),将计数器加1,即使ByteBuf在别的地方被人release()了,在本Class没喊cut之前,不要把它释放掉。
    >> 由duplicate(), slice()和order()所衍生的ByteBuf,与原对象共享底下的buffer,
    也共享引用计数器,所以它们经常需要调用retain()来显示自己的存在。
    >> 当引用计数器为0,底下的buffer已被回收,
    即使ByteBuf对象还在,对它的各种访问操作都会抛出异常。
    

    3.3 谁来负责Release

    在Netty里,因为Handler链的存在,ByteBuf经常要传递到下一个Hanlder去而不复还,
    所以规则变成了谁是最后使用者,谁负责释放。
    
    另外,更要注意的是各种异常情况,ByteBuf没有成功传递到下一个Hanlder,还在自己地界里的话,一定要进行释放。
    
    // 1.InBound Message
    假设在AbstractNioByteChannel.NioByteUnsafe.read() 处创建了ByteBuf, 
    并调用 pipeline.fireChannelRead(byteBuf) 送入Handler链。
    根据上面的谁最后谁负责原则,每个Handler对消息可能有三种处理方式
    >> 对原消息不做处理,调用 ctx.fireChannelRead(msg)把原消息往下传,那不用做什么释放。
    >> 将原消息转化为新的消息并调用 ctx.fireChannelRead(newMsg)往下传,那必须把原消息release掉。
    >> 如果已经不再调用ctx.fireChannelRead(msg)传递任何消息,那更要把原消息release掉。
    
    假设每一个Handler都把消息往下传,Handler并也不知道谁是启动Netty时所设定的Handler链的最后一员,
    所以Netty在Handler链的最末补了一个TailHandler,如果此时消息仍然是ReferenceCounted类型就会被release掉。
     
    // 2.OutBound Message
    要发送的消息由应用所创建,并调用 ctx.writeAndFlush(msg) 进入Handler链。
    在每个Handler中的处理类似InBound Message,最后消息会来到HeadHandler,
    再经过一轮复杂的调用,在flush完成后终将被release掉。
    
    // 3.异常发生时的释放
    多层的异常处理机制,有些异常处理的地方不一定准确知道ByteBuf之前释放了没有,
    可以在释放前加上引用计数大于0的判断避免释放失败;
    
    有时候不清楚ByteBuf被引用了多少次,但又必须在此进行彻底的释放,可以循环调用relase()直到返回true。
    

    3.4 内存泄漏检测

    所谓内存泄漏,主要是针对池化的ByteBuf。
    ByteBuf对象被JVM GC掉之前,没有调用release()把底下的DirectByteBuffer或byte[]归还到池里,会导致池越来越大。
    而非池化的ByteBuf,即使像DirectByteBuf那样可能会用到System.gc(),但终归会被release掉的,不会出大事。
    
    // Netty内存泄漏的监测机制
    Netty默认会从分配的ByteBuf里抽样出大约1%的来进行跟踪。如果泄漏,会有如下语句打印:
    > LEAK: ByteBuf.release() was not called before its garbage-collected. 
    Enable advanced leak reporting to find out where the leak occurred. 
    To enable advanced leak reporting, specify 
    the JVM option '-Dio.netty.leakDetectionLevel=advanced' 
    or call ResourceLeakDetector.setLevel()
    
    这句话报告有泄漏的发生,提示你用-D参数,把防漏等级从默认的simple升到advanced,
    就能具体看到被泄漏的ByteBuf被创建和访问的地方。
    
    >> 禁用(DISABLED) - 完全禁止泄露检测,省点消耗。
    >> 简单(SIMPLE) - 默认等级,告诉我们取样的1%的ByteBuf是否发生了泄露,但总共一次只打印一次,看不到就没有了。
    >> 高级(ADVANCED) - 告诉我们取样的1%的ByteBuf发生泄露的地方。每种类型的泄漏(创建的地方与访问路径一致)只打印一次。对性能有影响。
    >> 偏执(PARANOID) - 跟高级选项类似,但此选项检测所有ByteBuf,而不仅仅是取样的那1%。对性能有绝大的影响。
    
    // 实现细节
    每当各种ByteBufAllocator 创建ByteBuf时,都会问问是否需要采样,
    Simple和Advanced级别下,就是以113这个素数来取模,命中了就创建一个里说的PhantomReference。
    然后创建一个Wrapper,包住ByteBuf和Reference。
    
    simple级别下,wrapper只在执行release()时调用Reference.clear(),Advanced级别下则会记录每一个创建和访问的动作。
    
    当GC发生,还没有被clear()的Reference就会被JVM放入到之前设定的ReferenceQueue里。
    
    在每次创建PhantomReference时,都会顺便看看有没有因为忘记执行release()把Reference给clear掉,
    在GC时被放进了ReferenceQueue的对象,有则以 "io.netty.util.ResourceLeakDetector"为logger name,打印Error级别的日日志。
    
    // 功能测试时
    最好开着"-Dio.netty.leakDetectionLevel=paranoid"。
    如果内存尚够,可以适当把-XX:MaxDirectMemorySize 调大。然后监控其使用量,及时报警。
    

    参考资料
    https://netty.io/wiki/reference-counted-objects.html (官网)
    https://www.cnblogs.com/xys1228/p/6088805.html
    https://www.cnblogs.com/crazymakercircle/p/9904544.html
    https://www.cnblogs.com/549294286/p/5168454.html
    https://emacsist.github.io/2018/04/28/%E7%BF%BB%E8%AF%91netty%E4%B8%AD%E7%9A%84%E5%BC%95%E7%94%A8%E8%AE%A1%E6%95%B0%E5%AF%B9%E8%B1%A1/

    相关文章

      网友评论

        本文标题:Netty系列(7) ByteBuf & zero-copy

        本文链接:https://www.haomeiwen.com/subject/ntpzuhtx.html