ByteBuf

作者: Pillar_Zhong | 来源:发表于2019-08-01 19:03 被阅读0次

    体系结构

    1563803296721.png

    从图可以看出来,ByteBuf分类主要是三个维度:

    • Unpooled和Pooled
    • Unsafe和非Unsafe
    • Heap和Direct

    访问索引

    从图示中可以容易理解ByteBuf的读写区域

    1563801077089.png

    内存分配

    策略

    1563805578302.png 1563805646415.png 1563805753773.png

    可以看到在接口方法里面已经区分了是direct还是heap,在继承体系上区分了是Pooled还是Unpooled

    而在具体的逻辑中,再根据系统底层是否支持Unsafe来决定是否要返回Unsafe的实现。

    UnpooledByteBufAllocator

    heap内存的分配

    1563806190062.png
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        // 根据是否支持Unsafe与否来返回具体的UnpooledHeapByteBuf
        return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
                : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }
    

    初始化

    UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        // 直接去调用父类UnpooledHeapByteBuf的构造函数
        // 说明底层的内存分配是一样的,只不过数据操作的方式不一样,会用Unsafe的方式,更快,更直接。
        super(alloc, initialCapacity, maxCapacity);
    }
    
    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        // 直接新建byte数组,长度为initialCapacity,readindex和writeindex为0
        this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
    }
    
    private UnpooledHeapByteBuf(
            ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
    
        super(maxCapacity);
    
        ...
    
        this.alloc = alloc;
        // 保存上面的byte数组作为ByteBuf的存储
        setArray(initialArray);
        // read和write index归零
        setIndex(readerIndex, writerIndex);
    }
    

    getByte

    Unsafe
    // Unsafe的方式操作数据
    protected byte _getByte(int index) {
        return UnsafeByteBufUtil.getByte(array, index);
    }
    
    static byte getByte(byte[] data, int index) {
        // 可以看到,这里直接通过byte数组的内存偏移量加上数组的index来获取数据
        return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
    }
    
    非Unsafe
    // 非Unsafe的方式操作数据
    protected byte _getByte(int index) {
        return HeapByteBufUtil.getByte(array, index);
    }
    
    // 通过数组下标的方式来操作数组
    static byte getByte(byte[] memory, int index) {
        return memory[index];
    }
    

    direct内存分配

    1563807731297.png
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        // 根据底层对Unsafe的支持与否来决定最终的UnpooledDirectByteBuf
        ByteBuf buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    
        return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
    }
    

    初始化

    static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(
            ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        if (PlatformDependent.useDirectBufferNoCleaner()) {
            return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity);
        }
        return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
    }
    
    非Unsafe
    protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        ...
    
        this.alloc = alloc;
        // NIO的方法,这里直接去堆外分配内存
        setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
    }
    
    private void setByteBuffer(ByteBuffer buffer) {
        ByteBuffer oldBuffer = this.buffer;
        // 而这里对老的buffer进行回收,TODO
        if (oldBuffer != null) {
            if (doNotFree) {
                doNotFree = false;
            } else {
                freeDirect(oldBuffer);
            }
        }
        // 将前面创建的堆外内存保存下来
        this.buffer = buffer;
        tmpNioBuf = null;
        capacity = buffer.remaining();
    }
    
    Unsafe
    protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        ...
    
        this.alloc = alloc;
        // 直接堆外内存分配
        setByteBuffer(allocateDirect(initialCapacity), false);
    }
    
    // 跟非Unsafe类似
    final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        if (tryFree) {
            ByteBuffer oldBuffer = this.buffer;
            if (oldBuffer != null) {
                if (doNotFree) {
                    doNotFree = false;
                } else {
                    freeDirect(oldBuffer);
                }
            }
        }
        this.buffer = buffer;
        // Unsafe不一样的地方在于,需要知道buffer的内存偏移量,而这个偏移量在allocateDirect会生成
        memoryAddress = PlatformDependent.directBufferAddress(buffer);
        tmpNioBuf = null;
        capacity = buffer.remaining();
    }
    
    static long directBufferAddress(ByteBuffer buffer) {
        // 可以看到这个偏移量会去找ADDRESS_FIELD_OFFSET
        // final Field field = Buffer.class.getDeclaredField("address");
        // 而堆外内存的偏移量早在allocateDirect的时候就已经自动赋给buffer的address了,直接反射去拿就好
        // 这个address只代表堆外内存的偏移量,
        return getLong(buffer, ADDRESS_FIELD_OFFSET);
    }
    

    getByte

    Unsafe
    protected byte _getByte(int index) {
        return UnsafeByteBufUtil.getByte(addr(index));
    }
    
    long addr(int index) {
        // 根据上面保存下来的偏移量来去拿到index的地址
        return memoryAddress + index;
    }
    
    非Unsafe
    protected byte _getByte(int index) {
        // 直接通过api来对buffer进行读取
        return buffer.get(index);
    }
    

    PooledByteBufAllocator

    direct内存分配

    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        // 拿到当前线程绑定的PoolThreadCache
        PoolThreadCache cache = threadCache.get();
        // 进而拿到PoolThreadCache的堆外内存
        PoolArena<ByteBuffer> directArena = cache.directArena;
    
        ByteBuf buf;
        if (directArena != null) {
            // 在当前线程绑定的堆外内存区域请求一块空间
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            if (PlatformDependent.hasUnsafe()) {
                buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
            } else {
                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
        }
    
        return toLeakAwareBuffer(buf);
    }
    

    相关文章

      网友评论

          本文标题:ByteBuf

          本文链接:https://www.haomeiwen.com/subject/xbmcdctx.html