
Netty Source Code, Part 7: ByteBuf

Author: 横渡 | Published 2019-08-11 09:50

    Memory Allocation Overview

    This part covers Netty's memory allocation, the lowest layer of the stack, responsible for taking data read from the underlying transport and placing it into a ByteBuf.

    Three questions:
    + What categories of memory are there?
    + How is contention between threads reduced when allocating memory?
    + How are allocations of different sizes carried out?

    Main topics:

    1. The abstraction of memory and the memory allocator
    2. Allocation strategies for different size classes and memory types
    3. The memory reclamation process

    ByteBuf Structure and Key APIs

    • ByteBuf structure
      ByteBuf memory layout:
     *      +-------------------+------------------+------------------+
     *      | discardable bytes |  readable bytes  |  writable bytes  |
     *      |                   |     (CONTENT)    |                  |
     *      +-------------------+------------------+------------------+
     *      |                   |                  |                  |
     *      0      <=      readerIndex   <=   writerIndex    <=    capacity
    

    + read, write and set methods
    Calling a read method advances the readerIndex;
    calling a write method advances the writerIndex;
    set methods do not move either index (see the sketch after this list).

    • mark and reset methods
      mark saves the current index position.
      reset restores the index to the saved position.

    • readableBytes, writableBytes and maxWritableBytes
      As the names suggest: readableBytes = writerIndex - readerIndex, writableBytes = capacity - writerIndex, maxWritableBytes = maxCapacity - writerIndex.
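
    A minimal sketch (my own example, using Netty's public Unpooled API) of how these indices move:

        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.Unpooled;

        public class IndexDemo {
            public static void main(String[] args) {
                ByteBuf buf = Unpooled.buffer(16);        // readerIndex = writerIndex = 0, capacity = 16
                buf.writeInt(42);                         // writerIndex moves to 4
                System.out.println(buf.readableBytes());  // 4  (writerIndex - readerIndex)
                System.out.println(buf.writableBytes());  // 12 (capacity - writerIndex)

                buf.markReaderIndex();                    // save the readerIndex
                System.out.println(buf.readInt());        // 42, readerIndex moves to 4
                buf.resetReaderIndex();                   // readerIndex restored to 0

                buf.setInt(0, 7);                         // set does not move any index
                System.out.println(buf.getInt(0));        // 7, indices unchanged

                buf.release();
            }
        }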

    ByteBuf Categories

    The ByteBuf class diagram:


    ByteBuf类图.PNG
    • Pooled vs. Unpooled
      Pooled allocates out of memory that has been allocated in advance; Unpooled calls the system API directly for every allocation.
    • Unsafe vs. non-Unsafe
      Unsafe: uses the JDK's Unsafe to get hold of the ByteBuf's memory address and operate on it directly.
      Non-Unsafe: does not depend on the underlying Unsafe.
    • Heap vs. Direct
      Heap: allocated on the JVM heap; the memory participates in GC and does not need to be released manually; backed by a byte[] array.
      Direct: the memory is not managed by the JVM heap and does not participate in GC, so it must be released manually; it is allocated via the JDK's ByteBuffer.allocateDirect(initialCapacity). A small sketch follows this list.
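
    A small sketch (my own illustration, not from the original article) of the difference in practice — a heap buffer exposes its backing byte[], while a direct buffer does not and should always be released explicitly:

        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.Unpooled;

        public class HeapVsDirect {
            public static void main(String[] args) {
                ByteBuf heap = Unpooled.buffer(256);         // heap: backed by a byte[] on the JVM heap
                System.out.println(heap.hasArray());         // true, heap.array() is the backing array

                ByteBuf direct = Unpooled.directBuffer(256); // direct: off-heap memory
                System.out.println(direct.hasArray());       // false, no backing byte[]

                heap.release();
                direct.release();                            // direct memory is not reclaimed by GC, release it
            }
        }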

    The Memory Allocator: ByteBufAllocator

    • ByteBufAllocator functionality
      ByteBufAllocator is the top-level abstraction. It provides overloaded buffer() methods, overloaded ioBuffer() methods (which prefer a direct buffer), heapBuffer() for heap allocation, directBuffer() for direct allocation, and compositeBuffer() for combining several ByteBufs into one.

    • AbstractByteBufAllocator
      Implements most of ByteBufAllocator and leaves two abstract methods, newHeapBuffer and newDirectBuffer, for subclasses to implement; that is where heap and direct memory are distinguished.

    • The two main ByteBufAllocator subclasses

    The ByteBufAllocator hierarchy:


    ByteBufferAllocator.PNG

    The two main subclasses of ByteBufAllocator are PooledByteBufAllocator and UnpooledByteBufAllocator; the Pooled/Unpooled distinction is therefore made at the subclass level.
    How are Unsafe and non-Unsafe distinguished, then? Netty decides automatically: if an Unsafe object is available on the platform,
    Netty allocates memory through Unsafe. A short usage sketch of the allocator API follows.
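
    As a quick illustration of the allocator API described above (a sketch of my own, using the default allocator instances):

        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.ByteBufAllocator;
        import io.netty.buffer.CompositeByteBuf;
        import io.netty.buffer.PooledByteBufAllocator;
        import io.netty.buffer.UnpooledByteBufAllocator;

        public class AllocatorDemo {
            public static void main(String[] args) {
                ByteBufAllocator pooled = PooledByteBufAllocator.DEFAULT;
                ByteBufAllocator unpooled = UnpooledByteBufAllocator.DEFAULT;

                ByteBuf io = pooled.ioBuffer(1024);           // prefers a direct buffer for I/O
                ByteBuf heap = pooled.heapBuffer(1024);       // heap allocation
                ByteBuf direct = unpooled.directBuffer(1024); // direct allocation, no pooling

                CompositeByteBuf composite = pooled.compositeBuffer(); // combines several ByteBufs
                composite.addComponents(true, io, heap);

                composite.release(); // releases the components it now owns
                direct.release();
            }
        }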

    UnpooledByteBufAllocator Analysis

    • Heap allocation logic
    • Direct allocation logic

    Unsafe reads data by computing base memory address + offset, while the non-Unsafe path uses array + index or the JDK ByteBuffer API. In general, operating on memory through Unsafe is more efficient than the non-Unsafe path.
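
    A rough sketch (my own, not Netty internals) of how the two access styles look from the outside; the actual address arithmetic happens inside the unsafe ByteBuf implementations:

        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.ByteBufAllocator;

        public class AccessStyles {
            public static void main(String[] args) {
                ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(16);
                buf.writeInt(42);

                if (buf.hasMemoryAddress()) {
                    // unsafe-backed buffer: reads are done from baseAddress + offset internally
                    long address = buf.memoryAddress() + buf.readerIndex();
                    System.out.println("reads via address " + address);
                } else if (buf.hasArray()) {
                    // non-unsafe heap buffer: reads are done via array + index
                    System.out.println("reads via array[" + (buf.arrayOffset() + buf.readerIndex()) + "]");
                } else {
                    // non-unsafe direct buffer: reads go through the JDK ByteBuffer API
                    System.out.println("reads via nioBuffer()");
                }

                System.out.println(buf.readInt()); // 42, regardless of the underlying access style
                buf.release();
            }
        }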

    PooledByteBufAllocator Overview

    It allocates two kinds of memory via newHeapBuffer and newDirectBuffer; the two allocation paths are largely the same, so let's analyze newDirectBuffer.

        protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
            PoolThreadCache cache = threadCache.get();
            PoolArena<ByteBuffer> directArena = cache.directArena;
    
            final ByteBuf buf;
            if (directArena != null) {
                buf = directArena.allocate(cache, initialCapacity, maxCapacity);
            } else {
                buf = PlatformDependent.hasUnsafe() ?
                        UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                        new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
    
            return toLeakAwareBuffer(buf);
        }
    
    1. Get the thread-local cache, PoolThreadCache
      newDirectBuffer may be called from multiple threads, so each thread keeps its own cache.
    final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
            private final boolean useCacheForAllThreads;
    
            PoolThreadLocalCache(boolean useCacheForAllThreads) {
                this.useCacheForAllThreads = useCacheForAllThreads;
            }
    
            @Override
            protected synchronized PoolThreadCache initialValue() {
            // get heapArena and directArena, then create the PoolThreadCache
                final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
                final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    
                Thread current = Thread.currentThread();
                if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
                    return new PoolThreadCache(
                            heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                            DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
                }
                // No caching so just use 0 as sizes.
                return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
            }
    
            @Override
            protected void onRemoval(PoolThreadCache threadCache) {
                threadCache.free();
            }
    
            private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
                if (arenas == null || arenas.length == 0) {
                    return null;
                }
    
                PoolArena<T> minArena = arenas[0];
                for (int i = 1; i < arenas.length; i++) {
                    PoolArena<T> arena = arenas[i];
                    if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                        minArena = arena;
                    }
                }
    
                return minArena;
            }
        }
    
    

    FastThreadLocal is essentially a faster ThreadLocal; from this code we can see that each thread gets its own PoolThreadCache.
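
    For reference, FastThreadLocal (in io.netty.util.concurrent) is used much like a JDK ThreadLocal; a minimal sketch of the pattern PoolThreadLocalCache follows:

        import io.netty.util.concurrent.FastThreadLocal;

        public class FastThreadLocalDemo {
            // each thread that calls get() receives its own instance, created lazily by initialValue()
            private static final FastThreadLocal<StringBuilder> CACHE = new FastThreadLocal<StringBuilder>() {
                @Override
                protected StringBuilder initialValue() {
                    return new StringBuilder();
                }
            };

            public static void main(String[] args) {
                CACHE.get().append("per-thread state"); // first call triggers initialValue() for this thread
                System.out.println(CACHE.get());
                CACHE.remove();                         // triggers onRemoval(...) if it is overridden
            }
        }

    It is fastest when the calling thread is a FastThreadLocalThread (Netty's event-loop threads are); on ordinary threads it falls back to a slower, regular ThreadLocal-backed path.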

    2. Allocate on the Arena held by the thread-local cache
      The thread-local cache holds two kinds of memory: one for the heap and one for direct (off-heap) memory. We follow the direct-memory path here.
      heapArena and directArena are passed in when the PoolThreadCache is created; see the initialValue code above.
      The two groups of arenas, heapArena and directArena, are created when the allocator PooledByteBufAllocator is constructed. Let's look at its constructor.
    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                      int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                      boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
            super(preferDirect);
            threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
            this.tinyCacheSize = tinyCacheSize;
            this.smallCacheSize = smallCacheSize;
            this.normalCacheSize = normalCacheSize;
            chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
    
            checkPositiveOrZero(nHeapArena, "nHeapArena");
            checkPositiveOrZero(nDirectArena, "nDirectArena");
    
            checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
            if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
                throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
            }
    
            if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
                throw new IllegalArgumentException("directMemoryCacheAlignment: "
                        + directMemoryCacheAlignment + " (expected: power of two)");
            }
    
            int pageShifts = validateAndCalculatePageShifts(pageSize);
    
            if (nHeapArena > 0) {
            // initialize heapArenas
                heapArenas = newArenaArray(nHeapArena);
                List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
                for (int i = 0; i < heapArenas.length; i ++) {
                    PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                            pageSize, maxOrder, pageShifts, chunkSize,
                            directMemoryCacheAlignment);
                    heapArenas[i] = arena;
                    metrics.add(arena);
                }
                heapArenaMetrics = Collections.unmodifiableList(metrics);
            } else {
                heapArenas = null;
                heapArenaMetrics = Collections.emptyList();
            }
    
            if (nDirectArena > 0) {
            // initialize directArenas
                directArenas = newArenaArray(nDirectArena);
                List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
                for (int i = 0; i < directArenas.length; i ++) {
                    PoolArena.DirectArena arena = new PoolArena.DirectArena(
                            this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
                    directArenas[i] = arena;
                    metrics.add(arena);
                }
                directArenaMetrics = Collections.unmodifiableList(metrics);
            } else {
                directArenas = null;
                directArenaMetrics = Collections.emptyList();
            }
            metric = new PooledByteBufAllocatorMetric(this);
        }
    

    heapArenas is initialized with heapArenas = newArenaArray(nHeapArena); and directArenas with directArenas = newArenaArray(nDirectArena);.
    Now let's trace where the constructor arguments nHeapArena and nDirectArena come from:

        public PooledByteBufAllocator(boolean preferDirect) {
            this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
        }
    

    Let's see where DEFAULT_NUM_DIRECT_ARENA comes from. By default defaultMinNumArena (twice the number of CPU cores) is smaller than runtime.maxMemory() / defaultChunkSize / 2 / 3, so DEFAULT_NUM_DIRECT_ARENA defaults to twice the number of CPU cores; DEFAULT_NUM_HEAP_ARENA is computed the same way. Why create twice as many Arenas as CPU cores? Because the NIO event-loop threads created earlier also default to twice the number of cores, so each thread can effectively have its own Arena and allocations on the arenas in the array rarely contend for the same lock.

    /*
             * We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
             * number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
             * allocation and de-allocation needs to be synchronized on the PoolArena.
             *
             * See https://github.com/netty/netty/issues/3888.
             */
        // twice the number of CPU cores
            final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
            final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
            DEFAULT_NUM_HEAP_ARENA = Math.max(0,
                    SystemPropertyUtil.getInt(
                            "io.netty.allocator.numHeapArenas",
                            (int) Math.min(
                                    defaultMinNumArena,
                                    runtime.maxMemory() / defaultChunkSize / 2 / 3)));
    

    Here is a structural diagram of the PooledByteBufAllocator, assuming four NIO threads:


    PooledByteBufAllocator结构图.png

    The figure shows four NIO threads. From the code analysis above there are, correspondingly, 4 heapArenas and 4 directArenas; their logic is essentially the same, so the figure simply labels them Arena.
    How does PooledByteBufAllocator allocate a ByteBuf? It first obtains an Arena through PoolThreadCache: the thread-local PoolThreadLocalCache stores one of the allocator's Arenas in each thread's PoolThreadCache, so when an NIO thread calls get() it receives its own Arena, binding the thread to that Arena. Besides allocating from the Arena, PooledByteBufAllocator can also allocate from the ByteBuf cache lists that the PoolThreadCache maintains underneath.
    For example, suppose I allocate 1024 bytes, use the buffer, release it, and then need another 1024 bytes. The second allocation does not have to go to the Arena at all; it can simply be taken from a cache list maintained inside PoolThreadCache and returned. A sketch of this follows.
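
    A small sketch of the reuse scenario just described; whether the second allocation actually hits the cache depends on the caching settings (useCacheForAllThreads) and the type of the calling thread:

        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.PooledByteBufAllocator;

        public class CacheReuse {
            public static void main(String[] args) {
                PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;

                ByteBuf first = allocator.directBuffer(1024);  // allocated from the thread's Arena
                first.release();                               // on release the 1K region can be cached in the PoolThreadCache

                ByteBuf second = allocator.directBuffer(1024); // same size: may be served from the cache,
                second.release();                              // without touching the Arena again
            }
        }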

    PooledByteBufAllocator maintains three ByteBuf cache sizes: tinyCacheSize, smallCacheSize and normalCacheSize; these three values are used when a PoolThreadCache is initialized.

    The PoolThreadCache constructor:

       PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                        int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                        int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
            checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
            this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
            this.heapArena = heapArena;
            this.directArena = directArena;
            if (directArena != null) {
            // create the cache arrays
                tinySubPageDirectCaches = createSubPageCaches(
                        tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
                smallSubPageDirectCaches = createSubPageCaches(
                        smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
    
                numShiftsNormalDirect = log2(directArena.pageSize);
                normalDirectCaches = createNormalCaches(
                        normalCacheSize, maxCachedBufferCapacity, directArena);
    
                directArena.numThreadCaches.getAndIncrement();
            } else {
                // No directArea is configured so just null out all caches
                tinySubPageDirectCaches = null;
                smallSubPageDirectCaches = null;
                normalDirectCaches = null;
                numShiftsNormalDirect = -1;
            }
            if (heapArena != null) {
                // Create the caches for the heap allocations
                tinySubPageHeapCaches = createSubPageCaches(
                        tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
                smallSubPageHeapCaches = createSubPageCaches(
                        smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
    
                numShiftsNormalHeap = log2(heapArena.pageSize);
                normalHeapCaches = createNormalCaches(
                        normalCacheSize, maxCachedBufferCapacity, heapArena);
    
                heapArena.numThreadCaches.getAndIncrement();
            } else {
                // No heapArea is configured so just null out all caches
                tinySubPageHeapCaches = null;
                smallSubPageHeapCaches = null;
                normalHeapCaches = null;
                numShiftsNormalHeap = -1;
            }
    
            // Only check if there are caches in use.
            if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                    || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                    && freeSweepAllocationThreshold < 1) {
                throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                        + freeSweepAllocationThreshold + " (expected: > 0)");
            }
        }
    

    The method that creates the subpage cache arrays:

        private static <T> MemoryRegionCache<T>[] createSubPageCaches(
                int cacheSize, int numCaches, SizeClass sizeClass) {
            if (cacheSize > 0 && numCaches > 0) {
                @SuppressWarnings("unchecked")
                MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
                for (int i = 0; i < cache.length; i++) {
                    // TODO: maybe use cacheSize / cache.length
                // create each cache
                    cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
                }
                return cache;
            } else {
                return null;
            }
        }
    

    Each element of the cache array:

        private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
            SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
                super(size, sizeClass);
            }
    
            @Override
            protected void initBuf(
                    PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
                chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
            }
        }
    
            MemoryRegionCache(int size, SizeClass sizeClass) {
                // size: the requested cache size (e.g. 512), rounded up to a power of two; it becomes the queue capacity
                this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
                // queue: holds up to `size` cached memory regions of this size class
                queue = PlatformDependent.newFixedMpscQueue(this.size);
                this.sizeClass = sizeClass;
            }
    

    7. How directArena Allocates Direct Memory

    • Get a PooledByteBuf from the object pool for reuse
        @Override
        protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
            PoolThreadCache cache = threadCache.get();
            PoolArena<ByteBuffer> directArena = cache.directArena;
    
            final ByteBuf buf;
            if (directArena != null) {
                buf = directArena.allocate(cache, initialCapacity, maxCapacity);
            } else {
                buf = PlatformDependent.hasUnsafe() ?
                        UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                        new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
    
            return toLeakAwareBuffer(buf);
        }
    

    Now look at directArena.allocate:

        PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        // create the PooledByteBuf
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        // allocate memory for the PooledByteBuf (trying the thread cache first)
            allocate(cache, buf, reqCapacity);
            return buf;
        }
    

    Let's look at newByteBuf; the DirectArena implementation creates a buffer for off-heap memory:

            @Override
            protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) { // the unsafe variant is used by default when available
                    return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
                } else {
                    return PooledDirectByteBuf.newInstance(maxCapacity);
                }
            }
    

    Look at newInstance:

        static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        // take a ByteBuf from the recyclable object pool; if the pool has none, a new one is created
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity); // reuse: set capacity and reference count, reset readerIndex, writerIndex and marks
        return buf; // a clean ByteBuf object
        }
    

    Let's look at RECYCLER:

        private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
            @Override
            protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) { 
            return new PooledUnsafeDirectByteBuf(handle, 0); // if the RECYCLER has nothing pooled it creates a new ByteBuf; the handle is responsible for recycling the ByteBuf object
            }
        };
    

    The first step of allocation produced the ByteBuf object; next, memory is allocated for it, starting from the PoolThreadCache.

        private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
            final int normCapacity = normalizeCapacity(reqCapacity);
            if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
                int tableIdx;
                PoolSubpage<T>[] table;
                boolean tiny = isTiny(normCapacity);
                if (tiny) { // < 512
                    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = tinyIdx(normCapacity);
                    table = tinySubpagePools;
                } else {
                    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = smallIdx(normCapacity);
                    table = smallSubpagePools;
                }
    
                // the code above tried to allocate from the cache; if that did not succeed, a real allocation is performed below
                final PoolSubpage<T> head = table[tableIdx];
    
                /**
                 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
                 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
                 */
                synchronized (head) {
                    final PoolSubpage<T> s = head.next;
                    if (s != head) {
                        assert s.doNotDestroy && s.elemSize == normCapacity;
                        long handle = s.allocate();
                        assert handle >= 0;
                        s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                        incTinySmallAllocation(tiny);
                        return;
                    }
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
    
                incTinySmallAllocation(tiny);
                return;
            }
        if (normCapacity <= chunkSize) { // if the normalized capacity were greater than chunkSize we would fall through to allocateHuge below
                if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                    ++allocationsNormal;
                }
            } else {
            // Huge allocations are never served via the cache so just call allocateHuge
                allocateHuge(buf, reqCapacity);
            }
        }
    

    allocate consists of two major steps: first try to allocate from the cache, then fall back to allocating from the memory pool itself.

    • allocate: try the cache first
    • then allocate from the memory pool (the Arena)

    8. Memory Size Classes

    Netty内存规格示意图.png

    The size boundaries are 0, 512B, 8K and 16M:
    tiny: 0 – 512B
    small: 512B – 8K
    normal: 8K – 16M
    huge: > 16M
    Why is 16M a boundary? 16M is the size of one chunk, and all memory is requested from the operating system in units of chunks; every ByteBuf allocation then happens inside a chunk. For example, to allocate 1M, Netty first requests a 16M chunk from the operating system, carves a 1M slice out of it, and hands that contiguous 1M region to the ByteBuf.

    Why is there an 8K boundary? Netty treats 8K as one page when allocating memory. The 16M obtained from the operating system is fairly large, so Netty subdivides it by pages: one chunk is split into 2048 pages, and allocating 16K only requires taking 2 pages.

    Memory blocks between 0 and 8K are called subpages in Netty. If a 10B request were still served with a whole page, most of the page would be wasted; this is where subpages come in. A small sketch of the size classes follows.
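
    A tiny sketch (my own helper, not a Netty API) that restates these thresholds in code:

        public class SizeClassDemo {
            static String sizeClassOf(int normalizedCapacity) {
                if (normalizedCapacity < 512) return "tiny";                  // < 512B: subpage
                if (normalizedCapacity < 8 * 1024) return "small";            // 512B - 8K: subpage
                if (normalizedCapacity <= 16 * 1024 * 1024) return "normal";  // 8K - 16M: whole pages within a chunk
                return "huge";                                                // > 16M: allocated outside the pool
            }

            public static void main(String[] args) {
                System.out.println(sizeClassOf(16));               // tiny
                System.out.println(sizeClassOf(4 * 1024));         // small
                System.out.println(sizeClassOf(2 * 1024 * 1024));  // normal
                System.out.println(sizeClassOf(32 * 1024 * 1024)); // huge
            }
        }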

    9. The Allocation Flow on a Cache Hit

    Look at the allocate method:

        private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
            final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize; pageSize is 8K
                int tableIdx;
                PoolSubpage<T>[] table;
                boolean tiny = isTiny(normCapacity);
                if (tiny) { // < 512
                    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = tinyIdx(normCapacity);
                    table = tinySubpagePools;
                } else {
                    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = smallIdx(normCapacity);
                    table = smallSubpagePools;
                }
    
                final PoolSubpage<T> head = table[tableIdx];
    
                /**
                 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
                 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
                 */
                synchronized (head) {
                    final PoolSubpage<T> s = head.next;
                    if (s != head) {
                        assert s.doNotDestroy && s.elemSize == normCapacity;
                        long handle = s.allocate();
                        assert handle >= 0;
                        s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                        incTinySmallAllocation(tiny);
                        return;
                    }
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
    
                incTinySmallAllocation(tiny);
                return;
            }
            if (normCapacity <= chunkSize) {
                if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                    ++allocationsNormal;
                }
            } else {
                // Huge allocations are never served via the cache so just call allocateHuge
                allocateHuge(buf, reqCapacity);
            }
        }
    
    1. First, normalize the requested capacity (worked examples follow the code):
        int normalizeCapacity(int reqCapacity) {
            checkPositiveOrZero(reqCapacity, "reqCapacity");
    
        if (reqCapacity >= chunkSize) { // >= chunkSize (16M): returned as-is, treated as huge
                return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
            }
    
            if (!isTiny(reqCapacity)) { // >= 512
                // Doubled
    
                int normalizedCapacity = reqCapacity;
                normalizedCapacity --;
                normalizedCapacity |= normalizedCapacity >>>  1;
                normalizedCapacity |= normalizedCapacity >>>  2;
                normalizedCapacity |= normalizedCapacity >>>  4;
                normalizedCapacity |= normalizedCapacity >>>  8;
                normalizedCapacity |= normalizedCapacity >>> 16;
                normalizedCapacity ++;
    
                if (normalizedCapacity < 0) {
                    normalizedCapacity >>>= 1;
                }
                assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
    
                return normalizedCapacity;
            }
    
            if (directMemoryCacheAlignment > 0) {
                return alignCapacity(reqCapacity);
            }
    
            // Quantum-spaced
            if ((reqCapacity & 15) == 0) {
                return reqCapacity;
            }
    
            return (reqCapacity & ~15) + 16;
        }
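
    A few worked examples of this normalization (assuming directMemoryCacheAlignment is 0, which is the default):

    • a non-tiny request of 1000B is rounded up to the next power of two, 1024B;
    • a non-tiny request of 5000B becomes 8192B (8K);
    • a tiny request of 20B is rounded up to the next multiple of 16, i.e. 32B;
    • a tiny request of 16B is already a multiple of 16 and stays 16B.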
    

    The rough steps for allocating from the cache:

    • Find the MemoryRegionCache for the requested size;
    • Pop an entry from its queue and use it to initialize the ByteBuf:
      an entry holds a chunk, which represents a block of contiguous memory; the chunk hands a contiguous slice of that memory to the ByteBuf, which can then read and write data in it.
    • Return the popped entry to the object pool for reuse:
      to reuse as much as it can, Netty manages these objects through the RECYCLER, which reduces GC pressure and avoids repeatedly creating and destroying objects.
      We will use cache.allocateTiny(this, buf, reqCapacity, normCapacity) as the example to walk through these three steps.
    1. Find the MemoryRegionCache for the requested size
        /**
         * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
         */
        boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
            return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
        }
    
        private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.tinyIdx(normCapacity);  // compute the array index and use it to pick the MemoryRegionCache
            if (area.isDirect()) {
                return cache(tinySubPageDirectCaches, idx);
            }
            return cache(tinySubPageHeapCaches, idx);
        }
    
      // dividing the capacity by 16 gives the index (e.g. normCapacity 32 -> index 2)
        static int tinyIdx(int normCapacity) {
            return normCapacity >>> 4;
        }
    
    2. Pop an entry from the queue and initialize the ByteBuf with it
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
            if (cache == null) {
                // no cache found so just return false here
                return false;
            }
            boolean allocated = cache.allocate(buf, reqCapacity);
            if (++ allocations >= freeSweepAllocationThreshold) {
                allocations = 0;
                trim();
            }
            return allocated;
        }
    
            public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
                Entry<T> entry = queue.poll();
                if (entry == null) {
                    return false;
                }
              // initialize the ByteBuf with the cached chunk and handle
                initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
                entry.recycle();
    
                // allocations is not thread-safe which is fine as this is only called from the same thread all time.
                ++ allocations;
                return true;
            }
    
        private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
            SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
                super(size, sizeClass);
            }
    
            @Override
            protected void initBuf( // subpage initialization
                    PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
                chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
            }
        }
    
        void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
            initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity);
        }
    
        private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
                                        long handle, int bitmapIdx, int reqCapacity) {
            assert bitmapIdx != 0;
    
            int memoryMapIdx = memoryMapIdx(handle);
    
            PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
            assert subpage.doNotDestroy;
            assert reqCapacity <= subpage.elemSize;
    
            buf.init(
                this, nioBuffer, handle,
                runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
                    reqCapacity, subpage.elemSize, arena.parent.threadCache());
        }
    
        void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
                  long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
            init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
        }
    
        private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
                           long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
            assert handle >= 0;
            assert chunk != null;
    
        this.chunk = chunk; // the chunk of memory requested from the system
            memory = chunk.memory;
            tmpNioBuf = nioBuffer;
            allocator = chunk.arena.parent;
            this.cache = cache;
        this.handle = handle; // handle that identifies the memory slice within the chunk
            this.offset = offset;
            this.length = length;
            this.maxLength = maxLength;
        }
    
    3. Return the popped entry to the object pool for reuse
      entry.recycle();
            static final class Entry<T> {
                final Handle<Entry<?>> recyclerHandle;
                PoolChunk<T> chunk;
                ByteBuffer nioBuffer;
                long handle = -1;
    
                Entry(Handle<Entry<?>> recyclerHandle) {
                    this.recyclerHandle = recyclerHandle;
                }
    
                void recycle() {  // reset the fields before returning the entry to the pool
                    chunk = null;
                    nioBuffer = null;
                    handle = -1;
                    recyclerHandle.recycle(this);
                }
            }
    
            @Override
            public void recycle(Object object) {
                if (object != value) {
                    throw new IllegalArgumentException("object does not belong to handle");
                }
    
                Stack<?> stack = this.stack;
                if (lastRecycledId != recycleId || stack == null) {
                    throw new IllegalStateException("recycled already");
                }
    
            stack.push(this); // push it back onto the stack
            }
    

    10. The Allocation Logic on a Cache Hit

    • The cache data structure in Netty
      MemeoryRegionCache数据结构.png

    The cache data structure in Netty is MemoryRegionCache. It has three parts: a queue, a sizeClass and a size.

    Each element in the queue is an entry, and each entry holds a chunk and a handle. Netty allocates memory in units of chunks, and the handle uniquely identifies a contiguous slice of memory within a chunk; so a chunk plus a handle determines both the size and the position of a memory region. All of the entries together form the chain for one cache; given a size, Netty finds the matching chain and can take an entry out of its queue.

    sizeClass is Netty's size class (Tiny, Small, Normal); huge allocations are always served directly, so there is no Huge entry in MemoryRegionCache.

    size, the constructor argument, determines the capacity of the queue, i.e. how many buffers of this node's fixed buffer size can be cached.

    Within one MemoryRegionCache, every cached block has the same fixed size: if a MemoryRegionCache caches 1K blocks, then everything in its queue is a 1K ByteBuf. For the tiny size class the buffer sizes are multiples of 16B below 512B; the sizes for the other classes can be read off the diagram.

        private abstract static class MemoryRegionCache<T> {
            private final int size;
            private final Queue<Entry<T>> queue;
            private final SizeClass sizeClass;
            private int allocations;
    
            MemoryRegionCache(int size, SizeClass sizeClass) {
                this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
                queue = PlatformDependent.newFixedMpscQueue(this.size);
                this.sizeClass = sizeClass;
            }
          // ...
    }
    

    MemoryRegionCache instances are maintained inside PoolThreadCache.

    final class PoolThreadCache {
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
    
        final PoolArena<byte[]> heapArena;
        final PoolArena<ByteBuffer> directArena;
    
        // Hold the caches for the different size classes, which are tiny, small and normal.
        private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
        private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
        private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
        private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
        private final MemoryRegionCache<byte[]>[] normalHeapCaches;
        private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
        
    // ...
    }
    

    Creating the MemoryRegionCache arrays:

        PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                        int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                        int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
            checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
            this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
            this.heapArena = heapArena;
            this.directArena = directArena;
            if (directArena != null) {
             // create the tiny[32] array
                tinySubPageDirectCaches = createSubPageCaches(
                        tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
                smallSubPageDirectCaches = createSubPageCaches(
                        smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
    
                numShiftsNormalDirect = log2(directArena.pageSize);
                normalDirectCaches = createNormalCaches(
                        normalCacheSize, maxCachedBufferCapacity, directArena);
    
                directArena.numThreadCaches.getAndIncrement();
            } else {
                // No directArea is configured so just null out all caches
                tinySubPageDirectCaches = null;
                smallSubPageDirectCaches = null;
                normalDirectCaches = null;
                numShiftsNormalDirect = -1;
            }
            if (heapArena != null) {
                // Create the caches for the heap allocations
                tinySubPageHeapCaches = createSubPageCaches(
                        tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
                smallSubPageHeapCaches = createSubPageCaches(
                        smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
    
                numShiftsNormalHeap = log2(heapArena.pageSize);
                normalHeapCaches = createNormalCaches(
                        normalCacheSize, maxCachedBufferCapacity, heapArena);
    
                heapArena.numThreadCaches.getAndIncrement();
            } else {
                // No heapArea is configured so just null out all caches
                tinySubPageHeapCaches = null;
                smallSubPageHeapCaches = null;
                normalHeapCaches = null;
                numShiftsNormalHeap = -1;
            }
    
            // Only check if there are caches in use.
            if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                    || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                    && freeSweepAllocationThreshold < 1) {
                throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                        + freeSweepAllocationThreshold + " (expected: > 0)");
            }
        }
    

    Let's see how tiny[32] is created:

     tinySubPageDirectCaches = createSubPageCaches(
                        tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
    

    tinyCacheSize is maintained by the allocator and defaults to 512. PoolArena.numTinySubpagePools defaults to 512 >>> 4; shifting 512 right by 4 bits is the same as dividing by 16, which gives 32.
    So an array is created, and its length is 32:

        private static <T> MemoryRegionCache<T>[] createSubPageCaches(
                int cacheSize, int numCaches, SizeClass sizeClass) {
            if (cacheSize > 0 && numCaches > 0) {
                @SuppressWarnings("unchecked")
                MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
                for (int i = 0; i < cache.length; i++) {
                    // TODO: maybe use cacheSize / cache.length
                // cacheSize, which is 512 here
                    cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
                }
                return cache;
            } else {
                return null;
            }
        }
    
    

    Following it further:

        private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
            SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
                super(size, sizeClass);
            }
    
            @Override
            protected void initBuf(
                    PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
                chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
            }
        }
    
            MemoryRegionCache(int size, SizeClass sizeClass) {
                this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); // still 512 here: the queue capacity
                queue = PlatformDependent.newFixedMpscQueue(this.size);
                this.sizeClass = sizeClass;
            }
    

    So tinySubPageDirectCaches (an array of MemoryRegionCache) has 32 nodes (SubPageMemoryRegionCache) at the outer level; each node corresponds to one buffer size (16B, 32B, ..., 496B) and holds a queue whose default length is 512.

    MemoryRegionCache.png

    11. Arena, Chunk, Page and Subpage

    • The Arena structure
      Arena结构.png

    At the outermost level sit the chunkLists; the chunkLists are connected to each other through a doubly linked list, and each node inside a chunkList is a chunk, the minimum unit (16M) in which memory is requested from the operating system. Why link the chunkLists together? Netty tracks each chunk's usage in real time and groups chunks into different chunkLists according to their usage ratio; when allocating, Netty can then use a simple policy to locate a suitable chunkList and pick one of its chunks to allocate from.

    abstract class PoolArena<T> implements PoolArenaMetric {
        static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
    
        enum SizeClass {
            Tiny,
            Small,
            Normal
        }
    
        static final int numTinySubpagePools = 512 >>> 4;
    
        final PooledByteBufAllocator parent;
    
        private final int maxOrder;
        final int pageSize;
        final int pageShifts;
        final int chunkSize;
        final int subpageOverflowMask;
        final int numSmallSubpagePools;
        final int directMemoryCacheAlignment;
        final int directMemoryCacheAlignmentMask;
        private final PoolSubpage<T>[] tinySubpagePools;
        private final PoolSubpage<T>[] smallSubpagePools;
    
        private final PoolChunkList<T> q050;
        private final PoolChunkList<T> q025;
        private final PoolChunkList<T> q000;
        private final PoolChunkList<T> qInit;
        private final PoolChunkList<T> q075;
        private final PoolChunkList<T> q100;
    // ...
    }
    
    protected PoolArena(PooledByteBufAllocator parent, int pageSize,
              int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
            this.parent = parent;
            this.pageSize = pageSize;
            this.maxOrder = maxOrder;
            this.pageShifts = pageShifts;
            this.chunkSize = chunkSize;
            directMemoryCacheAlignment = cacheAlignment;
            directMemoryCacheAlignmentMask = cacheAlignment - 1;
            subpageOverflowMask = ~(pageSize - 1);
            tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
            for (int i = 0; i < tinySubpagePools.length; i ++) {
                tinySubpagePools[i] = newSubpagePoolHead(pageSize);
            }
    
            numSmallSubpagePools = pageShifts - 9;
            smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
            for (int i = 0; i < smallSubpagePools.length; i ++) {
                smallSubpagePools[i] = newSubpagePoolHead(pageSize);
            }
    
        // initialize the chunkLists
            q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
            q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
            q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
            q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
            q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
            qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
    
            q100.prevList(q075);
            q075.prevList(q050);
            q050.prevList(q025);
            q025.prevList(q000);
            q000.prevList(null);
            qInit.prevList(qInit);
    
            List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
            metrics.add(qInit);
            metrics.add(q000);
            metrics.add(q025);
            metrics.add(q050);
            metrics.add(q075);
            metrics.add(q100);
            chunkListMetrics = Collections.unmodifiableList(metrics);
        }
    
    • The Chunk structure

      Chunk结构.png

      A chunk splits its memory into 8K pages, and each page can in turn be split into subpages (four of them in the figure).

    12. Page-Level Allocation: allocateNormal()

    Here is the fragment of PoolArena code that leads to allocateNormal:

            if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // allocate from the cache
                    // was able to allocate out of the cache so move on
                    return;
                }
                synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity); // allocate from the arena
                    ++allocationsNormal;
                }
            } 
    

    allocateNormal in the arena:

        private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
            if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
                q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
                q075.allocate(buf, reqCapacity, normCapacity)) {
                return;
            }
    
        // Add a new chunk: create a new chunk and allocate from it
            PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
            boolean success = c.allocate(buf, reqCapacity, normCapacity);
            assert success;
            qInit.add(c);
        }
    
    • Try to allocate from an existing chunkList
      + otherwise create a new chunk and allocate from it
      + initialize the ByteBuf

    13. Subpage-Level Allocation: allocateTiny()

    • Locate a Subpage object
    • Initialize the subpage
    • Initialize the PooledByteBuf

    We can step through this with a small program:

    import io.netty.buffer.PooledByteBufAllocator;

    public class TinyAllocate {
        public static void main(String[] args) {
            PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
            allocator.directBuffer(16); // a 16-byte request goes down the tiny (subpage) path
        }
    }
    

    Here is the relevant fragment of allocate again:

                boolean tiny = isTiny(normCapacity);
                if (tiny) { // < 512
                    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = tinyIdx(normCapacity);
                    table = tinySubpagePools;
                } else {
                    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = smallIdx(normCapacity);
                    table = smallSubpagePools;
                }
                final PoolSubpage<T> head = table[tableIdx];
    
                /**
                 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
                 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
                 */
                synchronized (head) {
                    final PoolSubpage<T> s = head.next;
                    if (s != head) { // by default the head node carries no subpage information, so on the first allocation this branch is skipped
                        assert s.doNotDestroy && s.elemSize == normCapacity;
                        long handle = s.allocate();
                        assert handle >= 0;
                        s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                        incTinySmallAllocation(tiny);
                        return;
                    }
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
    
                incTinySmallAllocation(tiny);
                return;
    

    Let's look at the structure of tinySubpagePools; by default it mirrors the tiny layout of MemoryRegionCache:
    tiny[32]: 0 -> 16B -> 32B -> 48B -> ... -> 496B

        private long allocateSubpage(int normCapacity) {
            // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
            // This is need as we may add it back and so alter the linked-list structure.
            PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
            int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
            synchronized (head) {
                int id = allocateNode(d);
                if (id < 0) {
                    return id;
                }
    
                final PoolSubpage<T>[] subpages = this.subpages;
                final int pageSize = this.pageSize;
    
                freeBytes -= pageSize;
    
                int subpageIdx = subpageIdx(id);
                PoolSubpage<T> subpage = subpages[subpageIdx];
                if (subpage == null) {
                    subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
                    subpages[subpageIdx] = subpage;
                } else {
                    subpage.init(head, normCapacity);
                }
                return subpage.allocate();
            }
        }
    

    14. Releasing a ByteBuf

    • The contiguous memory region is added back to the cache
    • (If it cannot be cached) the region is marked as unused in the arena
    • The ByteBuf object is returned to the object pool
      A small sketch from the user's point of view follows.
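
    A short sketch of release as seen by the caller (the steps above all happen inside release() once the reference count drops to zero):

        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.PooledByteBufAllocator;

        public class ReleaseDemo {
            public static void main(String[] args) {
                ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
                System.out.println(buf.refCnt());   // 1

                buf.retain();                       // reference count: 2
                buf.release();                      // back to 1, the memory is still owned
                boolean freed = buf.release();      // 0: the memory region goes back to the cache/arena
                System.out.println(freed);          // and the ByteBuf object is recycled; prints true
            }
        }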
