美文网首页
Netty内存分配源码分析

Netty内存分配源码分析

作者: 爱健身的兔子 | 来源:发表于2020-11-03 15:56 被阅读0次

1 Netty内存分配与释放实战

 public static void main(String[] args) {
            //1 分配一个ByteBuf
            PooledByteBufAllocator allocator = new PooledByteBufAllocator(false);
            ByteBuf byteBuf = allocator.heapBuffer(15);

            ByteBuf byteBuf1 = allocator.heapBuffer(9000);

            //2 释放分配的内存
            byteBuf.release();
            byteBuf.release();
        }

2 核心类及成员变量

2.1 PoolArena

Netty的PoolArena是由多个Chunk组成的大块内存区域,而每个Chunk则由一个或者多个Page组成,因此,对内存的组织和管理也就主要集中在如何管理和组织Chunk和Page了。

先说结论:PoolArena通过6个PoolChunkList来管理PoolChunk,而每个PoolChunk由N个PoolSubpage构成,即将PoolChunk的里面底层实现 T memory分成N段,每段就是一个PoolSubpage。当用户申请一个Buf时,使用Arena所拥有的chunk所管辖的page分配内存,内存分配的落地点为 T memory上。

abstract class PoolArena<T> implements PoolArenaMetric {

   enum SizeClass {
        Tiny,
        Small,
        Normal
    }
  // 该参数指定了tinySubpagePools数组的长度,由于tinySubpagePools每一个元素的内存块差值为16,
  // 因而数组长度是512/16,也即这里的512 >>> 4 = 32 [16,32,64...512]
  static final int numTinySubpagePools = 512 >>> 4;
  //表示该PoolArena的allocator
  final PooledByteBufAllocator parent;
  //表示PoolChunk中由Page节点构成的二叉树的最大高度,默认11
  private final int maxOrder;
  //page的大小,默认8K
  final int pageSize;
  // 指定了叶节点大小8KB是2的多少次幂,默认为13,该字段的主要作用是,在计算目标内存属于二叉树的
  // 第几层的时候,可以借助于其内存大小相对于pageShifts的差值,从而快速计算其所在层数
  final int pageShifts;
  //默认16MB
  final int chunkSize;
  // 由于PoolSubpage的大小为8KB=8196,因而该字段的值为
  // -8192=>=> 1111 1111 1111 1111 1110 0000 0000 0000
  // 这样在判断目标内存是否小于8KB时,只需要将目标内存与该数字进行与操作,只要操作结果等于0,
  // 就说明目标内存是小于8KB的,这样就可以判断其是应该首先在tinySubpagePools或smallSubpagePools
  // 中进行内存申请
  final int subpageOverflowMask;
  // 该参数指定了smallSubpagePools数组的长度,默认为4
  final int numSmallSubpagePools;
  //tinySubpagePools用来分配小于512 byte的Page
  private final PoolSubpage<T>[] tinySubpagePools;
  //smallSubpagePools用来分配大于等于512 byte且小于pageSize内存的Page
  private final PoolSubpage<T>[] smallSubpagePools;
  //用来存储用来分配给大于等于pageSize大小内存的PoolChunk
  //存储内存利用率50-100%的chunk
  private final PoolChunkList<T> q050;
  //存储内存利用率25-75%的chunk
  private final PoolChunkList<T> q025;
  //存储内存利用率1-50%的chunk
  private final PoolChunkList<T> q000;
  //存储内存利用率0-25%的chunk
  private final PoolChunkList<T> qInit;
  //存储内存利用率75-100%的chunk
  private final PoolChunkList<T> q075;
  //存储内存利用率100%的chunk
  private final PoolChunkList<T> q100;
    //堆内存(heap buffer)
  static final class HeapArena extends PoolArena<byte[]> {

  }
   //堆外直接内存(direct buffer)
  static final class DirectArena extends PoolArena<ByteBuffer> {

  }

}

2.2 PoolChunk

Chunk主要用来组织和管理多个Page的内存分配和释放。在Netty中,Chunk通过一颗完全二叉树来管理Page。

final class PoolChunk<T> {// PoolChunk会涉及到具体的内存,泛型T表示byte[](堆内存)、或java.nio.ByteBuffer(堆外内存)  

        final PoolArena<T> arena;//表示该PoolChunk所属的PoolArena。  
        final T memory;// 具体用来表示内存;byte[]或java.nio.ByteBuffer。 
        final boolean unpooled;// 是否是可重用的,unpooled=false表示可重用  

        private final byte[] memoryMap;//默认12层的完全二叉树的节点数是4096
        private final byte[] depthMap;
        private final PoolSubpage<T>[] subpages;//表示该PoolChunk所包含的PoolSubpage。也就是PoolChunk连续的可用内存。默认数组长度是2048

        private final int subpageOverflowMask;//同PoolArena中的一样
        private final int pageSize;//每个PoolSubpage的大小,默认为8192个字节(8K)  
        private final int pageShifts;//同PoolArena中的一样
        private final int maxOrder;//同PoolArena中的一样
        private final int chunkSize;//同PoolArena中的一样
        private final int log2ChunkSize;
        private final int maxSubpageAllocs;//完全二叉树最后一层的节点数量 1 << maxOrder (默认2048)

        private final byte unusable;

        private int freeBytes; //当前PoolChunk空闲的内存。 

        PoolChunkList<T> parent;//一个PoolChunk分配后,会根据使用率挂在PoolArena的一个PoolChunkList中
        // PoolChunk本身设计为一个链表结构
        PoolChunk<T> prev;
        PoolChunk<T> next;
}

注意:

poolChunk默认由2048个page组成,一个page默认大小为8k,图中节点的值为在数

组memoryMap的下标。
1、如果需要分配大小8k的内存,则只需要在第11层,找到第一个可用节点即可。
2、如果需要分配大小16k的内存,则只需要在第10层,找到第一个可用节点即可。
3、如果节点1024存在一个已经被分配的子节点2048,则该节点不能被分配,如需要分配大小16k的内存,这个时候节点2048已被分配,节点2049未被分配,就不能直接分配节点1024,因为该节点目前只剩下8k内存。

2.3 PoolSubpage

PoolSubpage用于分配小于pageSize的内存。PoolSubpage这个类的块大小是由第一次申请的内存大小来决定的,且每个块大小相同。PoolSubpage这个类是通过位图法来对管理内存的分配。

 final class PoolSubpage<T> {

        final PoolChunk<T> chunk;//用来表示该Page属于哪个Chunk
        private final int memoryMapIdx;//用来表示该Page在Chunk.memoryMap中的索引

        private final int runOffset;// 当前Page在chunk.memoryMap的偏移量
        private final int pageSize;//Page的大小,默认为8192
        /*
        long 类型的数组bitmap用来表示Page中存储区域的使用状态,
        数组中每个long的每一位表示一个块存储区域的占用情况:0表示未占用,1表示占用。
        例如:对于一个4K的Page来说如果这个Page用来分配1K的存储区块,
        那么long数组中就只有一个long类型的元素且这个数值的低4危用来指示4个存储区域的占用情况。
        */
        private final long[] bitmap;
        //PoolSubpage本身设计为一个链表结构
        PoolSubpage<T> prev;
        PoolSubpage<T> next;

        boolean doNotDestroy;
        int elemSize;//块的大小
        private int maxNumElems;//page按elemSize大小分得的块个数
        private int bitmapLength;//bitmap数组实际会用到的长度,等于pageSize/elemSize/64
        // 下一个可用的位置
        private int nextAvail;
        // 可用的段数量
        private int numAvail;

 } 

2.4 PoolThreadCache

Netty为了高效分配内存,采用了PoolThreadCacheByteBuf使用完回收时将使用完的ByteBuf记录到PoolThreadCache中,下次在需要分配同大小(同范围)的ByteBuf可直接进行分配,准确的说PoolThreadCache缓存的并不是ByteBuf,而是待释放ByteBuf的chunk、自己的起始位置、最大可访问长度等信息。

final class PoolThreadCache{
//PoolThreadCache
//因为PoolThreadCache主要持有那些需要被释放的ByteBuf,但是不会
//进行ByteBuf的分配工作,所以这里的heapArena和directArena主要用来
//进行相关缓存的计数。
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;

//下面则分别为堆内存、直接内存对应的tiny、small和normal的缓存
//PoolThreadCache采用三个数组保存了每种尺寸的ByteBuf,
//因为每种类型的尺寸都是返回值,比如tiny是size<512,所以
//每种类型的尺寸由根据大小计算下标组成了其对应的数组
//数据中的每个元素为MemoryRegionCache类型,MemoryRegionCache
//其实是一个queue,比如Netty中最小的ByteBuf为16,则tiny缓存数组
//第一个元素MemoryRegionCache队列就保存所有被回收的大小为16的
//ByteBuf信息
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
}

2.5 MemoryRegionCache

MemoryRegionCache是同种大小ByteBuf(注意这里不是真正的ByteBuf,只是回收的ByteBuf的相关信息)的队列,根据为tiny/small或者normal尺寸,有SubPageMemoryRegionCacheNormalMemoryRegionCache两种实现,其中NormalMemoryRegionCache对应nomal的ByteBufSubPageMemoryRegionCache则对应tiny和small。

private abstract static class MemoryRegionCach}e<T> {

//MemoryRegionCache
//该大小的ByteBuf队列容量
private final int size;
//队列
private final Queue<Entry<T>> queue;
/*枚举,标识该队列的尺寸类型enum SizeClass {    Tiny,    Small,    Normal}*/
private final SizeClass sizeClass;

}

3 内存分配流程

/***********************PooledByteBufAllocator************************/
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {

     //获取PoolThreadCache ,
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        //进行分配
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        //非池化分配
        buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

/******************************PoolArena**************************/

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    //新建一个ByteBuf对象,这里使用的对象池技术
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    //进行分配
    allocate(cache, buf, reqCapacity);
    return buf;
}

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
  // 将需要申请的容量格式为 2^N
  final int normCapacity = normalizeCapacity(reqCapacity);
  // 判断目标容量是否小于8KB,小于8KB则使用tiny或small的方式申请内存
  if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
    int tableIdx;
    PoolSubpage<T>[] table;
    boolean tiny = isTiny(normCapacity);
    // 判断目标容量是否小于512字节,小于512字节的为tiny类型的
    if (tiny) { // < 512
      // 将分配区域转移到 tinySubpagePools 中
      if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
        //能从缓存中获取就直接
        return;
      }
      // 如果无法从当前线程缓存中申请到内存,则尝试从tinySubpagePools中申请,这里tinyIdx()方法
      // 就是计算目标内存是在tinySubpagePools数组中的第几号元素中的
      tableIdx = tinyIdx(normCapacity);
      table = tinySubpagePools;
    } else {
      // 如果目标内存在512byte~8KB之间,则尝试从smallSubpagePools中申请内存。这里首先从
      // 当前线程的缓存中申请small级别的内存,如果申请到了,则直接返回
      if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
        // was able to allocate out of the cache so move on
        return;
      }
      tableIdx = smallIdx(normCapacity);
      table = smallSubpagePools;
    }
        // 获取目标元素的头结点
    final PoolSubpage<T> head = table[tableIdx];

    // 这里需要注意的是,由于对head进行了加锁,而在同步代码块中判断了s != head,
    // 也就是说PoolSubpage链表中是存在未使用的PoolSubpage的,因为如果该节点已经用完了,
    // 其是会被移除当前链表的。也就是说只要s != head,那么这里的allocate()方法
    // 就一定能够申请到所需要的内存块
    synchronized (head) {
      // s != head就证明当前PoolSubpage链表中存在可用的PoolSubpage,并且一定能够申请到内存,
      // 因为已经耗尽的PoolSubpage是会从链表中移除的
      final PoolSubpage<T> s = head.next;
      // 如果此时 subpage 已经被分配过内存了执行下文,如果只是初始化过,则跳过该分支
      if (s != head) {
        // 从PoolSubpage中申请内存
        assert s.doNotDestroy && s.elemSize == normCapacity;
        // 通过申请的内存对ByteBuf进行初始化
        long handle = s.allocate();
        assert handle >= 0;
        // 初始化 PoolByteBuf 说明其位置被分配到该区域,但此时尚未分配内存
        s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
        // 对tiny类型的申请数进行更新
        incTinySmallAllocation(tiny);
        return;
      }
    }
    synchronized (this) {
         allocateNormal(buf, reqCapacity, normCapacity);
     }
    incTinySmallAllocation(tiny);
    return;
  }
   // 走到这里说明目标内存是大于8KB的,那么就判断目标内存是否大于16M,如果大于16M,
  // 则不使用内存池对其进行管理,如果小于16M,则到PoolChunkList中进行内存申请
  if (normCapacity <= chunkSize) {
    // 小于16M,首先到当前线程的缓存中申请,如果申请到了则直接返回,如果没有申请到,
    // 则到PoolChunkList中进行申请
    if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
      // was able to allocate out of the cache so move on
      return;
    }
    synchronized (this) {
       allocateNormal(buf, reqCapacity, normCapacity);
       ++allocationsNormal;
    }
  } else {
    // 对于大于16M的内存,Netty不会对其进行维护,而是直接申请,然后返回给用户使用
    allocateHuge(buf, reqCapacity);
  }
}

//先在chunklist里面分配
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
            q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
            q075.allocate(buf, reqCapacity, normCapacity)) {
            return;
        }
        //创建一个PoolChunk对象
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        //内存分配
        boolean success = c.allocate(buf, reqCapacity, normCapacity);
        assert success;
        qInit.add(c);
    }
/******************************PoolChunk********************************/

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        final long handle;
        //这里在完全二叉树中通过伙伴算法查找待分配的
        if ((normCapacity & subpageOverflowMask) != 0) { 
            handle =  allocateRun(normCapacity);//分配大于pageSize的内存
        } else {
            handle = allocateSubpage(normCapacity);//分配小于pageSize的内存
        }

        if (handle < 0) {
            return false;
        }
        ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
        initBuf(buf, nioBuffer, handle, reqCapacity);//初始化PooledByteBuf.
        return true;
    }

4 内存释放流程

/***************************PoolArena***********************************/
public void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity,
     PoolThreadCache cache) {
  // 如果是非池化的,则直接销毁目标内存块,并且更新相关的数据
  if (chunk.unpooled) {
    int size = chunk.chunkSize();
    destroyChunk(chunk);
    activeBytesHuge.add(-size);
    deallocationsHuge.increment();
  } else {
    // 如果是池化的,首先判断其是哪种类型的,即tiny,small或者normal,
    // 然后将其交由当前线程的缓存进行处理,如果添加成功,则直接返回
    SizeClass sizeClass = sizeClass(normCapacity);
    if (cache != null && cache.add(this, chunk, nioBuffer, handle,
          normCapacity, sizeClass)) {
      return;
    }

    // 如果当前线程的缓存已满,则将目标内存块返还给公共内存块进行处理
    freeChunk(chunk, handle, sizeClass, nioBuffer);
  }
}

/***********************PoolThreadCache******************************/

@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
    //根据回收的ByteBuf大小,从指定数组中获取对应队列下标处的
    //MemoryRegionCache
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    //如果为空,表示没有启用缓存,直接返回
    if (cache == null) {
        return false;
    }
    //如果启用缓存,则加入缓存中
    return cache.add(chunk, handle);
}

//根据ByteBuf大小,从指定数组中获取队列,下标计算上面已经介绍过
//这里不再展开介绍
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
    switch (sizeClass) {
    case Normal:
        return cacheForNormal(area, normCapacity);
    case Small:
        return cacheForSmall(area, normCapacity);
    case Tiny:
        return cacheForTiny(area, normCapacity);
    default:
        throw new Error();
    }
}
/***********************MemoryRegionCache******************************/

//将元素加入到MemoryRegionCache队列中
@SuppressWarnings("unchecked")
public final boolean add(PoolChunk<T> chunk, long handle) {
    Entry<T> entry = newEntry(chunk, handle);
    //这里offer考虑的队列的尺寸,可见第二节的介绍
    boolean queued = queue.offer(entry);
    //超出队列最大尺寸,则回收entry
    if (!queued) {
        // If it was not possible to cache the chunk, immediately recycle the entry
        entry.recycle();
    }

    return queued;
}

Netty源码分析:PoolArena_wojiushimogui的博客-CSDN博客

Netty源码-PoolThreadCache - 简书

深入浅出Netty内存管理 PoolChunk - 简书

相关文章

网友评论

      本文标题:Netty内存分配源码分析

      本文链接:https://www.haomeiwen.com/subject/acxivktx.html