美文网首页netty
网络编程Netty之ByteBuf详解

网络编程Netty之ByteBuf详解

作者: virtual灬zzZ | 来源:发表于2022-11-29 00:01 被阅读0次

    Netty中的ByteBuf优势

    NIO使用的ByteBuffer有哪些缺点

    1: 无法动态扩容,ByteBuffer的长度是固定的,是初始指定的值,不能够再进行扩容了,当写入的内容大于ByteBuffer的容量时,会报越界异常

    2.: API使用复杂,当要读取数据时,需要调用buffer.flip()方法,转换为读取模式,如果稍微不注意就可能出现错误,读取不到数据或者读取的数据是错误的

    ByteBuf的优势和做了哪些增强

    1: API操作起来更加的方便,可以直接写或者直接读

    2:支持动态扩容,当写入的数据大于ByteBuf的容量时,会动态扩容,不会报错

    3:提供了多种ByteBuf的实现,可以更加灵活的使用

    4:提供了高效的零拷贝机制

    5:ByteBuf可以内存复用

    ByteBuf操作示例
    ByteBuf操作

    ByteBuf中有三个重要的属性:
    1:capacity容量,初始指定的ByteBuf的大小

    2:readIndex读取位置,顺序读的时候,记录读取数据的索引值

    3:writeIndex写入位置,顺序写的时候,记录写入数据的索引值

    ByteBuf常用的方法:
    1:getByte和setByte,获取指定索引处的数据,是随机获取的,不会改变readIndex和writeIndex的值

    2:read*,顺序读,会改变readIndex的值

    3:write*,顺序写,会改变writeIndex的值

    4:discardReadBytes,清除读过的内容

    5:clear,清除缓冲区

    6:搜索操作

    7:标记和重置

    8:引用计数和释放

    简单的Demo示例
    /**
     * ByteBuf的使用示例
     */
    public class ByteBufDemo {
    
        public static void main(String[] args) {
            //分配非池化,10个字节的ByteBuf
            ByteBuf buf = Unpooled.buffer(10);
    
            //看下ByteBuf
            System.out.println("------------------------原始的ByteBuf-------------------------------");
            System.out.println("ByteBuf参数:" + buf.toString());
            System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
    
            //写入内容到ByteBuf
            byte[] bytes = {1, 2, 3, 4, 5};
            buf.writeBytes(bytes);
            System.out.println("------------------------写入内容后的ByteBuf-------------------------------");
            System.out.println("ByteBuf参数:" + buf.toString());
            System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
    
            //从ByteBuf中读取内容
            buf.readByte();
            buf.readByte();
            System.out.println("------------------------读取一些内容后的ByteBuf-------------------------------");
            System.out.println("ByteBuf参数:" + buf.toString());
            System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
    
            //清除读过的内容
            //把读过的数据清除后,readIndex变为0,writeIndex变为3
            //后面尚未读取的内容,会复制到前面去,把原来的值覆盖掉
            //再次写入时,3,4,5后面的4,5会被覆盖掉
            buf.discardReadBytes();
            System.out.println("------------------------清除读过的数据后的ByteBuf-------------------------------");
            System.out.println("ByteBuf参数:" + buf.toString());
            System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
    
            //再次写入内容到ByteBuf
            byte[] bytesO = {6};
            buf.writeBytes(bytesO);
            System.out.println("------------------------再次写入内容后的ByteBuf-------------------------------");
            System.out.println("ByteBuf参数:" + buf.toString());
            System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
    
            //清空读和写的索引值
            //readIndex和writeIndex会重置为0,ByteBuf中的内容并不会重置
            buf.clear();
            System.out.println("------------------------清空读和写的索引值后的ByteBuf-------------------------------");
            System.out.println("ByteBuf参数:" + buf.toString());
            System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
    
            //再次写入内容到ByteBuf
            byte[] bytes2 = {1, 2, 3};
            buf.writeBytes(bytes2);
            System.out.println("------------------------再次写入内容后的ByteBuf-------------------------------");
            System.out.println("ByteBuf参数:" + buf.toString());
            System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
    
            //清空ByteBuf的内容
            //不会重置readIndex和writeIndex
            buf.setZero(0, buf.capacity());
            System.out.println("------------------------清空ByteBuf的内容后的ByteBuf-------------------------------");
            System.out.println("ByteBuf参数:" + buf.toString());
            System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
    
            //再次写入超出指定容量的数据到ByteBuf
            //会进行扩容
            byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
            buf.writeBytes(bytes3);
            System.out.println("------------------------再次写入超出指定容量的数据后的ByteBuf-------------------------------");
            System.out.println("ByteBuf参数:" + buf.toString());
            System.out.println("ByteBuf中的内容:" + Arrays.toString(buf.array()) + "\n");
        }
    }
    

    输出结果:



    上面的例子是使用堆内的ByteBuf,下面看下堆外的ByteBuf例子:

     //分配非池化,10个字节的directBuffer
    ByteBuf buf = Unpooled.directBuffer(10);
    
    //看下ByteBuf
    System.out.println("------------------------原始的ByteBuf-------------------------------");
    System.out.println("ByteBuf参数:" + buf.toString());
    

    directBuffer不能够使用array方法,否则会报错:java.lang.UnsupportedOperationException: direct buffer;而且使用ByteBuf是用它底层的分配器分配的,不是new一个出来,下面会具体说下。


    上图中,可以看到,readIndex和writeIndex把缓冲区分成了三块,readIndex会小于或者等于writeIndex,这个应该好理解,还没有写到那里,就去读取了,能读取到什么呢。

    堆内和堆外内存

    socket是操作系统底层提供给上层应用使用的网络通信API,当要去读取或者写入的数据在JVM的堆中,那么就先需要把JVM堆中需要读取的数据拷贝一份到操作系统中,然后socket再去读取,而直接内存的好处是socket可以直接读取,少了拷贝这一步操作。

    ByteBuf动态扩容
    下面以堆内的ByteBuf为例,查看源码,分析ByteBuf的动态扩容:
    动态扩容肯定是写入数据的时候,ByteBuf的容量不够了,才去扩容的,所以需要跟踪下面的代码:

    buf.writeBytes(bytes);
    

    跟踪上面的writeBytes,首先进入了ByteBuf这个抽象类中,进入了下面这个抽象方法:

    public abstract ByteBuf writeBytes(byte[] src);
    

    它的实现类如下:


    进入第一个AbstractByteBuf的方法:

     @Override
     public ByteBuf writeBytes(byte[] src) {
          writeBytes(src, 0, src.length);
          return this;
      }
    

    再次调用了下面的方法:

     @Override
     public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
          //检查是否可以写入
          ensureWritable(length);
          setBytes(writerIndex, src, srcIndex, length);
          //把当前的写入位置加上写入数据的长度
          writerIndex += length;
          return this;
      }
    

    src是需要写入的数据,length是写入数据的长度
    然后会进入ensureWritable方法,传入的参数是:写入数据的长度

    @Override
    public ByteBuf ensureWritable(int minWritableBytes) {
        //参数校验
        checkPositiveOrZero(minWritableBytes, "minWritableBytes");
        //检查容量是否可以写入这么多数据
        ensureWritable0(minWritableBytes);
        return this;
    }
    
    //检查参数是否小于0
    public static int checkPositiveOrZero(int i, String name) {
        if (i < 0) {
             throw new IllegalArgumentException(name + ": " + i + " (expected: >= 0)");
         }
         return i;
     }
    

    参数校验完成后会进入ensureWritable0方法:

    final void ensureWritable0(int minWritableBytes) {
            //确保缓冲区可以访问
            ensureAccessible();
            //如果写入的数据长度小于等于剩余可写数据的容量,就直接返回
            //就是说,容量足够写入,不需要扩容
            if (minWritableBytes <= writableBytes()) {
                return;
            }
            if (checkBounds) {
                //maxCapacity是int的最大值
                //检查写入的数据长度是否比可以写入的最大容量还要大
                //是的话就抛异常
                if (minWritableBytes > maxCapacity - writerIndex) {
                    throw new IndexOutOfBoundsException(String.format(
                            "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                            writerIndex, minWritableBytes, maxCapacity, this));
                }
            }
    
            //正式的扩容方法
            int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
    
            //把扩容后的新容量设置进去
            capacity(newCapacity);
    }
    

    进入AbstractByteBufAllocator类的扩容方法:

    //常量 4M
    static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
    
     @Override
     public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
            //校验参数
            checkPositiveOrZero(minNewCapacity, "minNewCapacity");
            //minNewCapacity = writerIndex + minWritableBytes
            //已经写入的数据索引加上当前写入的数据长度,就是需要的最小的容量
            //判断是否比最大容量还大,是的话就抛异常
            if (minNewCapacity > maxCapacity) {
                throw new IllegalArgumentException(String.format(
                        "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                        minNewCapacity, maxCapacity));
            }
            final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
            //如果需要的最小容量等于4M,就直接返回4M,作为扩容后的容量
            if (minNewCapacity == threshold) {
                return threshold;
            }
    
            //如果需要的最小容量大于4M,就按照下面的扩容方式扩容
            if (minNewCapacity > threshold) {
                //newCapacity = 15 / 4194304 * 4194304 
                int newCapacity = minNewCapacity / threshold * threshold;
                //如果计算出的容量大于最大容量减去4M,就把最大容量赋值给新的容量
                if (newCapacity > maxCapacity - threshold) {
                    newCapacity = maxCapacity;
                } else {
                    newCapacity += threshold;
                }
                return newCapacity;
            }
    
            //如果需要的最小容量小于4M,就按照下面的方式扩容
            int newCapacity = 64;
            while (newCapacity < minNewCapacity) {
                newCapacity <<= 1;
            }
    
            return Math.min(newCapacity, maxCapacity);
    }
    
    再看下capacity方法:
    下面的把扩容后的容量放到ByteBuf,就是使用了arraycopy方法
    
     @Override
        public ByteBuf capacity(int newCapacity) {
            checkNewCapacity(newCapacity);
    
            int oldCapacity = array.length;
            byte[] oldArray = array;
            if (newCapacity > oldCapacity) {
                byte[] newArray = allocateArray(newCapacity);
                System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
                setArray(newArray);
                freeArray(oldArray);
            } else if (newCapacity < oldCapacity) {
                byte[] newArray = allocateArray(newCapacity);
                int readerIndex = readerIndex();
                if (readerIndex < newCapacity) {
                    int writerIndex = writerIndex();
                    if (writerIndex > newCapacity) {
                        writerIndex(writerIndex = newCapacity);
                    }
                    System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
                } else {
                    setIndex(newCapacity, newCapacity);
                }
                setArray(newArray);
                freeArray(oldArray);
            }
            return this;
        }
    

    下面是跟踪的代码步骤:


    总结下动态扩容机制:
    1:write*方法调用的时候,会通过ensureWritable0方法检查
    2:calculateNewCapacity方法是用来计算容量的方法

    扩容计算方法:
    1:需要的容量没有超过4M,会从64字节开始扩容,每次增加一倍,直到计算出来的容量满足需要的最小容量,假如,当前大小是256,已经写入了200字节,再次写入60字节,需要的最小容量是260字节,那么扩容后的容量是64 * 2 * 2 * 2=512
    2:需要的容量超过4M,扩容计算方法为:新容量 = 新容量的最小要求 / 4M * 4M + 4M,假如当前大小是3M,已经写了2M,再写入3M,需要的最小容量是5M,那么扩容后的容量是 5 / 4 * 4 + 4 = 8M

    图示1:需要的容量小于4M:


    图示2:需要的容量大于4M:


    ByteBuf有哪些实现
    ByteBuf从3个维度,有8种实现方式:


    ByteBuf类图


    //堆内
    ByteBuf buf = Unpooled.buffer(10);
    //堆外
    ByteBuf buf = Unpooled.directBuffer(10);
    

    ByteBuf提供了Unpooled非池化的类,可以直接使用,没有提供Pool池化的类,下面追踪源码看下ByteBuf是怎样分配的:

    Unpooled.buffer分配方式

    首先进入Unpooled类:

    private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
    //使用默认的分配器分配堆内buffer
    public static ByteBuf buffer(int initialCapacity) {
      return ALLOC.heapBuffer(initialCapacity);
    }
    

    下面会进入接口类ByteBufAllocator:

    //分配一个指定容量的堆内buf
    ByteBuf heapBuffer(int initialCapacity);
    

    然后进入AbstractByteBufAllocator抽象类:

    //如果没有指定初始容量,默认的初始容量大小是256
    static final int DEFAULT_INITIAL_CAPACITY = 256;
    //最大容量,为int的最大值
    static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
    
    @Override
    public ByteBuf heapBuffer(int initialCapacity) {
       return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
    }
    
     @Override
     public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
         //如果初始化的容量是0,最大的容量也是0,就返回一个空的Buf
         if (initialCapacity == 0 && maxCapacity == 0) {
             return emptyBuf;
         }
         validate(initialCapacity, maxCapacity);
         return newHeapBuffer(initialCapacity, maxCapacity);
     }
    
    //校验参数
    private static void validate(int initialCapacity, int maxCapacity) {
       //检查参数
       checkPositiveOrZero(initialCapacity, "initialCapacity");
       //如果初始化的容量大于最大容量,就抛异常
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
                    initialCapacity, maxCapacity));
        }
    }
    

    然后是newHeapBuffer抽象方法:

    protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
    

    因为这里初始化的是非池化的,所以会进入UnpooledByteBufAllocator类:

    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
         //PlatformDependent.hasUnsafe()是检查当前操作系统是否支持unsafe操作
         //根据支持与否,进入不同的类
         return PlatformDependent.hasUnsafe() ?
                 new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                 new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
     }
    

    支持Unsafe操作的进入下面:

     InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
             super(alloc, initialCapacity, maxCapacity);
     }
    

    不支持Unsafe的进入下面这个:

    InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
             super(alloc, initialCapacity, maxCapacity);
    }
    

    现在以支持Unsafe操作往下面走,进入UnpooledUnsafeHeapByteBuf类:

     UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(alloc, initialCapacity, maxCapacity);
    }
    

    再次调用了父类UnpooledHeapByteBuf:

    //分配器
    private final ByteBufAllocator alloc;
    //byte数组,ByteBuf数据底层就是使用这个存储
    byte[] array;
    
    public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(maxCapacity);
    
            //检查分配器是否为空
            checkNotNull(alloc, "alloc");
            //如果初始化的容量大于最大容量,就抛异常
            if (initialCapacity > maxCapacity) {
                throw new IllegalArgumentException(String.format(
                        "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
            }
    
            this.alloc = alloc;
            //设置当前的数组是分配之后的数组
            setArray(allocateArray(initialCapacity));
            //初始化readIndex和writeIndex
            setIndex(0, 0);
     }
    
    //分配数组
    protected byte[] allocateArray(int initialCapacity) {
         //返回一个具有initialCapacity容量大小的byte数组
         return new byte[initialCapacity];
    }
    
    //set数组
     private void setArray(byte[] initialArray) {
            array = initialArray;
            tmpNioBuf = null;
    }
    

    AbstractByteBuf类下的setIndex方法:

    //初始化readerIndex和writerIndex
    @Override
    public ByteBuf setIndex(int readerIndex, int writerIndex) {
        if (checkBounds) {
            checkIndexBounds(readerIndex, writerIndex, capacity());
        }
        setIndex0(readerIndex, writerIndex);
        return this;
    }
    
    final void setIndex0(int readerIndex, int writerIndex) {
          this.readerIndex = readerIndex;
          this.writerIndex = writerIndex;
    }
    

    上面走到AbstractByteBuf后,就分配完了一个非池化、堆内的ByteBuf,下面是追踪的代码:


    总结:
    可以看到,分配一个非池化、堆内的ByteBuf,它的底层就是byte数组

    Unpooled.directBuffer分配方式

    首先进入的也是Unpooled类:

    public static ByteBuf directBuffer(int initialCapacity) {
         return ALLOC.directBuffer(initialCapacity);
    }
    

    然后进入ByteBufAllocator抽象类:

    ByteBuf directBuffer(int initialCapacity);
    

    然后到AbstractByteBufAllocator类:

    @Override
    public ByteBuf directBuffer(int initialCapacity) {
         return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
     }
     
    @Override
    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
         //如果初始化的容量和最大容量都是0,就返回一个空的Buf
         if (initialCapacity == 0 && maxCapacity == 0) {
             return emptyBuf;
         }
         //校验参数
         validate(initialCapacity, maxCapacity);
         return newDirectBuffer(initialCapacity, maxCapacity);
     }
     
    protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
    

    由于分配的也是一个非池化的,所以newDirectBuffer会进入UnpooledByteBufAllocator类中的实现类:

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
         final ByteBuf buf;
         //同样的,会判断是否支持unsafe操作
         if (PlatformDependent.hasUnsafe()) {
             buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                     new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
         } else {
             buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
         }
         return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
     }
    

    以InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf为例,后面两个其实也相差不大,进入UnpooledUnsafeNoCleanerDirectByteBuf类的构造方法:

     UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(alloc, initialCapacity, maxCapacity);
        }
    

    再次调用的父类UnpooledUnsafeDirectByteBuf:

    ByteBuffer buffer;
    
    public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(maxCapacity);
            if (alloc == null) {
                throw new NullPointerException("alloc");
            }
            //校验参数
            checkPositiveOrZero(initialCapacity, "initialCapacity");
            checkPositiveOrZero(maxCapacity, "maxCapacity");
            if (initialCapacity > maxCapacity) {
                throw new IllegalArgumentException(String.format(
                        "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
            }
    
            this.alloc = alloc;
            setByteBuffer(allocateDirect(initialCapacity), false);
    }
    
    //分配的是一个NIO中的ByteBuffer
    protected ByteBuffer allocateDirect(int initialCapacity) {
          return ByteBuffer.allocateDirect(initialCapacity);
    }
    
    final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
            if (tryFree) {
                ByteBuffer oldBuffer = this.buffer;
                if (oldBuffer != null) {
                    if (doNotFree) {
                        doNotFree = false;
                    } else {
                        freeDirect(oldBuffer);
                    }
                }
            }
            this.buffer = buffer;
            memoryAddress = PlatformDependent.directBufferAddress(buffer);
            tmpNioBuf = null;
            capacity = buffer.remaining();
    }
    

    ByteBuffer类下面的allocateDirect:

     public static ByteBuffer allocateDirect(int capacity) {
            return new DirectByteBuffer(capacity);
     }
    

    代码跟踪图:


    总结:
    分配非池化、堆外的ByteBuf,可以看到底层是NIO的DirectByteBuffer实现的

    ByteBufAllocator类图

    ByteBuf内存复用

    分配池化内存

    在上面根据源码知道了怎么去分配非池化内存,那么池化内存要怎么分配呢?看下面的图示:


    上面就是分配池化内存的步骤,接下来会根据源码具体分析

    内存缓存池

    jemalloc内存分配机制

    1:内存池中有三大区域,分别是:tiny、small、normal
    2:每个区域分了不同大小的格子,每个格子只能缓存对应大小的内存块
    3:支持最大的格子内存是32kb,超过这个大小的不能被缓存,只能被释放掉
    4:每个类型的格子都有对应的数量:tiny:512个,small:256个,normal:64个,例如tiny区域的每个大小的格子都有512个,如果满了就不会被回收,内存会被释放掉

    回收池化内存

    分配池化内存的过程
    上面分析了分配非池化内存,下面看下怎么分配池化内存:

     ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
     //分配的内存最大长度为496
     ByteBuf buf1 = allocator.ioBuffer(495);
     System.out.printf("buf1: 0x%X%n", buf1.memoryAddress());
     //此时会被回收到tiny的512b格子中
     buf1.release();
    
     //从tiny的512b格子去取
     ByteBuf buf2 = allocator.ioBuffer(495);
     System.out.printf("buf2: 0x%X%n", buf2.memoryAddress());
     buf2.release();
    

    先来看下ByteBufAllocator类:

    //默认ByteBuf分配器,在ByteBufUtil中初始化
    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
    

    跟踪第一次的allocator.ioBuffer(495)代码,首先进入AbstractByteBufAllocator类:

    @Override
    public ByteBuf ioBuffer(int initialCapacity) {
        //如果支持Unsafe,就分配堆外内存
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(initialCapacity);
        }
        //不支持Unsafe,就分配堆内内存
        return heapBuffer(initialCapacity);
    }
    

    然后调用了该类下面的directBuffer方法:

    @Override
    public ByteBuf directBuffer(int initialCapacity) {
         return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
     }
    
     @Override
     public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
         //如果初始化的容量和最大容量等于0,就返回一个空的ByteBuf
         if (initialCapacity == 0 && maxCapacity == 0) {
             return emptyBuf;
         }
         validate(initialCapacity, maxCapacity);
         return newDirectBuffer(initialCapacity, maxCapacity);
     }
     //校验参数
     private static void validate(int initialCapacity, int maxCapacity) {
            checkPositiveOrZero(initialCapacity, "initialCapacity");
            if (initialCapacity > maxCapacity) {
                throw new IllegalArgumentException(String.format(
                        "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
                        initialCapacity, maxCapacity));
            }
    }
    

    然后会进入池化的ByteBuf分配器PooledByteBufAllocator类,可以实现内存的复用:

    // cache sizes  缓存默认值
    DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
    DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
    DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
    
     @Override
     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
         //从当前线程中获取cache对象
         PoolThreadCache cache = threadCache.get();
         //从cache中获取Arena
         //Arena可以理解为一个netty提供的实际进行buf分配和管理的工具
         PoolArena<ByteBuffer> directArena = cache.directArena;
    
         final ByteBuf buf;
         //如果有directArena就分配池化内存
         if (directArena != null) {
             buf = directArena.allocate(cache, initialCapacity, maxCapacity);
         } else { //如果没有directArena,就使用非池化Unpooled
             buf = PlatformDependent.hasUnsafe() ?
                     UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                     new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
         }
    
         return toLeakAwareBuffer(buf);
     }
    

    再次跟踪后进入PoolArena类:
    可以看到下面有三种类型tiny、small、normal

    enum SizeClass {
         Tiny,
         Small,
        Normal
    }
    
    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
          //获取一个ByteBuf对象
          PooledByteBuf<T> buf = newByteBuf(maxCapacity);
          //分配内存
          allocate(cache, buf, reqCapacity);
          return buf;
    }
    
    @Override
    protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
        //如果支持Unsafe,就初始化一个PooledUnsafeDirectByteBuf
        if (HAS_UNSAFE) {
            return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
        } else { //不支持Unsafe,就初始化一个PooledDirectByteBuf
            return PooledDirectByteBuf.newInstance(maxCapacity);
        }
    }
    

    下面进入PooledUnsafeDirectByteBuf类:
    从线程回收栈中获取一个buf,如果栈中没有,就会创建一个新的,如果有,就会返回栈中的buf

    //调用RECYCLER.get()时,线程栈中没有可以复用的时,会调用newObject方法,此时创建出来的buf是空的
     private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
          @Override
           protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
               return new PooledUnsafeDirectByteBuf(handle, 0);
           }
    };
    
    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
           //RECYCLER,回收机制
           PooledUnsafeDirectByteBuf buf = RECYCLER.get();
           //取出来的可能是之前的buf,使用之前清理一下
           buf.reuse(maxCapacity);
           return buf;
    }
    

    然后再次回到PoolArena类中的allocate方法,分配内存:

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
             //将需要的内存大小计算为2^n
            final int normCapacity = normalizeCapacity(reqCapacity);
            //需要分配的内存是否是tiny或者small类型
            if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
                int tableIdx;
                PoolSubpage<T>[] table;
                boolean tiny = isTiny(normCapacity);
                if (tiny) { // < 512 //分配一个tiny内存
                    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = tinyIdx(normCapacity);
                    table = tinySubpagePools;
                } else {
                    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = smallIdx(normCapacity);
                    table = smallSubpagePools;
                }
    
                final PoolSubpage<T> head = table[tableIdx];
    
              
                synchronized (head) {
                    final PoolSubpage<T> s = head.next;
                    if (s != head) {
                        assert s.doNotDestroy && s.elemSize == normCapacity;
                        long handle = s.allocate();
                        assert handle >= 0;
                        s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                        incTinySmallAllocation(tiny);
                        return;
                    }
                }
                synchronized (this) {
                    //分配一块新的内存
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
    
                incTinySmallAllocation(tiny);
                return;
            }
            if (normCapacity <= chunkSize) {
                if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                    ++allocationsNormal;
                }
            } else {
                // Huge allocations are never served via the cache so just call allocateHuge
                allocateHuge(buf, reqCapacity);
            }
    }
    

    PoolThreadCache类下的allocateTiny方法:

    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
    }
    
    //从cache中获取buf
     private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
            int idx = PoolArena.tinyIdx(normCapacity);
            if (area.isDirect()) {
                return cache(tinySubPageDirectCaches, idx);
            }
            return cache(tinySubPageHeapCaches, idx);
        }
    

    根据需要的容量获取对应的格子,走到PoolArena类下面的tinyIdx方法:

    static int tinyIdx(int normCapacity) {
            return normCapacity >>> 4;
    }
    

    PoolThreadCache类下的allocate方法,把缓存格子的内存分配到buf

    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
          if (cache == null) {
              // no cache found so just return false here
              return false;
          }
          boolean allocated = cache.allocate(buf, reqCapacity);
          if (++ allocations >= freeSweepAllocationThreshold) {
              allocations = 0;
              trim();
          }
          return allocated;
      }
    

    下面是具体跟踪代码的步骤图:


    上面的源码是以tiny类型为例,其他两种类型类似,当第一次分配创建了一块新的内存,然后被成功回收到内存缓冲池后,再次分配对应大小的内存,会直接从内存缓冲池中取,不会再次分配一块新的内存了

    内存回收的过程

    接下来跟踪release()方法,看下内存回收的过程

    buf1.release();
    

    第一次进入AbstractReferenceCountedByteBuf类:
    Buf的引用计数器,用于内存复用,有一个计数器refCnt,retain()计数器加一,release()计数器减一,
    直到计数器为0,才调用deallocate()释放,deallocate()方法由具体的buf自己实现。

     @Override
     public boolean release() {
         return release0(1);
     }
    
     private boolean release0(int decrement) {
            int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement);
            //判断当前buf有没有被引用了,没有的话就调用deallocate
            if (decrement == realCnt) {
                if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                    deallocate();
                    return true;
                }
                return retryRelease0(decrement);
            }
            return releaseNonFinal0(decrement, rawCnt, realCnt);
    }
    

    进入PooledByteBuf类:

    @Override
    protected final void deallocate() {
        if (handle >= 0) {
            final long handle = this.handle;
            //表示当前的buf不在使用任何一块内存区域
            this.handle = -1;
            //设置memory为null
            memory = null;
            //释放buf的内存
            chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
            tmpNioBuf = null;
            chunk = null;
            //把buf对象放入对象回收栈
            recycle();
        }
    }
    

    再次进入PoolArena类:

    void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
            //判断是否是unpooled
            if (chunk.unpooled) {
                int size = chunk.chunkSize();
                destroyChunk(chunk);
                activeBytesHuge.add(-size);
                deallocationsHuge.increment();
            } else {
                //判断是哪种类型,tiny、small、normal
                SizeClass sizeClass = sizeClass(normCapacity);
                //放入缓存
                if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
                    // cached so not free it.
                    return;
                }
    
                freeChunk(chunk, handle, sizeClass, nioBuffer);
            }
    }
    
    //计算内存区域是哪种类型
    private SizeClass sizeClass(int normCapacity) {
            if (!isTinyOrSmall(normCapacity)) {
                return SizeClass.Normal;
            }
            return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
    }
    

    然后到PoolThreadCache类:

    boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
                    long handle, int normCapacity, SizeClass sizeClass) {
         MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
         if (cache == null) {
             return false;
         }
         //加入到缓存队列
         return cache.add(chunk, nioBuffer, handle);
    }
    
     private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
            //判断是哪种类型,然后把内存回收到哪一块
            switch (sizeClass) {
            case Normal:
                return cacheForNormal(area, normCapacity);
            case Small:
                return cacheForSmall(area, normCapacity);
            case Tiny:
                return cacheForTiny(area, normCapacity);
            default:
                throw new Error();
            }
    }
    
      private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
            int idx = PoolArena.tinyIdx(normCapacity);
            if (area.isDirect()) {
                return cache(tinySubPageDirectCaches, idx);
            }
            return cache(tinySubPageHeapCaches, idx);
        }
    

    上述跟踪代码步骤图:


    ByteBuf零拷贝机制

    Netty的零拷贝机制,是一种应用层的实现,和底层的JVM、操作系统内存机制没有过多的关联

    几种示例
    一:CompositeByteBuf,将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝
    public static void test1() {
           ByteBuf buf1 = Unpooled.buffer(4);
           ByteBuf buf2 = Unpooled.buffer(3);
           byte[] bytes1 = {1,2};
           byte[] bytes2 = {3,4,5};
           buf1.writeBytes(bytes1);
           buf2.writeBytes(bytes2);
           CompositeByteBuf byteBuf = Unpooled.compositeBuffer();
           byteBuf = byteBuf.addComponents(true, buf1, buf2);
           System.out.println("byteBuf: " + byteBuf.toString());
    }
    

    上面输出结果,ridx是顺序读的读取位置,widx是顺序写的写入位置,cap是新的ByteBuf的容量,components是指新的ByteBuf是由几个ByteBuf组成


    二:wrappedBuffer()方法,将byte[]数组包装成ByteBuf对象
    public static void test2() {
          byte[] bytes = {1,2,3,4,5};
          ByteBuf buf = Unpooled.wrappedBuffer(bytes);
          System.out.println("buf:" + buf.toString());
    }
    

    输出结果中:ridx是顺序读的读取位置,widx是顺序写的写入位置,cap是ByteBuf的容量,新的ByteBuf里存的是数组的引用地址,实质操作的还是原来的数组


    三:slice()方法,将一个ByteBuf对象切分成多个ByteBuf对象
    public static void test3() {
         ByteBuf buf = Unpooled.wrappedBuffer("hello".getBytes());
         ByteBuf byteBuf = buf.slice(1,2);
         System.out.println("byteBuf:" + byteBuf.toString());
    }
    

    输出结果中,可以看到,有两个ByteBuf,其中一个是原有的,新的ByteBuf中存放了原来的ByteBuf的引用地址,另一个是分割后的ByteBuf的引用地址

    相关文章

      网友评论

        本文标题:网络编程Netty之ByteBuf详解

        本文链接:https://www.haomeiwen.com/subject/lyzkfdtx.html