(*文章基于Netty4.1.22版本)
整体介绍
分析PoolChunk的时候,讲到大于等于8KB走Chunk,否则走PoolSubpage,为什么要这么做呢?假如我们需要分配1KB的内存,那么如果还是用Chunk来分配,那么由于Chunk是默认8KB,那么就有7KB的空间浪费了,无法分配,因为一个Chunk是一个整体,所以Netty在Chunk之下拆分了大小相等的内存段,即PoolSubpage,这样空间的利用就更合理了
源码分析
字段介绍
final class PoolSubpage<T> implements PoolSubpageMetric {
final PoolChunk<T> chunk;
// 即PoolChunk的id
private final int memoryMapIdx;
// page在Chunk叶子节点的相对位移,如果id为2048则runOffset为0,2049则为1,以此类推
private final int runOffset;
// Chunk的大小
private final int pageSize;
// 用来标记内存段分配情况的数组,原理类似Java的BitSet
private final long[] bitmap;
// 两个指针
PoolSubpage<T> prev;
PoolSubpage<T> next;
boolean doNotDestroy;
// 每个内存段的大小
int elemSize;
// 内存段的数量
private int maxNumElems;
private int bitmapLength;
private int nextAvail;
// 可用的内存段数量
private int numAvail;
}
基本介绍如上,其中需要详细分析的是这个bitmap数组,数组里每个元素是long类型,bitmap在初始化的时候会设置pageSize>>>10个元素,即8个元素
- 那么为什么是8?
在分配的时候,外部会先经过PoolArena(后续分析),这时会处理请求的字节大小,保证其为2的N次方,且最小为16,那么8KB的情况下,最多会分割成512个内存段,一个long是64位,512/64=8,所以最多只需要8个long元素就可以分配所有段。
每个long元素有64位,每一位可以用来标志一个内存段的使用情况,由于是使用类似BitSet的结构,bitmap的结构图如下:
bitmap结构.png
bitmap有8个long元素,每个long有64位,每一位都能标志一个内存的分配情况,共计8*64=512位。
初始化
// PoolArena中有个page的pool,head为里面的头结点
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(head, elemSize);
}
// 初始化每个段落的基本信息,例如每个内存段大小,内存段的数量,然后加入到Arena的PagePool的头结点中
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
// 元素大小=可用数量=Chunk的pageSize / 每个元素的大小
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
addToPool(head);// 加入到头结点后面
}
分配
在PoolChunk分配的时候,如果小于8KB会调用allocateSubpage,其中会初始化PoolSubpage并用其分配
private long allocateSubpage(int normCapacity) {
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {//
// ....
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
// subpageIdx = 0 1 2 3 4 ....2047 即最底下的叶子节点个数
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
// 初始化
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}
long allocate() {
//....
// 找出一个可用的位置
final int bitmapIdx = getNextAvail();
int q = bitmapIdx >>> 6;// 图中的q
int r = bitmapIdx & 63;// 图中的r
assert (bitmap[q] >>> r & 1) == 0;
bitmap[q] |= 1L << r;
// 没分配一个位置,numAvail-1
if (-- numAvail == 0) {//当全部分配完毕,将该subpage从链表中移除
removeFromPool();
}
return toHandle(bitmapIdx);
}
private long toHandle(int bitmapIdx) {
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
获取下一个可用的位置
private int getNextAvail() {
int nextAvail = this.nextAvail;
if (nextAvail >= 0) {
this.nextAvail = -1;
return nextAvail;
}
return findNextAvail()//核心在这里
}
private int findNextAvail() {
final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
for (int i = 0; i < bitmapLength; i ++) {
long bits = bitmap[i];
if (~bits != 0) {// 该long元素上还有未分配的位置,当64位全为1的时候~bits才等于0
return findNextAvail0(i, bits);
}
}
return -1;
}
private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;
final int baseVal = i << 6;//0 64 128....
for (int j = 0; j < 64; j ++) {
if ((bits & 1) == 0) {//还有可用的位置,取得一个可以用的位置
int val = baseVal | j;
if (val < maxNumElems) {
return val;
} else {
break;
}
}
bits >>>= 1;
}
return -1;
}
getNextAvail会获取0~maxNumElems中可以用的位置
bitmap设值
bitmap[q] |= 1L << r;
这句代码怎么理解呢,其实就是如下的过程:
r=0:0000....0001 ->1
r=1:0000....0010 ->3
r=2:0000....0100 ->7
r=3:0000....1000 ->15
下一个r与上一个bitmap的值做或操作
返回page索引
当使用PoolChunk分配的时候,会返回一个id,即树中节点的值,代表该节点被分配,那么在使用PoolSubpage的时候也需要返回一个值代表page中某个节点被分配,通过上面的分析我们知道了,Page最多有512个节点,那么返回的值有0~511,那么如果就这样返回出去了,那么外部就无法知道到底是分配了Chunk还是Page,因为释放的时候,会传入这个唯一标志值,所以在这里调用了toHandle进行了一波骚操作,会让其变成一个64位的信息,然后释放的时候就可以知道该释放Chunk还是Page。
当然了,如果直到了用Chunk还是Page还是不够的,因为不知道用的是哪个Chunk的Page,所以在toHandle方法中还会看到对memoryMapIdx进行了计算
0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
左移32位转换成64位信息是为了判断是用Page还是用Chunk,和memoryMapIdx做位运算是为了判断在属于哪个Chunk
(这里要感谢闪电哥的热心解答,嘿嘿)
释放
// 如果返回true代表该PoolSubpage已经被使用
// 如果返回false代表该PoolSubpage已经没有使用了
boolean free(PoolSubpage<T> head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
bitmap[q] ^= 1L << r;
// 释放的时候设置nextAvail为当前的bitmapIdx,下次getNextAvail的时候可以直接使用
setNextAvail(bitmapIdx);
if (numAvail ++ == 0) {
addToPool(head);// 加入到头结点后面
return true;
}
if (numAvail != maxNumElems) {
return true;
} else {//一个未用
if (prev == next) {// 如果只有head和当前节点,那么直接返回
return true;
}
// 没有使用了,那么可以把该节点从链表中移除
doNotDestroy = false;
removeFromPool();
return false;
}
}
网友评论