PooledByteBufAllocator内存分配使用实例:
// Example: allocate ten 2 KiB direct buffers from the shared pooled allocator
// and dump each ByteBuf as JSON so its internal state can be inspected.
public static void main(String[] args) {
final int bufferSize = 2 * 1024;
// PooledByteBufAllocator exposes a shared default pooled allocator instance
final PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
int allocated = 0;
while (allocated < 10) {
final ByteBuf buffer = allocator.directBuffer(bufferSize);
System.out.println(JSON.toJSONString(buffer));
allocated++;
}
}
这里采用PooledByteBufAllocator池化内存分配器分配PooledDirectByteBuffer对象作为实例进行分析
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
//...
// Allocates a pooled direct (off-heap) buffer. Falls back to an unpooled
// direct buffer when the calling thread's cache has no direct arena.
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
//Fetch the calling thread's PoolThreadCache from the PoolThreadLocalCache thread-local
PoolThreadCache cache = threadCache.get();
//Get the cache's directArena; the actual allocation is delegated to it
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
//Allocate from the direct arena: initialCapacity is the requested size,
//maxCapacity is Integer.MAX_VALUE in the example's call path
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
//No arena available: create an unpooled direct buffer instead,
//preferring the Unsafe-based variant when sun.misc.Unsafe is usable
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
//Wrap the buffer so leaked (never-released) buffers can be detected
return toLeakAwareBuffer(buf);
}
//...
}
abstract class PoolArena<T> implements PoolArenaMetric {
// Pooled buffer allocation: first obtain a (possibly recycled) PooledByteBuf
// shell, then bind actual pooled memory to it.
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
//Create the PooledByteBuf object (original comment had a typo: "常见" for "创建")
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
//Bind memory of reqCapacity (after normalization) to the buffer
allocate(cache, buf, reqCapacity);
return buf;
}
// NOTE(review): in the actual Netty sources this override is declared in the
// PoolArena.DirectArena subclass, not in the generic PoolArena<T> — the
// article inlined it here for readability; as written it would not compile.
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
}
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
//创建PooledByteBuf对象
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
//从缓存池中获取PooledUnsafeDirectByteBuf缓冲区对象
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
//重置缓冲区对象
buf.reuse(maxCapacity);
return buf;
}
重置缓冲区对象
final void reuse(int maxCapacity) {
//设置缓冲区最大值Integer.MAX_VALUE
maxCapacity(maxCapacity);
//重置ByteBuf引用计数为0,ByteBuf采用引用计数的方式进行内存回收
resetRefCnt();
//设置ByteBuf索引为0,readerIndex = writerIndex = 0
setIndex0(0, 0);
//读和写的标记值为0,markedReaderIndex = markedWriterIndex = 0
discardMarks();
}
}
-
workThread通过PoolThreadLocalCache这个线程本地变量获取当前线程的PoolThreadCache缓存,缓存池对象为空时会initial一个
-
然后PooledByteBufAllocator将分配缓冲区的任务交给了DirectPoolArena进行分配
-
DirectPoolArena先创建一个初始的PooledByteBuf对象,读写索引都为初始0,PooledByteBuf对象是从对象池中进行获取,池中没有会进行创建,这里使用到了Recycle对象池化,后续会进行分析
-
然后调用allocate方法进行缓冲区的分配
abstract class PoolArena<T> implements PoolArenaMetric {
// Core allocation routine: normalizes the requested size, then dispatches to
// the tiny/small subpage pools, a normal chunk allocation, or a huge
// (unpooled) allocation depending on the normalized size class.
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
//Normalize the requested capacity (tiny -> multiple of 16, others -> next power of two)
final int normCapacity = normalizeCapacity(reqCapacity);
//Is the size class tiny or small, i.e. smaller than pageSize (8192)?
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
//Tiny size class: normalized capacity below 512 bytes
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
//Try the thread-local PoolThreadCache first for a tiny-sized buffer
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
//Compute the slot in tinySubpagePools: normCapacity / 16
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
//Try the thread-local PoolThreadCache first for a small-sized buffer
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
//Compute the slot in smallSubpagePools
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
//Head of the subpage list for this size class in tinySubpagePools/smallSubpagePools
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
//Try to allocate from an existing subpage of this size class
final PoolSubpage<T> s = head.next;
//On first use the list is empty (head points to itself), so this branch is skipped
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
//No usable subpage yet: fall back to carving a subpage out of a chunk
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
//Normal size class: normCapacity between pageSize and chunkSize, allocate from a chunk
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
//Huge size class (larger than chunkSize): allocate directly, bypassing the pool
allocateHuge(buf, reqCapacity);
}
}
}
PoolArena内存组织示意图
-
对申请的内存大小进行规整,tiny类型按16的倍数进行规整,small和normal类型规整为大于申请内存的最接近2的指数次幂的值
-
根据规整后的内存大小来确认是在tinySubpagePool或者smallSubpagePools或者PoolChunk中分配,还是直接堆或对外内存分配。
-
如果内存规格为tiny或small,确定其在tinySubpagePools或smallSubpagePools中的下标:tinySubpagePools中是normCapacity / 16;smallSubpagePools中是log2(normCapacity / 512),即规整后的512、1024、2048、4096分别对应下标0、1、2、3。如果内存规格是normal,从chunk中进行分配内存
-
如果tinySubpagePools或smallSubpagePools中都为空,也直接从chunk中进行分配
为什么tinySubpagePools和smallSubpagePools数组大小分别为32和4?
-
tinySubpagePools用于分配小于512字节的内存,并且在内存规整的时候是按16的倍数进行规整,[16,512)内16的倍数共有31个(16、32、…、496),再加上下标0处预留不用的头部槽位,所以这里数组的长度就是32(即512 >>> 4)
-
smallSubpagePools用于分配大于等于512字节且小于8192字节(pageSize)的内存,并且在内存规整的时候,按大于等于normCapacity且最接近的2的指数次幂取值,规整后的取值只有512、1024、2048、4096这4种(8192已属于normal规格),所以smallSubpagePools数组大小为4
从chunk中分配内存allocateNormal
// Allocates from an existing PoolChunk, creating a fresh chunk when no list
// can satisfy the request. Lists are tried q050 -> q025 -> q000 -> qInit ->
// q075 to keep overall chunk utilization high.
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
//Try the chunk lists grouped by usage ratio, starting with chunks around 50% usage
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
//All lists empty or unable to satisfy the request: create a brand-new chunk
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
//Allocate the requested memory out of the new chunk; must succeed on a fresh chunk
boolean success = c.allocate(buf, reqCapacity, normCapacity);
assert success;
//Register the new chunk in the qInit list
qInit.add(c);
}
-
在PoolArena中的ChunkList中找到合适的内存大小分配,优先从使用率为50%的chunk中进行分配
-
如果没有找到合适的chunk,则直接申请创建一个chunk进行内存分配
-
然后将新创建的chunk添加到qInit的chunkList中
为什么这里优先从q050中进行分配内存?
q050的内存使用率在50%~100%之间,q075的分配成功率不高,q000会导致内存使用率太低,优先使用他们不利于内存的高效利用和回收,优先分配q050能提高内存的使用率。
网友评论