Netty-PoolThreadCache
Overview
PoolThreadCache, as the name suggests, is a cache bound to a single thread.
PoolThreadCache is one of the key reasons Netty's memory management can allocate and release memory efficiently. Netty maintains a PoolThreadCache for every thread; when memory is requested, the allocator first tries to satisfy the request from the PoolThreadCache, and only if that fails does it fall back to Netty's shared memory pool (the arena).
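To see the cache in action, here is a minimal sketch using the public PooledByteBufAllocator API; whether a given release actually lands in the thread cache depends on the buffer size and on the allocator's cache settings for the current thread, so treat the comments as the typical case rather than a guarantee.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class ThreadCacheDemo {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
        // The first allocation of a 1KB buffer has to go to the arena.
        ByteBuf first = alloc.directBuffer(1024);
        // release() does not necessarily hand the memory back to the arena;
        // the region is typically offered to this thread's PoolThreadCache.
        first.release();
        // A later request of the same normalized size on the same thread can
        // then be served straight from the thread-local cache.
        ByteBuf second = alloc.directBuffer(1024);
        second.release();
    }
}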
Key fields
// the heap arena picked from the allocator for this thread
final PoolArena<byte[]> heapArena;
// the direct arena picked from the allocator for this thread
final PoolArena<ByteBuffer> directArena;

// caches for tiny/small/normal sized regions
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
// after this many allocations served through the cache, trim() is triggered
private final int freeSweepAllocationThreshold;

private int allocations;

private final Thread thread = Thread.currentThread();
private final Runnable freeTask = new Runnable() {
    @Override
    public void run() {
        free0();
    }
};
Initialization
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
    // (argument validation omitted here)
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    // handle the direct arena side
    if (directArena != null) {
        // tinySubPageDirectCaches covers the tiny sizes [16B, 32B, ..., 496B];
        // each slot's queue holds at most tinyCacheSize (512 by default) entries
        tinySubPageDirectCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        // smallSubPageDirectCaches has 4 slots, caching the small sizes
        // [512B, 1024B, 2048B, 4096B]; each slot's queue holds at most
        // smallCacheSize (256 by default) entries
        smallSubPageDirectCaches = createSubPageCaches(
                smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
        numShiftsNormalDirect = log2(directArena.pageSize);
        // normalDirectCaches caches the normal sizes [8KB, 16KB, 32KB];
        // each slot's queue holds at most normalCacheSize (64 by default) entries
        normalDirectCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, directArena);
        // once the caches are set up, bump numThreadCaches so the directArena
        // knows one more thread is now associated with it
        directArena.numThreadCaches.getAndIncrement();
    } else {
        // No directArea is configured so just null out all caches
        tinySubPageDirectCaches = null;
        smallSubPageDirectCaches = null;
        normalDirectCaches = null;
        numShiftsNormalDirect = -1;
    }
    // same as the direct case
    if (heapArena != null) {
        // Create the caches for the heap allocations
        tinySubPageHeapCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        smallSubPageHeapCaches = createSubPageCaches(
                smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
        numShiftsNormalHeap = log2(heapArena.pageSize);
        normalHeapCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, heapArena);
        heapArena.numThreadCaches.getAndIncrement();
    } else {
        // No heapArea is configured so just null out all caches
        tinySubPageHeapCaches = null;
        smallSubPageHeapCaches = null;
        normalHeapCaches = null;
        numShiftsNormalHeap = -1;
    }
    // register freeTask so the cached memory is released when the owning thread dies
    ThreadDeathWatcher.watch(thread, freeTask);
}
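For context, the sizes passed into this constructor come from PooledByteBufAllocator and can be tuned via -Dio.netty.allocator.* system properties. The defaults below match the 4.x sources this walkthrough is based on, but verify them against your own Netty version:

public class PoolThreadCacheDefaults {
    public static void main(String[] args) {
        // Assumed PooledByteBufAllocator defaults (overridable via system properties).
        int tinyCacheSize = 512;                  // io.netty.allocator.tinyCacheSize
        int smallCacheSize = 256;                 // io.netty.allocator.smallCacheSize
        int normalCacheSize = 64;                 // io.netty.allocator.normalCacheSize
        int maxCachedBufferCapacity = 32 * 1024;  // io.netty.allocator.maxCachedBufferCapacity
        int freeSweepAllocationThreshold = 8192;  // io.netty.allocator.cacheTrimInterval
        System.out.printf("per-thread caches: tiny=%d small=%d normal=%d, cap=%dB, trim every %d allocations%n",
                tinyCacheSize, smallCacheSize, normalCacheSize,
                maxCachedBufferCapacity, freeSweepAllocationThreshold);
    }
}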
createNormalCaches
private static <T> MemoryRegionCache<T>[] createNormalCaches(
        int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
    if (cacheSize > 0) {
        // A chunk can in principle hold buffers up to 16MB, but caching up to 64
        // regions of that size in every thread would be pointless and hugely
        // wasteful. With the default maxCachedBufferCapacity of 32KB, the normal
        // caches are limited to the 8KB, 16KB and 32KB sizes, which keeps the waste bounded.
        int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
        // ...so three slots are enough to hold those sizes
        int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
        @SuppressWarnings("unchecked")
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
        for (int i = 0; i < cache.length; i++) {
            // Each slot is a NormalMemoryRegionCache for a different normal size;
            // regions of these sizes are carved straight out of a chunk,
            // not out of a subpage
            cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
        }
        return cache;
    } else {
        return null;
    }
}
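To make the sizing concrete, assume the defaults of an 8KB page, a 16MB chunk and a 32KB maxCachedBufferCapacity: max = min(16MB, 32KB) = 32KB, so arraySize = log2(32KB / 8KB) + 1 = 3, one slot each for 8KB, 16KB and 32KB. A standalone sketch of the same arithmetic:

public class NormalCacheSizing {
    public static void main(String[] args) {
        int pageSize = 8192;                      // default page size
        int chunkSize = 16 * 1024 * 1024;         // default chunk size
        int maxCachedBufferCapacity = 32 * 1024;  // default cap for cached normal buffers

        int max = Math.min(chunkSize, maxCachedBufferCapacity);
        // numberOfTrailingZeros on a power of two is the same as log2
        int arraySize = Math.max(1, Integer.numberOfTrailingZeros(max / pageSize) + 1);
        System.out.println("normal cache slots: " + arraySize); // 3
        for (int i = 0; i < arraySize; i++) {
            System.out.println("slot " + i + " caches " + (pageSize << i) / 1024 + "KB regions");
        }
    }
}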
createSubPageCaches
// tiny:  32 slots, each queue capped at 512 entries, for sizes below 512B
// small:  4 slots, each queue capped at 256 entries, for sizes from 512B up to the page size (8KB)
// the logic mirrors createNormalCaches
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
        int cacheSize, int numCaches, SizeClass sizeClass) {
    if (cacheSize > 0) {
        @SuppressWarnings("unchecked")
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
        for (int i = 0; i < cache.length; i++) {
            // TODO: maybe use cacheSize / cache.length
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
        }
        return cache;
    } else {
        return null;
    }
}
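The slot counts quoted above come from PoolArena: numTinySubpagePools is fixed at 512 >>> 4 = 32 (one slot per 16-byte step below 512B), and numSmallSubpagePools works out to pageShifts - 9, i.e. 4 slots (512B, 1KB, 2KB, 4KB) with the default 8KB page. A small sketch reproducing that arithmetic, assuming the default page size:

public class SubPageCacheSlots {
    public static void main(String[] args) {
        int pageSize = 8192;
        int pageShifts = Integer.numberOfTrailingZeros(pageSize); // 13 for 8KB pages

        int numTinySubpagePools = 512 >>> 4;       // 32 tiny slots: 16B, 32B, ..., 496B
        int numSmallSubpagePools = pageShifts - 9; // 4 small slots: 512B, 1KB, 2KB, 4KB

        System.out.println("tiny slots:  " + numTinySubpagePools);
        System.out.println("small slots: " + numSmallSubpagePools);
    }
}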
MemoryRegionCache
Overview
Tracing the callers of queue.add leads to an important conclusion: PoolThreadCache does not quietly pre-fill caches of every size on its own and then hand memory out of them. Instead, it collects memory regions that the layers above have finished with and keeps them here so they can be reused on the next allocation. Without this, those regions would simply go back to the arena, which is its own kind of waste; under high concurrency the memory churn is significant, so every bit of reuse helps.
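To see how regions actually end up in the queue, it helps to glance at the add path. The excerpt below is simplified from the Netty 4.x sources this walkthrough follows (exact signatures vary between versions): when a pooled buffer is released, the arena first offers the (chunk, handle) pair to the releasing thread's cache, and MemoryRegionCache just wraps it in an Entry and offers it to its queue.

// PoolThreadCache: called from PoolArena.free(...) when a buffer is released
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
    // locate the cache slot for this size class and normalized capacity
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    if (cache == null) {
        return false; // not cacheable; the arena frees the region itself
    }
    return cache.add(chunk, handle);
}

// MemoryRegionCache
public final boolean add(PoolChunk<T> chunk, long handle) {
    Entry<T> entry = newEntry(chunk, handle);
    boolean queued = queue.offer(entry);
    if (!queued) {
        // the queue is already at its size limit, so recycle the Entry wrapper;
        // the memory itself goes back to the arena instead of the cache
        entry.recycle();
    }
    return queued;
}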
Key fields
// maximum number of entries the queue may hold
private final int size;
// the queue of reusable memory regions (chunk + handle pairs)
private final Queue<Entry<T>> queue;
// tiny, small or normal
private final SizeClass sizeClass;
// number of allocations served from this cache since the last trim
private int allocations;
Allocation
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    // If the queue has no reusable entry, report a cache miss
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    // Otherwise bind buf to the entry's chunk and handle, reusing the memory region
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    entry.recycle();
    // On a successful reuse, bump the allocation counter
    ++ allocations;
    return true;
}
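initBuf is the hook that distinguishes the two cache flavors. In the sources this walkthrough follows (simplified excerpt; details may differ by version), the subpage variant re-initializes the buffer against a subpage, while the normal variant initializes it directly against the chunk:

// SubPageMemoryRegionCache
@Override
protected void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
    chunk.initBufWithSubpage(buf, handle, reqCapacity);
}

// NormalMemoryRegionCache
@Override
protected void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
    chunk.initBuf(buf, handle, reqCapacity);
}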
Trimming
public final void trim() {
    int free = size - allocations;
    allocations = 0;

    // We not even allocated all the number that are
    if (free > 0) {
        free(free);
    }
}
Locating the cache
The goal here is to decide, from the arena and the requested size, which size class and which slot to go to, i.e. which MemoryRegionCache holds regions of the right size.
All of this was laid out when the PoolThreadCache was initialized; the code below is the lookup logic.
If the lookup succeeds, it returns the matching MemoryRegionCache, whose queue contains the memory you need, provided there is something left to reuse.
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    // map normCapacity to its index in the tiny table
    int idx = PoolArena.tinyIdx(normCapacity);
    // pick the direct or heap tiny cache array
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.smallIdx(normCapacity);
    if (area.isDirect()) {
        return cache(smallSubPageDirectCaches, idx);
    }
    // pick the matching small MemoryRegionCache
    return cache(smallSubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
    if (area.isDirect()) {
        int idx = log2(normCapacity >> numShiftsNormalDirect);
        return cache(normalDirectCaches, idx);
    }
    int idx = log2(normCapacity >> numShiftsNormalHeap);
    // pick the matching normal MemoryRegionCache
    return cache(normalHeapCaches, idx);
}

private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    // If idx is out of range, the requested size is not one this cache covers,
    // so return null to signal that the cache cannot serve it
    if (cache == null || idx > cache.length - 1) {
        return null;
    }
    return cache[idx];
}
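A few concrete mappings help here (these mirror PoolArena.tinyIdx/smallIdx as found in the same sources, assuming the default 8KB page): a 96B tiny request maps to index 96 >>> 4 = 6, a 2048B small request maps to index 2, and a 16KB normal request maps to log2(16384 >> 13) = 1. A standalone sketch:

public class CacheIndexDemo {
    // same computation as PoolArena.tinyIdx: one slot per 16-byte step
    static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;
    }

    // same computation as PoolArena.smallIdx: 512B -> 0, 1KB -> 1, 2KB -> 2, 4KB -> 3
    static int smallIdx(int normCapacity) {
        int tableIdx = 0;
        int i = normCapacity >>> 10;
        while (i != 0) {
            i >>>= 1;
            tableIdx++;
        }
        return tableIdx;
    }

    public static void main(String[] args) {
        int numShiftsNormal = 13; // log2(8192), the default page size
        System.out.println(tinyIdx(96));     // 6
        System.out.println(smallIdx(2048));  // 2
        // normal lookup: log2(normCapacity >> numShiftsNormal)
        System.out.println(Integer.numberOfTrailingZeros(16384 >> numShiftsNormal)); // 1
    }
}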
Allocation
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}

boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
    if (cache == null) {
        // no cache found so just return false here
        return false;
    }
    // ask the cache whether it has a reusable region
    boolean allocated = cache.allocate(buf, reqCapacity);
    // after every freeSweepAllocationThreshold allocations, run a trim pass
    if (++ allocations >= freeSweepAllocationThreshold) {
        allocations = 0;
        // release entries that have gone unused
        trim();
    }
    return allocated;
}
Trimming
As the allocation path above shows, once the cache has served a certain number of allocations it runs a cleanup pass over its entries. Netty's reasoning is that if an Entry has sat in the cache unclaimed for that long, the chance of it ever being reused drops off sharply, so instead of letting those regions sit there holding memory, they are all released back to the arena.
void trim() {
    // every size class, both direct and heap, gets trimmed
    trim(tinySubPageDirectCaches);
    trim(smallSubPageDirectCaches);
    trim(normalDirectCaches);
    trim(tinySubPageHeapCaches);
    trim(smallSubPageHeapCaches);
    trim(normalHeapCaches);
}

private static void trim(MemoryRegionCache<?>[] caches) {
    if (caches == null) {
        return;
    }
    for (MemoryRegionCache<?> c: caches) {
        trim(c);
    }
}

private static void trim(MemoryRegionCache<?> cache) {
    if (cache == null) {
        return;
    }
    // delegate to MemoryRegionCache.trim()
    cache.trim();
}
// MemoryRegionCache
public final void trim() {
    // free = size minus the allocations served since the last trim;
    // that many queued entries are considered stale and eligible for release
    int free = size - allocations;
    allocations = 0;

    // We not even allocated all the number that are
    if (free > 0) {
        free(free);
    }
}
private int free(int max) {
    int numFreed = 0;
    // In the extreme case (a full queue and no hits since the last trim),
    // up to max == size entries are waiting here to be released
    for (; numFreed < max; numFreed++) {
        // take one Entry off the queue
        Entry<T> entry = queue.poll();
        if (entry != null) {
            freeEntry(entry);
        } else {
            // all cleared
            return numFreed;
        }
    }
    return numFreed;
}
private void freeEntry(Entry entry) {
    PoolChunk chunk = entry.chunk;
    long handle = entry.handle;
    // recycle the Entry wrapper itself
    entry.recycle();
    // return the underlying region to the arena/chunk
    chunk.arena.freeChunk(chunk, handle, sizeClass);
}
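To make the free-sweep accounting concrete, take a tiny cache with size = 512 that served only 30 hits during the last freeSweepAllocationThreshold (8192 by default) allocations made by the thread: trim() computes free = 512 - 30 = 482 and releases up to 482 queued entries back to the arena, while a cache that served 512 or more hits releases nothing. A tiny sketch of that arithmetic:

public class TrimAccountingDemo {
    public static void main(String[] args) {
        int size = 512;        // queue capacity of one tiny MemoryRegionCache
        int allocations = 30;  // hits this cache served since the last trim

        // Same computation as MemoryRegionCache.trim(): lightly used caches
        // give back most of their queued entries, hot caches keep theirs.
        int free = size - allocations;
        System.out.println("entries eligible for release: " + Math.max(free, 0)); // 482
    }
}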