Netty的内存分配相关知识&零拷贝机制

作者: 右耳菌 | 来源:发表于2022-07-25 18:04 被阅读0次

    1. Netty 自己的ByteBuf

    ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的。

    • JDK ByteBuffer的缺点:
    • 无法动态扩容
      长度固定,无法动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常。
    • API使用复杂
      读写的时候需要手工调用flip() 和 rewind() 等方法,使用时需要非常谨慎的使用这些api,否则很容易出现错误。

    2. ByteBuf做了哪些增强

    • API操作便捷
    • 动态扩容
    • 多种ByteBuf实现
    • 高效的零拷贝机制

    3. ByteBuf 操作

    ByteBuf三个重要属性:capacity容量、readerIndex读取位置、writerlndex写入位置。
    提供了两个指针变量来支持顺序读和写操作,分别是readerlndex和写操作writerIndex。

    • 常用方法定义:
    • 随机访问索引 getByte
    • 顺序读 read*
    • 顺序写 write*
    • 清除已读内容 discardReadBytes
    • 清除缓冲区 clear
    • 搜索操作
    • 标记和重置
    • 引用计数和释放
    • 下图显示了一个缓冲区是如何被两个指针分割成三个区域的:
    • 一个完整的示例:
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.junit.Test;
    
    import java.util.Arrays;
    
    /**
     * bytebuf的常规API操作示例
     */
    public class ByteBufDemo {
        @Test
        public void apiTest() {
            //  +-------------------+------------------+------------------+
            //  | discardable bytes |  readable bytes  |  writable bytes  |
            //  |                   |     (CONTENT)    |                  |
            //  +-------------------+------------------+------------------+
            //  |                   |                  |                  |
            //  0      <=       readerIndex   <=   writerIndex    <=    capacity
    
            // 1.创建一个非池化的ByteBuf,大小为10个字节
            ByteBuf buf = Unpooled.buffer(10);
            System.out.println("原始ByteBuf为====================>" + buf.toString());
            System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 2.写入一段内容
            byte[] bytes = {1, 2, 3, 4, 5};
            buf.writeBytes(bytes);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 3.读取一段内容
            byte b1 = buf.readByte();
            byte b2 = buf.readByte();
            System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
            System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 4.将读取的内容丢弃
            buf.discardReadBytes();
            System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
            System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 5.清空读写指针
            buf.clear();
            System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
            System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 6.再次写入一段内容,比第一段内容少
            byte[] bytes2 = {1, 2, 3};
            buf.writeBytes(bytes2);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 7.将ByteBuf清零
            buf.setZero(0, buf.capacity());
            System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
            System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");
    
            // 8.再次写入一段超过容量的内容
            byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
            buf.writeBytes(bytes3);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
            //  随机访问索引 getByte
            //  顺序读 read*
            //  顺序写 write*
            //  清除已读内容 discardReadBytes
            //  清除缓冲区 clear
            //  搜索操作
            //  标记和重置
            //  完整代码示例:参考
            // 搜索操作 读取指定位置 buf.getByte(1);
            //
        }
    }
    
    • 执行结果
    原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
    1.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    
    写入的bytes为====================>[1, 2, 3, 4, 5]
    写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
    2.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
    
    读取的bytes为====================>[1, 2]
    读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
    3.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
    
    将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
    4.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
    
    将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
    5.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
    
    写入的bytes为====================>[1, 2, 3]
    写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
    6.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
    
    将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
    7.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    
    写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
    8.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    

    4. ByteBuf 动态扩容

    capacity默认值: 256 字节、最大值:Integer.MAX_VALUE(2GB)

    write* 方法调用时,通过AbstractByteBuf.ensureWriteable0 进行检查。
    容量计算方法:AbstractByteBufAllocator.calculateNewCapacity (新capacity的最小要求,capacity最大值)

    根据新capacity的最小值要求,对应有两套计算方法:

    • 没超过4兆:从64字节开始,每次增加一倍,直至计算出来的newCapacity满足新容量最小要求。
      示例:当前大小256,已写250,继续写10字节数据,需要的容量最小要求是261,则新容量是6422*2=512

    • 超过4兆:新容量=新容量最小要求/4兆*4兆+4兆
      示例:当前大小3兆,已写3兆,继续写2兆数据,需要的容量最小要求是5兆,则新容量是9兆〈不能超过最大值)。

    • 4兆的来源:一个固定的阈值AbstractByteBufAllocator.CALCULATE_THRESHOLD


    5. 选择合适的ButeBuf 实现

    了解核心的: 3个维度的划分方式,8 种具体实现


    在使用中,都是通过ByteBufAllocator分配器进行申请,同时分配器具备有内存管理的功能

    Netty 中默认使用的是 PooledUnsafeDirectByteBuf,但是推荐我们使用的是UnpooledHeapByteBuf。

    • 例子
    package com.study.hc.net.netty.demo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.junit.Test;
    
    import java.util.Arrays;
    
    /**
     * 堆外内存的常规API操作示例
     */
    public class DirectByteBufDemo {
        @Test
        public void apiTest() {
            //  +-------------------+------------------+------------------+
            //  | discardable bytes |  readable bytes  |  writable bytes  |
            //  |                   |     (CONTENT)    |                  |
            //  +-------------------+------------------+------------------+
            //  |                   |                  |                  |
            //  0      <=       readerIndex   <=   writerIndex    <=    capacity
    
            // 1.创建一个非池化的ByteBuf,大小为10个字节
            ByteBuf buf = Unpooled.directBuffer(10);
            System.out.println("原始ByteBuf为====================>" + buf.toString());
            // 下面的这个没有实现,所以无法正常运行
            // System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 2.写入一段内容
            byte[] bytes = {1, 2, 3, 4, 5};
            buf.writeBytes(bytes);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            //System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 3.读取一段内容
            byte b1 = buf.readByte();
            byte b2 = buf.readByte();
            System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
            System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
           //System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 4.将读取的内容丢弃
            buf.discardReadBytes();
            System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
            //System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 5.清空读写指针
            buf.clear();
            System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
            //System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 6.再次写入一段内容,比第一段内容少
            byte[] bytes2 = {1, 2, 3};
            buf.writeBytes(bytes2);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
           // System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 7.将ByteBuf清零
            buf.setZero(0, buf.capacity());
            System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
           // System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");
    
            // 8.再次写入一段超过容量的内容
            byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
            buf.writeBytes(bytes3);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
           // System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
            //  随机访问索引 getByte
            //  顺序读 read*
            //  顺序写 write*
            //  清除已读内容 discardReadBytes
            //  清除缓冲区 clear
            //  搜索操作
            //  标记和重置
            //  完整代码示例:参考
            // 搜索操作 读取指定位置 buf.getByte(1);
            //
        }
    
    }
    

    在使用中,都是通过ByteBufAllocator 分配器进行申请,同时分配器具备有内存管理的功能。

    6. Unsafe的实现

    unsafe意味着不安全的操作。但是更底层的操作会带来性能提升和特殊功能,Netty中会尽力使用unsafe.
    Java语言很重要的特性是“一次编写到处运行”,所以它针对底层的内存或者其他操作,做了很多封装。而unsafe提供了一系列我们操作底层的方法,可能会导致不兼容或者不可知的异常。


    7. PooledByteBuf对象、内存复用

    PoolThreadCache : PooledByteBufAllocator实例维护的一个线程变量。
    多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里面存ChunkoPoolChunk里面维护了内存引用,内存复用的做法就是把buf的memory指向chunk的memoryo
    PooledByteBufAllocator.ioBuffer运作过程梳理:


    8. 零拷贝机制

    Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多关联。

    1、CompositeByteBuf,将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝。


    2、wrapedBuffer()方法,将byte[]数组包装成ByteBuf对象。


    3、slice()方法。将一个ByteBuf对象切分成多个ByteBuf对象。


    使用ByteBuf是Netty高性能很重要的一个原因!

    • 示例
    package com.study.hc.net.netty.demo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.CompositeByteBuf;
    import io.netty.buffer.Unpooled;
    
    import java.nio.charset.Charset;
    
    /**
     * 零拷贝示例
     */
    public class ZeroCopyTest {
        @org.junit.Test
        public void wrapTest() {
            byte[] arr = {1, 2, 3, 4, 5};
            ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
            System.out.println(byteBuf.getByte(4));
            arr[4] = 6;
            System.out.println(byteBuf.getByte(4));
        }
    
        @org.junit.Test
        public void sliceTest() {
            ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
            ByteBuf newBuffer = buffer1.slice(1, 2);
            newBuffer.unwrap();
            System.out.println(newBuffer.toString());
        }
    
        @org.junit.Test
        public void compositeTest() {
            ByteBuf buffer1 = Unpooled.buffer(3);
            buffer1.writeByte(1);
            ByteBuf buffer2 = Unpooled.buffer(3);
            buffer2.writeByte(4);
            CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
            CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
            System.out.println(newBuffer);
        }
    
    }
    

    如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~

    相关文章

      网友评论

        本文标题:Netty的内存分配相关知识&零拷贝机制

        本文链接:https://www.haomeiwen.com/subject/gcagirtx.html