美文网首页
PooledByteBuf对象、内存复用

PooledByteBuf对象、内存复用

作者: 横渡 | 来源:发表于2019-06-26 12:36 被阅读0次

    PoolThreadCache: PooledByteBufAllocator 实例维护了一个线程变量。
    多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里面存Chunk。
    Pool Chunk里面维护了内存引用,内存复用的做法就是把buf的memory指向Chunck的memory。

    我们看下面这段代码

     @Test
        public void poolTest() {
            System.out.println("测试buf回收====");
            ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
            // tiny
            ByteBuf buf1 = allocator.directBuffer(495); // 分配的内存最大长度为496
            System.out.printf("buf1: 0x%X%n", buf1.memoryAddress());
            buf1.release(); // 此时会被回收到tiny 的512b格子中
    }
    

    跟踪内存分配的部分
    ByteBuf buf1 = allocator.directBuffer(495);
    调用 AbstractByteBufAllocator 的directBuffer方法
    AbstractByteBufAllocator # directBuffer(int initialCapacity)
    因为java平台有unsafe机制,会使用池化的PooledByteBufAllocator:
    PooledByteBufAllocator # newDirectBuffer(int initialCapacity, int maxCapacity)

    我们来看PooledByteBufAllocator类newDirectBuffer的代码:

    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
            // PoolThreadCache 跟线程绑定的分配内存的缓存,使用jemaclloc的方式分配内存
            PoolThreadCache cache = threadCache.get();
           // PoolArena 真正的内存分配管理器
            PoolArena<ByteBuffer> directArena = cache.directArena;
    
            final ByteBuf buf;
            if (directArena != null) {
                // poolArena 分配内存
                buf = directArena.allocate(cache, initialCapacity, maxCapacity);
            } else {
                buf = PlatformDependent.hasUnsafe() ?
                        UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                        new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
    
            return toLeakAwareBuffer(buf);
        }
    

    来看 PoolArena中分配内存的方法:

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
            // 首先创建ByteBuf
            PooledByteBuf<T> buf = newByteBuf(maxCapacity);
           // 给ByteBuf分配内存
            allocate(cache, buf, reqCapacity);
            return buf;
        }
    

    PoolArena 内部类 DirectArena 的newByteBuf方法:

     @Override
    protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
      if (HAS_UNSAFE) { // 是否支持unsafe机制
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
      } else {
        return PooledDirectByteBuf.newInstance(maxCapacity);
      }
    }
    

    因为支持unsafe机制,走第一个分支PooledUnsafeDirectByteBuf

        static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
            // 尝试从RECYCLER中复用ByteBuf,复用不到再创建ByteBuf对象
            PooledUnsafeDirectByteBuf buf = RECYCLER.get();
            // 给新分配出来的ByteBuf做一些清理工作,如设置引用计数为1,readerIndex、writeIndex分别设置为0
            buf.reuse(maxCapacity);
            return buf;
        }
    

    先尝试从 RECYCLER 中复用ByteBuf,RECYCLER 是一个基于threadlocal(线程封闭)机制的轻量级别对象池。
    我们来看类Recycler中的get()方法:

        public final T get() {
            if (maxCapacityPerThread == 0) {
                return newObject((Handle<T>) NOOP_HANDLE);
            }
            // 基于线程的Stack
            Stack<T> stack = threadLocal.get(); 
            // DefaultHandle 是对象的句柄,里面的value是真正要复用的对象
            DefaultHandle<T> handle = stack.pop();  // 尝试复用
            if (handle == null) { // 复用不到就创建
                handle = stack.newHandle();
                // 因为我们要创建PooledUnsafeDirectByteBuf,这里value的类型就是 PooledUnsafeDirectByteBuf
                handle.value = newObject(handle);
            }
            return (T) handle.value;
        }
    

    我们来看PooledByteBuf中的reuse方法:

     final void reuse(int maxCapacity) {
            maxCapacity(maxCapacity);
            // 引用计数设置为1,会调用父类 AbstractReferenceCountedByteBuf 中的方法
            setRefCnt(1);
            // 调用 AbstractByteBuf中的方法设置readerIndex,writerIndex
            setIndex0(0, 0);
            discardMarks();
        }
    

    至此ByteBuf对象已经创建出来了,接下来看如何为ByteBuf分配内存。我们回到PoolArena 的allocate方法:

        PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
            PooledByteBuf<T> buf = newByteBuf(maxCapacity);
            // 为ByteBuf分配内存
            allocate(cache, buf, reqCapacity);
            return buf;
        }
    

    PoolArena # allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity)

        // 给buf分配内存空间
        private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
            final int normCapacity = normalizeCapacity(reqCapacity); // 将需要的容量转换为16的倍数,不足16的就为16
            if (isTinyOrSmall(normCapacity)) { // capacity < pageSize tiny和small的
                int tableIdx;
                PoolSubpage<T>[] table;
                boolean tiny = isTiny(normCapacity);
                if (tiny) { // < 512 小于512字节,就从tiny缓存区域中分配
                    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = tinyIdx(normCapacity);
                    table = tinySubpagePools;
                } else {
                    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = smallIdx(normCapacity);
                    table = smallSubpagePools;
                }
    
                final PoolSubpage<T> head = table[tableIdx];
    
                /**
                 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
                 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
                 */
                synchronized (head) {
                    final PoolSubpage<T> s = head.next;
                    if (s != head) {
                        assert s.doNotDestroy && s.elemSize == normCapacity;
                        long handle = s.allocate();
                        assert handle >= 0;
                        s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                        incTinySmallAllocation(tiny);
                        return;
                    }
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
    
                incTinySmallAllocation(tiny);
                return;
            }
            if (normCapacity <= chunkSize) {
                if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                    ++allocationsNormal;
                }
            } else {
                // Huge allocations are never served via the cache so just call allocateHuge
                allocateHuge(buf, reqCapacity);
            }
        }
    

    PooledThreadCache中有三块缓存区域:tiny,small,normal。首先会根据申请内存的容量去判断分配哪块区域的内存。
    如果在tiny区域分配,会调用PoolThreadCache的allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) 方法:

        boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
            return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
        }
    

    PoolThreadCache#cacheForTiny(PoolArena<?> area, int normCapacity)

        private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
            int idx = PoolArena.tinyIdx(normCapacity);
            if (area.isDirect()) {
                return cache(tinySubPageDirectCaches, idx);
            }
            return cache(tinySubPageHeapCaches, idx);
        }
    

    tinySubPageDirectCaches 是一个数组,每个元素都是一个tiny类型的内存块,PoolArena.tinyIdx(normCapacity) 计算出需要使用哪个内存块,然后使用对应的idx取出内存块。

    相关文章

      网友评论

          本文标题:PooledByteBuf对象、内存复用

          本文链接:https://www.haomeiwen.com/subject/axaqcctx.html