美文网首页
Netty源码_UnpooledHeapByteBuf详解

Netty源码_UnpooledHeapByteBuf详解

作者: wo883721 | 来源:发表于2021-10-23 18:18 被阅读0次

    本篇文章我们讲解缓存区 ByteBuf 八大主要类型中两种,未池化堆缓冲区 UnpooledHeapByteBuf 和 未池化不完全堆缓冲区 UnpooledUnsafeHeapByteBuf

    一. UnpooledHeapByteBuf

    1.1 介绍

    Big endian Java heap buffer implementation.
    It is recommended to use
    UnpooledByteBufAllocator.heapBuffer(int, int), 
    Unpooled.buffer(int) and
    Unpooled.wrappedBuffer(byte[]) 
    instead of calling the constructor explicitly.
    

    UnpooledHeapByteBufjava 堆缓冲区的实现,而且它推荐使用 UnpooledByteBufAllocator.heapBuffer(int, int),Unpooled.buffer(int)Unpooled.wrappedBuffer(byte[]) 方式创建 UnpooledHeapByteBuf,而不是直接调用它的构造方法new 出来。

    1.2 成员属性

        private final ByteBufAllocator alloc;
        byte[] array;
        private ByteBuffer tmpNioBuf;
    

    有三个成员属性:

    1. alloc: 创建此缓存区的ByteBufAllocator 对象。
    2. array: 字节数组,用来储存此缓存区的内容数据。
    3. tmpNioBuf: 临时的NIO 缓存区ByteBuffer 对象,其实是由array 创建的。

    1.3 构造方法

          /**
           * 使用新分配的字节数组创建新的堆缓冲区。
           */
        public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(maxCapacity);
    
            if (initialCapacity > maxCapacity) {
                throw new IllegalArgumentException(String.format(
                        "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
            }
    
            this.alloc = checkNotNull(alloc, "alloc");
            setArray(allocateArray(initialCapacity));
            setIndex(0, 0);
        }
    
           /**
            * 使用现有字节数组创建新的堆缓冲区。
            */
        protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {
            super(maxCapacity);
    
            checkNotNull(alloc, "alloc");
            checkNotNull(initialArray, "initialArray");
            if (initialArray.length > maxCapacity) {
                throw new IllegalArgumentException(String.format(
                        "initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));
            }
    
            this.alloc = alloc;
            setArray(initialArray);
            setIndex(0, initialArray.length);
        }
    

    它有两个构造方法,一个是创建的时候没有内容,一个创建的时候就带有内容数据。

    1.4 重要方法

    1.4.1 allocateArray(int initialCapacity)

        protected byte[] allocateArray(int initialCapacity) {
            return new byte[initialCapacity];
        }
    

    分配新的字节数组。

    这个方法很重要,因为UnpooledHeapByteBufUnpooledUnsafeHeapByteBuf 最重要的区别就在这里。

    • UnpooledHeapByteBuf 是直接new 一个字节数组。
    • UnpooledUnsafeHeapByteBuf 是通过 PlatformDependent.allocateUninitializedArray(initialCapacity) 方法创建字节数组。

    1.4.2 setArray(byte[] initialArray)

        private void setArray(byte[] initialArray) {
            array = initialArray;
            tmpNioBuf = null;
        }
    

    替换缓存区的字节数组array,必须将 tmpNioBuf 设置为 null

    因为NIO缓存区tmpNioBuf其实是根据字节数组array创建的,当array改变的时候, tmpNioBuf也就要重新创建了。

    1.4.3 设置缓存区容量

        @Override
        public int capacity() {
            return array.length;
        }
    
        // 在容量减少后调用
        protected final void trimIndicesToCapacity(int newCapacity) {
            // 写索引比 新容量大,那么才需要改变读写索引的值
            if (writerIndex() > newCapacity) {
                // 写索引的值就是新容量newCapacity,
                // 而读索引的值是读索引和新容量newCapacity之间的较小值
                setIndex0(Math.min(readerIndex(), newCapacity), newCapacity);
            }
        }
        @Override
        public ByteBuf capacity(int newCapacity) {
            // 检查新容量是否越界
            checkNewCapacity(newCapacity);
            byte[] oldArray = array;
            int oldCapacity = oldArray.length;
            // 新老容量相等,那么不用改变,直接返回
            if (newCapacity == oldCapacity) {
                return this;
            }
    
            // 需要复制的内容字节数
            int bytesToCopy;
            if (newCapacity > oldCapacity) {
                // 新容量比老容量大,即扩大缓存区,
                // 那么复制的内容字节数就是老容量大小
                bytesToCopy = oldCapacity;
            } else {
                // 新容量比老容量小,就要缩小缓存区
                // 那么读写索引也要改变。
                trimIndicesToCapacity(newCapacity);
                // 需要复制的内容字节数也就是新容量大小
                bytesToCopy = newCapacity;
            }
            // 创建新的字节数组
            byte[] newArray = allocateArray(newCapacity);
            // 进行内容 copy
            System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy);
            setArray(newArray);
            // 释放老的字节数组
            freeArray(oldArray);
            return this;
        }
    

    你会发现当更改缓存区容量时,分为两种情况:

    • 当新容量比老容量大的时候,即扩大缓存区,这时比较简单,只需要将老的缓存区字节数组数据复制到新的缓存区字节数组数据中就可以了。
    • 当新容量比老容量小的时候,即缩小缓存区,这个时候除了数据内容的复制,可能还需要更改读写索引。
    • 我记得在netty 老版本中,capacity(int newCapacity) 方法还涉及标记索引的更改,这个就非常华而不实,没有实际意义。现在的版本中已经取消掉了。

    1.4.4 get 系列方法

    1.4.4.1 获取基本数据类型

        @Override
        public byte getByte(int index) {
            ensureAccessible();
            return _getByte(index);
        }
    
        @Override
        protected byte _getByte(int index) {
            return HeapByteBufUtil.getByte(array, index);
        }
    
        @Override
        public short getShort(int index) {
            ensureAccessible();
            return _getShort(index);
        }
    
        @Override
        protected short _getShort(int index) {
            return HeapByteBufUtil.getShort(array, index);
        }
       ......
    

    你会发现最后都是调用 HeapByteBufUtil 对应方法,这个类 HeapByteBufUtil 我们后面再说。

    1.4.4.2 getBytes(int index, ByteBuf dst, int dstIndex, int length)

        @Override
        public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
            // 检查是否越界
            checkDstIndex(index, length, dstIndex, dst.capacity());
            if (dst.hasMemoryAddress()) {
                // 如果目标缓存区使用底层内存地址存储数据时
                // 调用 PlatformDependent.copyMemory 方法,
                // 将本缓存区字节数组array中的内容复制到目标缓存区的存储空间中
                PlatformDependent.copyMemory(array, index, dst.memoryAddress() + dstIndex, length);
            } else if (dst.hasArray()) {
                // 如果目标缓存区也是堆缓存区,那么目标缓存区就有字节数组 dst.array()
                // 调用 getBytes(int, byte[], int, int) 方法,进行字节数组之间的复制
                getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
            } else {
                // 如果目标缓存区是直接缓存区,
                // 那么就调用目标缓存区的 setBytes(int, byte[], int, int) 方法,
                // 将本缓存区字节数组 array 传递过去,进行内容复制
                dst.setBytes(dstIndex, array, index, length);
            }
            return this;
        }
    

    根据目标缓存区dst类型不同,使用的方式也不同。

    1.4.4.3 getBytes(int index, byte[] dst, int dstIndex, int length)

        @Override
        public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
            checkDstIndex(index, length, dstIndex, dst.length);
            // 使用 System.arraycopy 方法进行内容复制
            System.arraycopy(array, index, dst, dstIndex, length);
            return this;
        }
    

    1.4.4.4 getBytes(int index, ByteBuffer dst)

        @Override
        public ByteBuf getBytes(int index, ByteBuffer dst) {
            ensureAccessible();
            // 利用NIO缓存区的 put 方法进行内容复制
            dst.put(array, index, dst.remaining());
            return this;
        }
    

    1.4.4.5 getBytes(int index, OutputStream out, int length)

        @Override
        public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
            ensureAccessible();
            // 输出流直接写入数据
            out.write(array, index, length);
            return this;
        }
    

    1.4.4.6 getBytes(int index, GatheringByteChannel out, int length)

        @Override
        public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
            ensureAccessible();
            return getBytes(index, out, length, false);
        }
    
        @Override
        public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
            ensureAccessible();
            return getBytes(index, out, position, length, false);
        }
    
        private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
            ensureAccessible();
            ByteBuffer tmpBuf;
            if (internal) {
                // 内部使用的 NIO缓存区
                tmpBuf = internalNioBuffer();
            } else {
                // 将字节数组 array 转换成一个 NIO缓存区
                tmpBuf = ByteBuffer.wrap(array);
            }
            // 将NIO缓存区数据写入到 out 中
            return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
        }
    

    FileChannel 就是 GatheringByteChannel 的子类。

    1.4.5 set 系列方法

    1.4.5.1 设置基本数据类型

        @Override
        public ByteBuf setByte(int index, int value) {
            ensureAccessible();
            _setByte(index, value);
            return this;
        }
    
        @Override
        protected void _setByte(int index, int value) {
            HeapByteBufUtil.setByte(array, index, value);
        }
    
        @Override
        public ByteBuf setShort(int index, int value) {
            ensureAccessible();
            _setShort(index, value);
            return this;
        }
    
        @Override
        protected void _setShort(int index, int value) {
            HeapByteBufUtil.setShort(array, index, value);
        }
    ........
    

    都是调用 HeapByteBufUtil对应方法,这个类HeapByteBufUtil 我们后面再说。

    1.4.5.2 setBytes(int index, ByteBuf src, int srcIndex, int length)

        @Override
        public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
            // 检查是否越界
            checkSrcIndex(index, length, srcIndex, src.capacity());
            if (src.hasMemoryAddress()) {
                // 如果源缓存区使用底层内存地址存储数据时
                // 调用 PlatformDependent.copyMemory 方法,
                // 将源缓存区的内容复制到此缓存区的字节数组 array 中
                PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length);
            } else  if (src.hasArray()) {
                // 如果源缓存区也是堆缓存区,那么源缓存区就有字节数组 src.array()
                // 调用 setBytes(int, byte[], int, int) 方法,进行字节数组之间的复制
                setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
            } else {
                // 如果源缓存区是直接缓存区,
                // 那么就调用源缓存区的 getBytes(int, byte[], int, int) 方法,
                // 将本缓存区字节数组 array 传递过去,将源缓存区内容复制到字节数组 array中
                src.getBytes(srcIndex, array, index, length);
            }
            return this;
        }
    

    1.4.5.3 setBytes(int index, byte[] src, int srcIndex, int length)

        @Override
        public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
            checkSrcIndex(index, length, srcIndex, src.length);
            // 使用 System.arraycopy 方法进行内容复制
            System.arraycopy(src, srcIndex, array, index, length);
            return this;
        }
    

    1.4.5.4 setBytes(int index, ByteBuffer src)

        @Override
        public ByteBuf setBytes(int index, ByteBuffer src) {
            ensureAccessible();
            // 利用NIO缓存区的 get 方法, 将NIO缓存区的内容复制到字节数组 array 中
            src.get(array, index, src.remaining());
            return this;
        }
    

    1.4.5.5 setBytes(int index, InputStream in, int length)

        @Override
        public int setBytes(int index, InputStream in, int length) throws IOException {
            ensureAccessible();
            // 将输入流 in 的内容读取到字节数组 array 中,返回读取的字节数
            return in.read(array, index, length);
        }
    

    1.4.5.6 setBytes(int index, ScatteringByteChannel in, int length)

        @Override
        public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
            ensureAccessible();
            try {
                // 通过 internalNioBuffer() 获取本缓存区内容的 NIO缓存对象,然后进行数据读取
                return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
            } catch (ClosedChannelException ignored) {
                return -1;
            }
        }
    
        @Override
        public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
            ensureAccessible();
            try {
                // 通过 internalNioBuffer() 获取本缓存区内容的 NIO缓存对象,然后进行数据读取
                return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length), position);
            } catch (ClosedChannelException ignored) {
                return -1;
            }
        }
    

    1.4.6 转换 NIO 缓存区方法

    1. nioBufferCount()
       public int nioBufferCount() {
           return 1;
       }
      
      返回 1,表示这个缓存区支持转换成NIO缓存区。

      返回 -1 ,表示这个缓存区不支持转换成NIO缓存区。

    2. nioBuffer(int index, int length)
       @Override
       public ByteBuffer nioBuffer(int index, int length) {
           ensureAccessible();
           return ByteBuffer.wrap(array, index, length).slice();
       }
      
      通过 ByteBuffer.wrap 方法创建NIO缓存区。
    3. nioBuffers(int index, int length)
       @Override
       public ByteBuffer[] nioBuffers(int index, int length) {
           return new ByteBuffer[] { nioBuffer(index, length) };
       }
      
    4. internalNioBuffer(int index, int length)
       @Override
       public ByteBuffer internalNioBuffer(int index, int length) {
           checkIndex(index, length);
           return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
       }
      
    5. internalNioBuffer()
       private ByteBuffer internalNioBuffer() {
           ByteBuffer tmpNioBuf = this.tmpNioBuf;
           if (tmpNioBuf == null) {
               this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
           }
           return tmpNioBuf;
       }
      
      这个方法也是利用ByteBuffer.wrap 方法创建NIO缓存区,但是它与nioBuffer(int index, int length) 方法不同时,它会使用 tmpNioBuf 变量记录一下,不同每次都创建。

    1.4.7 deallocate()

        @Override
        protected void deallocate() {
            // 是否字节数组
            freeArray(array);
            // 将 array 变成空字节数组
            array = EmptyArrays.EMPTY_BYTES;
        }
    

    我们知道这个方法是在 AbstractReferenceCountedByteBuf 类中定义的,当引用计数变成 0 的时候,就会调用这个 deallocate() 方法,释放持有的资源。

    二. UnpooledUnsafeHeapByteBuf

    仔细阅读 UnpooledUnsafeHeapByteBuf 源码,你会发现这个类很简单,它是 UnpooledHeapByteBuf 的子类,与 UnpooledHeapByteBuf 区别就两个方面。

    2.1 创建字节数组方式

        @Override
        protected byte[] allocateArray(int initialCapacity) {
            return PlatformDependent.allocateUninitializedArray(initialCapacity);
        }
    

    UnpooledUnsafeHeapByteBuf 是通过 PlatformDependent.allocateUninitializedArray 创建数组,利用 Unsafe 来加快数据的访问。

    2.2 获取基本数据类型方式

        @Override
        public byte getByte(int index) {
            checkIndex(index);
            return _getByte(index);
        }
    
        @Override
        protected byte _getByte(int index) {
            return UnsafeByteBufUtil.getByte(array, index);
        }
    
        @Override
        public short getShort(int index) {
            checkIndex(index, 2);
            return _getShort(index);
        }
    
        @Override
        protected short _getShort(int index) {
            return UnsafeByteBufUtil.getShort(array, index);
        }
        ...... 
    

    UnpooledUnsafeHeapByteBuf 是通过 UnsafeByteBufUtil 工具类获取基本数据类型的数据。

    三. HeapByteBufUtil

    先明确一个概念,什么是大端,什么是小端。

    我们知道最小的单位是字节byte,一个字节是8位。但是 java 中很多基本类型需要多个字节表示。

    • 例如 java 中的 short 类型,它是由两个字节组成的。那么它的第一个字节是表示低八位的数据还是高八位的数据呢。
    • 你可能想当然以为,肯定是表示高八位,符合从高到低的逻辑,这种就属于大端类型。如果从低到高存储数据,这个就属于小端类型。
    • 小端类型的意义是,如果是一个 long 类型,它有八个字节,储存了一个比较小的数,一个字节就可以存储,其他七个字节都是 0。如果你有大端类型,要读取到有效数据,必须高七个字节全部读取完,才能读取有效数据那个字节。

    3.1 从字节数组中获取基本数据类型

    例如获取 short

        static short getShort(byte[] memory, int index) {
            // 因为是大端,所以第一个字节是高8位的数据,
            // 因此需要将第一个字节memory[index]右移8位(memory[index] << 8)
            return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
        }
    
        static short getShortLE(byte[] memory, int index) {
            // 因为是小端,所以第一个字节是低8位,第二个字节是高八位,
            // 因此需要将第二个字节memory[index + 1]右移8位(memory[index + 1] << 8)
            return (short) (memory[index] & 0xff | memory[index + 1] << 8);
        }
    

    就是通过右移位运算和 或| 位运算,实现数的拼接。

    3.2 从字节数组中获取基本数据类型

        static void setShort(byte[] memory, int index, int value) {
            // 利用左移位运算,将short 类型高八位的数据移动到低八位,转换成一个 byte 类型存储
            memory[index]     = (byte) (value >>> 8);
            // 直接使用 (byte) 类型强转,只会保留低八位的数据
            memory[index + 1] = (byte) value;
        }
    
        static void setShortLE(byte[] memory, int index, int value) {
            // 直接使用 (byte) 类型强转,只会保留低八位的数据
            memory[index]     = (byte) value;
            // 利用左移位运算,将short 类型高八位的数据移动到低八位,转换成一个 byte 类型存储
            memory[index + 1] = (byte) (value >>> 8);
        }
    

    利用左移位运算,将高位数据转换成 byte 类型存储;再使用 (byte) 类型强转,只保留低八位的数据存储。

    相关文章

      网友评论

          本文标题:Netty源码_UnpooledHeapByteBuf详解

          本文链接:https://www.haomeiwen.com/subject/lsakaltx.html