美文网首页
PooledByteBufAllocator分配内存源码分析

PooledByteBufAllocator分配内存源码分析

作者: 圣村的希望 | 来源:发表于2019-10-18 21:13 被阅读0次

    PooledByteBufAllocator内存分配使用实例:

    public static void main(String[] args) {
     int elemSize = 1024 * 2;
     //PooledByteBufAllocator中默认会创建一个池化内存分配器Default
     PooledByteBufAllocator pooledByteBufAllocator = PooledByteBufAllocator.DEFAULT;
     for (int i=0; i<10; i++) {
     ByteBuf byteBuf = pooledByteBufAllocator.directBuffer(elemSize);
     System.out.println(JSON.toJSONString(byteBuf));
     }
    }
    

    这里采用PooledByteBufAllocator池化内存分配器分配PooledDirectByteBuffer对象作为实例进行分析

    public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    
     //...
     @Override
     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
     //获取当前线程的本地变量PoolThreadCache,PoolThreadLocalCache中获取
     PoolThreadCache cache = threadCache.get();
     //获取到cache中的directArena,通过directArena进行内存分配
     PoolArena<ByteBuffer> directArena = cache.directArena;
    ​
     final ByteBuf buf;
     if (directArena != null) {
     //在directArena中分配缓冲区,initialCapacity为申请大小,maxCapacity为Integer.MAX_VALUE
     buf = directArena.allocate(cache, initialCapacity, maxCapacity);
     } else {
     buf = PlatformDependent.hasUnsafe() ?
     UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
     new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
     }
    ​
     return toLeakAwareBuffer(buf);
     }
    ​
     //...
    ​
    }
    ​
    abstract class PoolArena<T> implements PoolArenaMetric {
     //池化缓冲区分配
     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
     //常见PooledByteBuf对象
     PooledByteBuf<T> buf = newByteBuf(maxCapacity);
     allocate(cache, buf, reqCapacity);
     return buf;
     }
    
     @Override
     protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
     if (HAS_UNSAFE) {
     return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
     } else {
     return PooledDirectByteBuf.newInstance(maxCapacity);
     }
     }
    }
    ​
    ​
    final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
     //创建PooledByteBuf对象
     static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
     //从缓存池中获取PooledUnsafeDirectByteBuf缓冲区对象
     PooledUnsafeDirectByteBuf buf = RECYCLER.get();
     //重置缓冲区对象
     buf.reuse(maxCapacity);
     return buf;
     }
     重置缓冲区对象
     final void reuse(int maxCapacity) {
     //设置缓冲区最大值Integer.MAX_VALUE
     maxCapacity(maxCapacity);
     //重置ByteBuf引用计数为0,ByteBuf采用引用计数的方式进行内存回收
     resetRefCnt();
     //设置ByteBuf索引为0,readerIndex = writerIndex = 0
     setIndex0(0, 0);
     //读和写的标记值为0,markedReaderIndex = markedWriterIndex = 0
     discardMarks();
     }
    }
    
    • workThread获取当前线程的PoolThreadLocal缓存,缓存池对象为空会进行initial一个

    • 然后PooledByteBufAllocator将分配缓冲区的任务交给了DirectPoolArena进行分配

    • DirectPoolArena先创建一个初始的PooledByteBuf对象,读写索引都为初始0,PooledByteBuf对象是从对象池中进行获取,池中没有会进行创建,这里使用到了Recycle对象池化,后续会进行分析

    • 然后调用allocate方法进行缓冲区的分配

    abstract class PoolArena<T> implements PoolArenaMetric {
    ​
     private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
     //按照缓冲区初始申请大小进行内存规整
     final int normCapacity = normalizeCapacity(reqCapacity);
     //缓冲区大小规格是否为tiny或small,小于pageSize(8192)即是
     if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
     int tableIdx;
     PoolSubpage<T>[] table;
     //判断缓冲区大小是否为tiny规格,小于512
     boolean tiny = isTiny(normCapacity);
     if (tiny) { // < 512
     //从PoolThreadCache中进行缓存分配tiny规格缓冲区
     if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
     // was able to allocate out of the cache so move on
     return;
     }
     //确定在tinySubpagePools数组下标,normCapacity / 16
     tableIdx = tinyIdx(normCapacity);
     table = tinySubpagePools;
     } else {
     //从PoolThreadCache中进行缓存分配small规格缓冲区
     if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
     // was able to allocate out of the cache so move on
     return;
     }
     //确定在smallSubpagePools数组下标
     tableIdx = smallIdx(normCapacity);
     table = smallSubpagePools;
     }
     //获取tinySubpagePool或smallSubpagePool中对应下标内存池队列
     final PoolSubpage<T> head = table[tableIdx];
    ​
     /**
     * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
     * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
     */
     synchronized (head) {
     //在PoolSubpage中进行缓冲区分配
     final PoolSubpage<T> s = head.next;
     //第一次PoolSubpage是空的
     if (s != head) {
     assert s.doNotDestroy && s.elemSize == normCapacity;
     long handle = s.allocate();
     assert handle >= 0;
     s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
     incTinySmallAllocation(tiny);
     return;
     }
     }
     //在chunk寻找subpage进行分配
     synchronized (this) {
     allocateNormal(buf, reqCapacity, normCapacity);
     }
    ​
     incTinySmallAllocation(tiny);
     return;
     }
     //normCapacity大于pageSize小于chunkSize直接在chunk中进行分配
     if (normCapacity <= chunkSize) {
     if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
     // was able to allocate out of the cache so move on
     return;
     }
     synchronized (this) {
     allocateNormal(buf, reqCapacity, normCapacity);
     ++allocationsNormal;
     }
     } else {
     //申请内存大小大于chunk也即huge规格内存直接创建分配
     allocateHuge(buf, reqCapacity);
     }
     }
    
    }
    
    PoolArena内存组织示意图
    • 对申请的内存大小进行规整,tiny类型按16的倍数进行规整,small和normal类型规整为大于申请内存的最接近2的指数次幂的值

    • 根据规整后的内存大小来确认是在tinySubpagePool或者smallSubpagePools或者PoolChunk中分配,还是直接堆或对外内存分配。

    • 如果内存规格为tiny或small,确定其在tinySubpagePools或smallSubpagePools中的下标,tinySubpagePools中是normCapacity / 16,smallCapacity中是normCapacity / 1024后取2的对数。如果内存规格是normal,从chunk中进行分配内存

    • 如果tinySubpagePools或smallSubpagePools中都为空,也直接从chunk中进行分配

      为什么tinySubpagePools和smallSubpagePools数组大小分别为32和4?

    • tinySubpagePools用于分配小于512字节的内存,并且在内存规整的时候是按16的倍数进行规整,[16,512)一共有32个不同的值,所以这里数组的长度就是32

    • smallSubpagePools用于分配大于512字节小于8192字节的内存,并且在内存规整的时候,他是按大于normCapacity并且最接近2的指数次幂的值,(0,1024],(1024,2048],(2048,4096],(4096,8192],所以smallSubpagePools数组大小为4

    从chunk中分配内存allocateNormal

    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
     //从不同使用率的chunkList中分配内存,先从使用率在50%的chunk中分配
     if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
     q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
     q075.allocate(buf, reqCapacity, normCapacity)) {
     return;
     }
     //如果chunkList为空或者chunkList中没有合适内存进行分配,直接创建chunk进行内存分配
     PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
     //从chunk中进行内存分配
     boolean success = c.allocate(buf, reqCapacity, normCapacity);
     assert success;
     qInit.add(c);
    }
    
    • 在PoolArena中的ChunkList中找到合适的内存大小分配,优先从使用率为50%的chunk中进行分配

    • 如果没有找到合适的chunk,则直接申请创建一个chunk进行内存分配

    • 然后将新创建的chunk添加到qInit的chunkList中

      为什么这里优先从q050中进行分配内存?

    q050的内存使用率在50%~100%之间,q075的分配成功率不高,q000会导致内存使用率太低,优先使用他们不利于内存的高效利用和回收,优先分配q050能提高内存的使用率。

    相关文章

      网友评论

          本文标题:PooledByteBufAllocator分配内存源码分析

          本文链接:https://www.haomeiwen.com/subject/ljswmctx.html