关键属性
// tiny数组的大小, 32
static final int numTinySubpagePools = 512 >>> 4;
// 所属的PooledByteBufAllocator
final PooledByteBufAllocator parent;
// chunk二叉树的最大高度, 默认11
private final int maxOrder;
// pagesiz, 默认1 << 13 = 8192
final int pageSize;
// pageShifts=log(pageSize),默认13
final int pageShifts;
// 默认16M
final int chunkSize;
// ~(pageSize - 1) = 11111111111111111110000000000000
final int subpageOverflowMask;
// pageShifts - 9 = 4
final int numSmallSubpagePools;
// tiny数组
private final PoolSubpage<T>[] tinySubpagePools;
// small数组
private final PoolSubpage<T>[] smallSubpagePools;
构造方法
image imageprotected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
...
// 初始化tinySubpagePools
// 创建长度为32的PoolSubpage数组
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
// 给每个元素都初始化一个head
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
// 初始化numSmallSubpagePools
numSmallSubpagePools = pageShifts - 9;
// 创建长度为4的PoolSubpage数组
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
// 给每个元素都初始化一个head
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
...chunklist...
}
private PoolSubpage<T>[] newSubpagePoolArray(int size) {
return new PoolSubpage[size];
}
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
head.prev = head;
head.next = head;
return head;
}
- tinySubpagePools:用于分配小于512字节的内存,默认长度为32,因为内存分配最小为16,每次增加16,直到512,区间[16,512)一共有32个不同值
- smallSubpagePools:用于分配大于等于512字节的内存,默认长度为4, 每次容量翻倍
定位
既然tinySubpagePools或smallSubpagePools设计成不同elemsize组成的,那么怎么根据我请求的内存大小来去定制具体的head呢?
findSubpagePoolHead
PoolSubpage<T> findSubpagePoolHead(int elemSize) {
int tableIdx;
PoolSubpage<T>[] table;
// 如果申请的空间是属于tiny的部分
if (isTiny(elemSize)) { // < 512
// 根据elemsize除去16可以很容易拿到tinySubpagePools对应大小的head
tableIdx = elemSize >>> 4;
table = tinySubpagePools;
} else {
// 否则去smallSubpagePools里面申请
tableIdx = 0;
// 这里很巧妙的来计算small的对应elemsize的head
// 512-1023之间的数字>>>10的结果为0
// 1024-2047之间的数字>>>10的结果为1
// 2048-4095之间的数字>>>10的结果为10
// 4096-8195之间的数字>>>10的结果为100
// 将以上的结果右移一位直到等于0,所用的次数正好是数组的下标
elemSize >>>= 10;
while (elemSize != 0) {
elemSize >>>= 1;
tableIdx ++;
}
table = smallSubpagePools;
}
return table[tableIdx];
}
分配
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 对申请的内存大小做标准化
final int normCapacity = normalizeCapacity(reqCapacity);
// 如果小于pagesize,8k
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
// 如果小于512,属于tiny的范围
if (tiny) { // < 512
// 去本地缓存申请tiny空间,如果能成功,直接返回
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
// 拿到tinySubpagePools中该容量所属的位置
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
// 否则属于small的范围
} else {
// 去本地缓存申请small空间,如果能成功,直接返回
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
// 拿到smallSubpagePools中该容量所属的位置
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
// 拿到对应的head
final PoolSubpage<T> head = table[tableIdx];
// head同步处理,
synchronized (head) {
final PoolSubpage<T> s = head.next;
// head的next有PoolSubpage存在,说明之前已经分配过内存了,看看能不能复用这段内存
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
// 申请空间,拿到内存地址handle
long handle = s.allocate();
// 申请成功
assert handle >= 0;
// 将这段申请的内存空间与ByteBuf绑定
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
if (tiny) {
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
return;
}
}
// 到这里,说明没有缓存可用,那么去内存池去获取
allocateNormal(buf, reqCapacity, normCapacity);
return;
}
// 如果超过page但还没有到chunksize,那么直接新建chunk
if (normCapacity <= chunkSize) {
// 去本地缓存申请normal空间
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
// 直接去内存池去获取
allocateNormal(buf, reqCapacity, normCapacity);
} else {
// Huge allocations are never served via the cache so just call allocateHuge
// 直接去申请Unpooled内存空间
allocateHuge(buf, reqCapacity);
}
}
allocateNormal
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 在poolchunklist中看有没有chunk能进行分配,当然了,刚开始,这些里面都是没有的。
// 只有在实际生成chunk后才会加到这些list里面。
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
++allocationsNormal;
return;
}
// 新增chunk
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
// 在chunk里申请内存空间
long handle = c.allocate(normCapacity);
++allocationsNormal;
// 如果申请成功
assert handle > 0;
// 与Bytebuf绑定
c.initBuf(buf, handle, reqCapacity);
// 将该chunk加到qInit的list。
qInit.add(c);
}
标准化
int normalizeCapacity(int reqCapacity) {
if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
}
// 如果超过chunksize,直接返回
if (reqCapacity >= chunkSize) {
return reqCapacity;
}
// 因为整个chunk只有tiny是16B递增的。其他都是翻倍递增,也就是以2为幂的数
// 只有当超过tiny,也就是>=512, 才需要计算
if (!isTiny(reqCapacity)) { // >= 512
// Doubled
// 这里来计算离给定reqCapacity最近的下一个2的幂次方
// 比如给定8888,会返回16K
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
return normalizedCapacity;
}
// Quantum-spaced
// 到这里说明是tiny的范围,如果已经是16的倍数了,那么满足条件,直接返回。
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
// 否则标准化成16的倍数
return (reqCapacity & ~15) + 16;
}
重分配
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = buf.length;
if (oldCapacity == newCapacity) {
return;
}
PoolChunk<T> oldChunk = buf.chunk;
long oldHandle = buf.handle;
T oldMemory = buf.memory;
int oldOffset = buf.offset;
int oldMaxLength = buf.maxLength;
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
allocate(parent.threadCache(), buf, newCapacity);
// 如果新的容量比之前的大,那么将之前内存的内容复制到新的内存空间里面
if (newCapacity > oldCapacity) {
memoryCopy(
oldMemory, oldOffset,
buf.memory, buf.offset, oldCapacity);
// 如果是缩容的操作,且能放下部分read的数据,那么将read的数据复制过去
// 当新容量不足时,允许截取
} else if (newCapacity < oldCapacity) {
if (readerIndex < newCapacity) {
if (writerIndex > newCapacity) {
writerIndex = newCapacity;
}
memoryCopy(
oldMemory, oldOffset + readerIndex,
buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
} else {
readerIndex = writerIndex = newCapacity;
}
}
buf.setIndex(readerIndex, writerIndex);
// 释放老的内存空间
if (freeOldMemory) {
free(oldChunk, oldHandle, oldMaxLength, buf.cache);
}
}
网友评论