概述
通过前面两节的铺垫,对netty内存池结构有了大概了解,本节详细分析下具体的内存分配过程(其中涉及缓存MemoryRegionCache和对象池RECYCLER的部分先忽略,后面单独分析)
1. PooledByteBufAllocator入口
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
//调用directArena allocate
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
入口为通过PooledByteBufAllocator创建byteBuf,前面章节已经分析过,这里不再分析
2. PoolArena.allocate
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
//1. 模板方法创建ByteBuf对象
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
//2. 基于创建的ByteBuf进行分配
allocate(cache, buf, reqCapacity);
return buf;
}
2.1. 模板方法创建ByteBuf对象
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
//从对象池中获取进行复用
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
主要是通过RECYCLER对象池对ByteBuf进行复用,没有可以复用的对象会创建新的ByteBuf并放到RECYCLER中(关于RECYCLER后续详细分析)
2.2. 基于创建的ByteBuf进行分配
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
//1. 规范申请内存
final int normCapacity = normalizeCapacity(reqCapacity);
//是否tiny或small规格,判断capacity < pageSize
if (isTinyOrSmall(normCapacity)) { // normCapacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
//判断normCapacity < 512
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
//从缓存中分配成功直接返回,否则继续去分配
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
//获取tinySubpagePools中下标即多大规格的tiny,即分配容量 / 16B
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
//从缓存中分配成功直接返回,否则继续去分配
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
//获取smallSubpagePools中下标即多大规格的small
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
//获取到对应规格tiny或small链表头节点head
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
//有两个地方可以操作链表(PoolChunk#allocateSubpage(int)和PoolChunk#free(long))即head节点可能被多个线程同时修改,需要进行同步操作
synchronized (head) {
final PoolSubpage<T> s = head.next;
//不是头结点说明存在相同规格且有空闲内存的PoolSubpage在链表上可以被分配,不需要新创建page
//直接拿出来分配,头结点初始化的时候next和prev指向自己
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
//4. 从poolSubpage上分配
long handle = s.allocate();
assert handle >= 0;
//5. 分配后重新初始化当前chunk
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
//tiny或small已分配个数加一
incTinySmallAllocation(tiny);
return;
}
}
//可能会多线程同时分配,ps没有整个方法同步,减小锁范围
synchronized (this) {
//2. 普通分配(先分配page)
allocateNormal(buf, reqCapacity, normCapacity, cache);
}
//tiny或small已分配个数加一
incTinySmallAllocation(tiny);
return;
}
//大于pagesize小于chunksize,直接以page为单位进行分配
if (normCapacity <= chunkSize) {
//通过缓存分配
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
//2. 普通分配(先分配page)
allocateNormal(buf, reqCapacity, normCapacity, cache);
//normal已分配个数加一
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
//3. huge的分配不会通过缓存进行,只需调用allocateHuge
allocateHuge(buf, reqCapacity);
}
}
大致流程:
-
normalizeCapacity
获取规范化后的内存大小,向上取最近的跟tinySubpagePools或smallSubpagePools或pagesize整数倍相匹配的大小(例如:1 > 16,17 > 32,1.5K > 2K,9K > 16K) -
根据不同normCapacity采用不同分配策略:
-
normCapacity =< pageSize
:获取到对应规格tiny或small链表头节点head,如果存在相同规格且有空闲内存的PoolSubpage在链表上可以被分配,不需要新创建page,直接在该page上分配long handle = s.allocate();
,分配后重新初始化当前chunks.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
;如果不存在则进行常规分配allocateNormal(buf, reqCapacity, normCapacity, cache);
-
normCapacity =< chunkSize
:直接进行常规分配allocateNormal(buf, reqCapacity, normCapacity, cache);
-
normCapacity > chunkSize
:huge的分配不通过缓存进行,直接分配unpooled内存allocateHuge(buf, reqCapacity);
-
先来分析allocateNormal
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) {
//1. 尝试在现有的chunk上分配
if (q050.allocate(buf, reqCapacity, normCapacity, threadCache) ||
q025.allocate(buf, reqCapacity, normCapacity, threadCache) ||
q000.allocate(buf, reqCapacity, normCapacity, threadCache) ||
qInit.allocate(buf, reqCapacity, normCapacity, threadCache) ||
q075.allocate(buf, reqCapacity, normCapacity, threadCache)) {
return;
}
//2. 创建一个chunk 8K 11 13 8K
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
//3. 在创建的chunk上分配
boolean success = c.allocate(buf, reqCapacity, normCapacity, threadCache);
assert success;
//4. 新chunk添加到chunkList上
qInit.add(c);
}
- 按照使用率从小到大顺序尝试在PoolChunkList上现有的chunk上分配,分配成功直接返回
- 现有chunk上分配失败,创建一个新的chunk
- 在新创建的chunk上分配
- 新chunk添加到chunkList上
3. PoolChunkList.allocate
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) {
//maxCapacity:该PoolChunkList上chunk最大可用容量
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
return false;
}
//遍历PoolChunk
for (PoolChunk<T> cur = head; cur != null; cur = cur.next) {
//不为空就尝试分配,分配失败接着尝试在下一个chunk中分配
if (cur.allocate(buf, reqCapacity, normCapacity, threadCache)) {
//分配成功并且空闲率小于最小空闲率,则从当前链表移除,放到nextList中
if (cur.freeBytes <= freeMinThreshold) {
remove(cur);
nextList.add(cur);
}
return true;
}
}
return false;
}
- 遍历当前PoolChunkList上节点PoolChunk
- 尝试在chunk上进行分配
cur.allocate(buf, reqCapacity, normCapacity, threadCache)
,分配成功直接return true; - 分配成功,如果当前chunk空闲率小于最小空闲率,则从当前链表移除,放到nextList(使用率更高的list)中
4. PoolChunk.allocate
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) {
//一个唯一的值,根据叶子节点id,位图索引生成
final long handle;
// normCapacity >= pageSize
if ((normCapacity & subpageOverflowMask) != 0) {
//以pageSize为单位进行分配
handle = allocateRun(normCapacity);
} else {
//以siny或small为单位进行分配
handle = allocateSubpage(normCapacity);
}
if (handle < 0) {
return false;
}
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
//初始化PooledByteBuf
initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
return true;
}
- normCapacity >= pageSize以页为单位进行分配
handle = allocateRun(normCapacity);
返回分配页的索引 - normCapacity < pageSize以siny或small为单位进行分配
handle = allocateSubpage(normCapacity);
,先分配page然后在page上分配tiny或small,返回叶子节点id和位图索引生成的为一值 - 初始化PooledByteBuf
initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
private long allocateRun(int normCapacity) {
//确定需要在二叉树的 d 层开始节点匹配 pageShifts默认13
int d = maxOrder - (log2(normCapacity) - pageShifts);
//在二叉树中进行节点匹配
int id = allocateNode(d);
if (id < 0) {
return id;
}
freeBytes -= runLength(id);
return id;
}
- 计算需要在二叉树哪一层进行节点分配,这里的容量都是pageSize及以上的,log2(normCapacity) - pageShifts 表示容量是页大小的2的多少倍,最大深度索引再减去这个,刚好是定位到页大小倍数的深度索引。例如:normCapacity=8192*2即两页大小2的14次方,则d = 10,即在第10层分配
- 在二叉树d层中进行节点匹配
int id = allocateNode(d);
,返回分配节点索引
private int allocateNode(int d) {
//从内存映射索引为1的开始即根节点开始
int id = 1;
//用于比较id所在深度索引是否小于d
int initial = - (1 << d);
//获取id内存映射的深度值,memoryMap[id],0或1或12
byte val = value(id);
//树节点完全分配val=12, 部分分配val=1,未分配val=0
if (val > d) {
return -1;
}
//深度优先遍历完所有深度索引小于d的可用的子结点,只有到id的深度索引是d的时候才会结束,
//先找出对应的深度,然后从左到右看是否有内存可分配。
while (val < d || (id & initial) == 0) {
//左节点
id <<= 1;
//左节点对应深度
val = value(id);
//如果大于深度索引 即左节点不能用了
if (val > d) {
//异或位运算,获取右结点
id ^= 1;
//再取出对应深度索引值
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
//memoryMap[id] = 12,即设置为不可用
setValue(id, unusable);
//更新父节点值
updateParentsAlloc(id);
//返回内存映射索引
return id;
}
- 深度优先遍历所有深度索引小于d的可用的子结点,只有id的深度索引是d的时候才会结束,找出对应的深度,然后判断左节点是否可用,不可用的话再取右节点,最终找到分配节点的索引即id
- 更新id节点树深度为12即标记为不可用
- 遍历父节点,更新父节点深度值为左右子节点中较小的那个
private void updateParentsAlloc(int id) {
//从id开始直到跟节点
while (id > 1) {
//获取父节点
int parentId = id >>> 1;
//获id节点的深度索引值
byte val1 = value(id);
//获取另一个节点的深度索引值,即是左节点就获取右节点,是右节点就获取左节点
byte val2 = value(id ^ 1);
//取最小的
byte val = val1 < val2 ? val1 : val2;
//设置父节点的深度索引值为子节点最小的那一个
setValue(parentId, val);
//继续遍历父节点
id = parentId;
}
}
- 遍历从id节点开始直到根节点
- 设置父节点的深度索引值为左右子节点较小的那一个
private long allocateSubpage(int normCapacity) {
//找到当前规格内存normCapacity在subpagePools数组中索引,进而获取该索引内head节点
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
//subpage只能分配在叶子节点
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
synchronized (head) {
//结点分配
int id = allocateNode(d);
if (id < 0) {
return id;
}
//this.subpages; 初始化PoolChunk时创建得一个大小为2048得数组,分别对应二叉树中2048个叶子节点
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
//id节点(在二叉树中节点)获取到subpages中索引
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
//创建一个subpage并放到subpages中
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
//什么时候subpage不为null???
subpage.init(head, normCapacity);
}
//在page上进行分配
return subpage.allocate();
}
}
-
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
找到当前规格内存normCapacity在subpagePools数组中索引,进而获取该索引内head节点。例如:normCapacity=32属于tiny规格,32 / 16 = 2,即取tinySubpagePools[2]中head节点 -
int id = allocateNode(d);
进行节点分配,上面已经分析过 - 获取PoolChunk中subpages数组中subpage(一般情况下为null),创建一个subpage并放到subpages中
-
subpage.allocate();
在subpage上进行分配
5. PoolSubpage.allocate
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
//numAvail:可用段数量,doNotDestroy:未被销毁,free时改为true
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
//找到当前page中可分配内存段的bitmapIdx
//bitmapIdx高26位表示在bitmap数组中位置索引,低6位表示64位long类型内位数索引(2^6 = 64)
final int bitmapIdx = getNextAvail();
//bitmap数组下标
int q = bitmapIdx >>> 6;
//long数种哪一位需要修改为1
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;
//将对应位置的q设置为1
bitmap[q] |= 1L << r;
//可用段为0后从链表中删除
if (-- numAvail == 0) {
removeFromPool();
}
//把两个int(32位)的索引 bitmapIdx 和 memoryMapIdx,合并成一个long类型
return toHandle(bitmapIdx);
}
-
final int bitmapIdx = getNextAvail();
找到当前page中可分配内存段的bitmapIdx -
bitmap[q] |= 1L << r;
通过bitmapIdx定位到具体分配了哪个段并把代表该段的索引置为1表示不可用,bitmapIdx 32位int类型高26位表示在bitmap数组中位置索引,低6位表示64位long类型内位数索引(2^6 = 64); -
removeFromPool();
可用段为0后从链表中删除该page -
toHandle(bitmapIdx);
把两个int(32位)的索引 bitmapIdx 和 memoryMapIdx,合并成一个long类型返回
private int getNextAvail() {
//首次获取时nextAvail为0置为 -1,直接返回0,不需要通过findNextAvail()寻找
//以后每次都需要通过findNextAvail()寻找???就为了少执行一次findNextAvail()吗?
int nextAvail = this.nextAvail;
if (nextAvail >= 0) {
this.nextAvail = -1;
return nextAvail;
}
return findNextAvail();
}
主要通过判断nextAvail决定是否需要通过findNextAvail();
进行寻找,其实就是只有第一次执行时可以直接返回,后面每一次都要find,从这也可以看出netty代码对性能的最求简直丧心病狂
private int findNextAvail() {
//bitmap数组8个long类型节点
final long[] bitmap = this.bitmap;
//bitmapLength表示bitmap可用长度,例如16B时需要8个long,bitmapLength=8;32B时只需要4个long,bitmapLength=4
final int bitmapLength = this.bitmapLength;
for (int i = 0; i < bitmapLength; i ++) {
//bits: bitmap数组i位置的long类型数
long bits = bitmap[i];
//bits 不是每一位都全是1,表示还有为0的可用段
if (~bits != 0) {
return findNextAvail0(i, bits);
}
}
return -1;
}
-
bitmap
: 8个long类型组成的位图数组;bitmapLength
:bitmap的有效长度,不一定是8,例如16B时需要8个long,bitmapLength=8;32B时只需要4个long,bitmapLength=4 - 遍历bitmap中每一个long类型,判断long是不是全1,不是的话就说明该long类型表示的位图中还有未分配的段
- 有可用段时通过
findNextAvail0(i, bits);
找到索引直接返回
//该page包含得段数量
final int maxNumElems = this.maxNumElems;
//要分配的起始索引,根据第i个位图,如果是0表示第一个long类型表示即0-63进行分配 1表示第二个long类型表示64-127分配
//例如:i = 00000000000000000000000000000000; baseVal = 00000000000000000000000000000000 十进制:0
// i = 00000000000000000000000000000001; baseVal = 00000000000000000000000001000000 十进制:64
// i = 00000000000000000000000000000010; baseVal = 00000000000000000000000010000000 十进制:128
final int baseVal = i << 6;
for (int j = 0; j < 64; j ++) {
//取出最低位,为0表示可用
if ((bits & 1) == 0) {
//相当于baseVal + j, baseVal相当于由于bitmap中索引跳过的值,加上位置序号j后的新的索引值
//相当于baseVal是十位,j是个位的意思
int val = baseVal | j;
if (val < maxNumElems) {
return val;
} else {
break;
}
}
//位图右移,即从低位往高位
bits >>>= 1;
}
return -1;
}
-
maxNumElems
: page包含的段数量(pagesize/normalizeCapacity); -
final int baseVal = i << 6;
:i是在位图数组中索引,左移6位相当于乘以64,可以理解为64进制的一位 - 从低位到高位遍历bits(
bits >>>= 1;
)的每一位,不为0表示该位不可用,继续查看下一位 - 为0表示可用,
int val = baseVal | j;
相当于baseVal + j
,表示具体的段索引,小于maxNumElems
直接返回,否则返回-1表示未找到可用段 - 举例说明:一个段大小为16B的page
5.1 第1次分配16B大小- maxNumElems包含段数量为8192 / 16 = 512
- 因为是第1次分配,位图数组中所有long都是0,因此 i = 0,baseVal = 0
- 从低位到高位遍历第1个long类型的每一位,最低位就是0表示可用,val = 0 + 0 < 512,直接返回0
5.2 第65次分配16B大小 - maxNumElems包含段数量为8192 / 16 = 512
- 1个long类型表示64位,因此bitmap中第一个long已经全部位数为1表示不可用,开始使用第2个long类型,因此 i = 1,baseVal = 64
- 从低位到高位遍历第2个long类型的每一位,最低位就是0表示可用,val = 64 + 0 < 512,直接返回64
private long toHandle(int bitmapIdx) {
//把两个int(32位)的索引 bitmapIdx 和 memoryMapIdx,合并成一个long类型
//加 0x4000000000000000L 是为了让高32位不全是0
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
可以想一下如何把两个不同类型索引或两个int类型值同时返回?netty这里把两个int(32位)的索引 bitmapIdx 和 memoryMapIdx,合并成一个long类型,加 0x4000000000000000L 是为了让高32位不全是0,后面initBuf会用到
PoolSubpage.allocate分析完了,主流程中还剩下一个PoolChunk.allocate中的initBuf方法,下面接着分析
void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity, PoolThreadCache threadCache) {
//(int) handle long类型直接强转成int类型,高32位被丢弃,返回低32位即memoryMapIdx
int memoryMapIdx = memoryMapIdx(handle);
//(int) (handle >>> Integer.SIZE); 又移32位,返回高32位即bitmapIdx
int bitmapIdx = bitmapIdx(handle);
//allocateRun以page为单位进行分配,bitmapIdx为0;
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
reqCapacity, runLength(memoryMapIdx), threadCache);
} else {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity, threadCache);
}
}
-
memoryMapIdx(handle);
:long类型直接强转成int类型,高32位被丢弃,返回低32位即memoryMapIdx -
bitmapIdx(handle);
:右移32位,返回高32位即bitmapIdx - allocateRun以page为单位进行分配,bitmapIdx为0,内存偏移量只精确到page级别
- 重点看下
runOffset(memoryMapIdx) + offset
其中runOffset(memoryMapIdx)表示page在chunk中偏移量pagesize的倍数,offset表示缓存行偏移量,通常为0 - allocateSubpage时高32位也可能为0即第一次分配时页上bitmapIdx就是为0,为了放置跟allocateRun混淆,前面在计算handle时加了个
0x4000000000000000L
。在subpage上再分配段,内存偏移量只精确到page中的段
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity, PoolThreadCache threadCache) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle);
//拿到具体的subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
//初始化PooledByteBuf
//0x3FFFFFFF跟0x4000000000000000L对应,相当于还原真是的bitmapIdx
buf.init(
this, nioBuffer, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, threadCache);
}
-
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
subpages数组中拿到具体的subpage -
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset
分配的段相对整个chunk的偏移量,相比上面以页为单位进行分配增加了页内偏移量(bitmapIdx & 0x3FFFFFFF) * subpage.elemSize
,其中0x3FFFFFFF跟之前0x4000000000000000L对应,这里目的时解出原始的bitmapIdx
private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
//该byteBuf属于哪个PoolChunk
this.chunk = chunk;
//该byteBuf申请的jdk底层数据结构,heap类型就是byte[],direct类型就是DirectByteBuffer
memory = chunk.memory;
//jdk nio 的ByteBuffer
tmpNioBuf = nioBuffer;
//分配器
allocator = chunk.arena.parent;
//PoolThreadCache
this.cache = cache;
//bitmapIdx + memoryMapIdx
this.handle = handle;
//缓存行偏移
this.offset = offset;
//申请的内存大小reqCapacity
this.length = length;
//规范化后的内存大小normalCapacity
this.maxLength = maxLength;
}
到此allocate主流程的一个主分支分析完了
网友评论