1.PoolThreadCache
在JEMalloc分配算法文中,将PoolThreadCache
类比为同城仓库,可以就近提取中小型货物。本文将详细介绍PoolThreadCache
的细节和实现,在Netty中,其内部结构可见下图:
这里,新引入一个数据类型MemoryRegionCache
,其内部是一个ByteBuf
队列。每个节点是一个ByteBuf
的说法并不准确,切确的说,是不再使用的ByteBuf待释放的内存空间,可以再次使用这部分空间构建ByteBuf
对象。根据分配请求大小的不同,MemoryRegionCache
可以分为Tiny,Small,Normal三种。为了更方便的根据请求分配时的大小找到满足需求的缓存空间,每一种MemoryRegionCache
又根据规范化后的大小依次组成数组,Tiny、Small、Normal的数组大小依次为32、4、12。
其中
ByteBuf
队列的长度是有限制的,Tiny、Small、Normal依次为512、256、64。为了更好的理解,举例子如下:
16B -- TinyCache[1] -- (Buf512-...-Buf3-Buf2-Buf1)
32B -- TinyCache[2] -- ()
496B -- TinyCache[31] -- (Buf2-Buf1)
512B -- SmallCache[0] -- (Buf256-...-Buf3-Buf2-Buf1)
8KB -- NormalCache[0] - (Buf64 -...-Buf3-Buf2-Buf1)
在线程缓存中,待回收的空间根据大小排列,比如,最大空间为16B的ByteBuf
被缓存时,将被置于数组索引为1的MemoryRegionCache
中,其中又由其中的队列存放该ByteBuf
的空间信息,队列的最大长度为512。也就是说,16B的ByteBuf
空间可以缓存512个,512B可以缓存256个,8KB可以缓存64个。
明白了这些,开始分析PoolThreadCache
的细节实现,首先看成员变量:
// 各类型的Cache数组
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
// 用于计算normal请求的数组索引 = log2(pageSize)
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
private int allocations; // 分配次数
private final int freeSweepAllocationThreshold; // 分配次数到达该阈值则检测释放
private final Thread deathWatchThread; // 线程结束观察者
private final Runnable freeTask;
在构造方法中省略同类型Cache数组的构造,代码如下:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if ( normalDirectCaches != null) {
freeTask = (Runnable) () -> {free0();};
deathWatchThread = Thread.currentThread();
// 观察线程每隔一个周期检测当前线程是否存活
ThreadDeathWatcher.watch(deathWatchThread, freeTask);
} else {
freeTask = null;
deathWatchThread = null;
}
}
缓存数组的构建方法如下:
private static <T> MemoryRegionCache<T>[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
for (int i = 0; i < cache.length; i++) {
cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
}
return cache;
} else {
return null;
}
}
其中的参数maxCachedBufferCapacity
为缓存Buf的最大容量,因为Normal的ByteBuf
最大容量为16MB,且默认缓存64个,这是巨大的内存开销,所以设置该参数调节缓存Buf的最大容量。比如设置为16KB,那么只有16KB和8KB的ByteBuf
缓存,其他容量的Normal请求就不缓存,这样大大减小了内存占用。在Netty中,该参数的默认值为32KB。
为了更好的理解回收内存进而再分配的过程,先介绍PoolThreadCache
中的数据结构,首先看MemoryRegionCache
,它的成员变量如下:
private final int size; // 队列长度
private final Queue<Entry<T>> queue; // 队列
private final SizeClass sizeClass; // Tiny/Small/Normal
private int allocations; // 分配次数
其中Entry
的成员变量如下:
final Handle recyclerHandle; // 回收该对象
PoolChunk<T> chunk; // ByteBuf之前分配所属的Chunk
long handle = -1; // ByteBuf在Chunk中的分配信息
此处重点分析MemoryRegionCache
的构造方法:
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
这里使用了一个MPSC(Multiple Producer Single Consumer)队列即多个生产者单一消费者队列,之所以使用这种类型的队列是因为:ByteBuf
的分配和释放可能在不同的线程中,这里的多生产者即多个不同的释放线程,这样才能保证多个释放线程同时释放ByteBuf
时所占空间正确添加到队列中。
明白了这些,开始分析ByteBuf
的回收过程,当一个ByteBuf
不再使用,arena首先调用如下方法尝试缓存:
boolean add(PoolArena<?> area, PoolChunk chunk, long handle,
int normCapacity, SizeClass sizeClass) {
// 在缓存数组中找到符合的元素
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}
return cache.add(chunk, handle);
}
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
default:
throw new Error();
}
}
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
// normCapacity >>> 4, 即16B的索引为1
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
实质的缓存操作为MemoryRegionCache
的add()
方法,代码如下:
public final boolean add(PoolChunk<T> chunk, long handle) {
Entry<T> entry = newEntry(chunk, handle);
boolean queued = queue.offer(entry);
if (!queued) {
// 队列已满,不缓存,立即回收entry对象进行下一次分配
entry.recycle();
}
return queued;
}
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
Entry entry = RECYCLER.get(); // 从池中取出entry对象
entry.chunk = chunk;
entry.handle = handle;
return entry;
}
以上便是PoolThreadCache
的回收过程,接下来分析分配过程,以Tiny请求分配为例:
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf,
int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
return false; // 缓存数组中没有
}
boolean allocated = cache.allocate(buf, reqCapacity); // 实际的分配
// 分配次数达到整理阈值
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim(); // 整理
}
return allocated;
}
实质的分配依然在MemoryRegionCache
中,代码如下:
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll(); // 从队列头部取出
if (entry == null) {
return false;
}
// 在之前ByteBuf同样的内存位置分配一个新的`ByteBuf`对象
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
entry.recycle(); // entry对象回收利用
++ allocations; // 该值是内部量,和上一方法不同
return true;
}
protected void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf,
int reqCapacity) {
chunk.initBufWithSubpage(buf, handle, reqCapacity);
}
分配正好是回收的逆过程,接下来再分析释放过程,代码如下:
void free() {
if (freeTask != null) {
assert deathWatchThread != null;
ThreadDeathWatcher.unwatch(deathWatchThread, freeTask);
}
free0();
}
private void free0() {
int numFreed = free(tinySubPageDirectCaches) + ... +
free(normalHeapCaches);
if (directArena != null) {
directArena.numThreadCaches.getAndDecrement();
}
if (heapArena != null) {
heapArena.numThreadCaches.getAndDecrement();
}
}
与分配和回收类似,稍有不同的是:释放需要遍历Cache数组,对每一个MemoryRegionCache
执行free()
,代码如下:
public final int free() {
return free(Integer.MAX_VALUE);
}
private int free(int max) {
int numFreed = 0;
for (; numFreed < max; numFreed++) {
Entry<T> entry = queue.poll();
if (entry != null) {
freeEntry(entry);
} else {
return numFreed; // 队列中所有节点都被释放
}
}
return numFreed;
}
private void freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
long handle = entry.handle;
entry.recycle(); // 回收entry对象
chunk.arena.freeChunk(chunk, handle, sizeClass); // 释放实际的内存空间
}
在分配过程还有一个trim()
方法,当分配操作达到一定阈值(Netty默认8192)时,没有被分配出去的缓存空间都要被释放,以防止内存泄漏,核心代码如下:
// 内部类MemoryRegionCache
public final void trim() {
// allocations 表示已经重新分配出去的ByteBuf个数
int free = size - allocations;
allocations = 0;
// 在一定阈值内还没被分配出去的空间将被释放
if (free > 0) {
free(free); // 释放队列中的节点
}
}
也就是说,期望一个MemoryRegionCache
频繁进行回收-分配,这样allocations > size,将不会释放队列中的任何一个节点表示的内存空间;但如果长时间没有分配,则应该释放这一部分空间,防止内存占据过多。Tiny请求缓存512个节点,由此可知当使用率超过 512 / 8192 = 6.25% 时就不会释放节点。
虽然PoolThreadCache
名称中含有Thread,但到目前为止的分析,与线程没有一点关系。的确如此,使他们产生联系的正是下一节的主题:PoolThreadLocalCache
。
2.PoolThreadLocalCache
线程缓存,直接想到的应该是ThreadLocal
,Netty更进一步,使用改进的FastThreadLocal
。首先看类签名:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache>
这里不介绍FastThreadLocal
的原理,可简单认为是JDK原生的ThreadLocal
的改进版,效率更高。当请求得到线程本地变量时,会调用initialValue()
方法,代码如下:
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
// useCacheForAllThreads是全局变量,是否全部线程使用缓存
// 或者线程是FastThreadLocalThread即Netty中的IO线程
if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// 其他情况下不使用缓存
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
当设置全局变量useCacheForAllThreads
为true(默认情况true)时或者线程是FastThreadLocalThread
的子类(IO线程)时,才会启用缓存。
其中的leastUsedArena()
方法,使得线程均等使用Arena,代码如下:
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
// 寻找使用缓存最少的arena
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
每次都寻找被最少线程使用的Arena分配新的线程,这样每个Arena都能均等分到线程数,从而平均Arena的负载。
当线程生命周期结束时,调用onRemoval()
方法进行一些清理操作,代码如下:
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free(); // 释放线程缓存中未分配的空间
}
3.ThreadDeathWatcher
当线程意外结束时,如果没有释放未分配的空间,将会造成内存泄漏。为了以防万一,Netty采用一个Watcher线程观察使用缓存的线程,当线程意外结束时,确保内存一定被释放。出于性能考虑,Netty并没有使用一个常驻的后台Watcher线程,而是当有需要观察的线程时,才启动Watcher线程;当所有观察任务结束,Watcher线程亦结束。当然,这会增加一些代码的复杂度。
首先分析成员变量,由于后台只需要一个Watcher线程,所以变量都被定义为static
:
// 被观察的线程
private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<Entry>();
private static final Watcher watcher = new Watcher(); // 观察任务
private static final AtomicBoolean started = new AtomicBoolean(); // 观察线程是否启动
private static volatile Thread watcherThread; // 观察线程
其中的Entry
表示被观察的线程及其缓存释放时需要执行的清理操作,其中的变量如下:
final Thread thread; // 被观察线程
final Runnable task; // 清理缓存操作
final boolean isWatch; // 是否需要被观察
在PoolThreadCache
的构造方法中最后调用ThreadDeathWatcher.watch()
方法观察该调用线程,实现代码如下:
public static void watch(Thread thread, Runnable task) {
if (!thread.isAlive()) {
throw new IllegalArgumentException("thread must be alive.");
}
schedule(thread, task, true);
}
// entry.isWatch = isWatch
private static void schedule(Thread thread, Runnable task, boolean isWatch) {
pendingEntries.add(new Entry(thread, task, isWatch)); // 加入等待队列
if (started.compareAndSet(false, true)) {
// started为false则启动观察线程
Thread watcherThread = threadFactory.newThread(watcher);
watcherThread.start();
ThreadDeathWatcher.watcherThread = watcherThread;
}
}
可见实现很简单,将需要观察的线程加入到等待队列,当没有观察线程时启动观察线程即可。对应地,当含有缓存的线程主动释放缓存时会调用unwatch()
方法,代码如下:
public static void unwatch(Thread thread, Runnable task) {
schedule(thread, task, false);
}
只是设置isWatch
,其余与watch如出一辙。下面分析关键的Watcher
任务,类方法和成员变量如下:
private static final class Watcher implements Runnable {
private final List<Entry> watchees = new ArrayList<Entry>();
}
由于在watch()
和unwatch()
方法中使用了isWatch
判断是否需要观察,此处的watchees
中只包含需要观察的线程。核心的run()
方法如下:
public void run() {
for (;;) {
fetchWatchees(); // 得到需要观察的线程
notifyWatchees(); // 线程异常结束执行清理操作
// 再次执行,防止notify触发新的观察线程
fetchWatchees();
notifyWatchees();
try {
Thread.sleep(1000); // 周期1秒
} catch (InterruptedException ignore) {
// Ignore the interrupt; do not terminate until all tasks are run.
}
// 没有需要被观察的线程则结束该观察线程
if (watchees.isEmpty() && pendingEntries.isEmpty()) {.
boolean stopped = started.compareAndSet(true, false);
assert stopped;
if (pendingEntries.isEmpty()) {
break; // 没有任务则退出
}
if (!started.compareAndSet(false, true)) {
break; // 此时watch()方法启动新的观察线程,该线程退出
}
}
}
}
可见,观察线程以1秒为周期检测使用缓存的线程是否非正常死亡,如果非正常死亡则执行遗嘱中的清理操作。检测过程和清理过程如下:
private void fetchWatchees() {
for (;;) {
Entry e = pendingEntries.poll();
if (e == null) {
break;
}
if (e.isWatch) {
watchees.add(e); // 需要观察
} else {
watchees.remove(e); // 正常原因死亡,不需要遗嘱
}
}
}
private void notifyWatchees() {
List<Entry> watchees = this.watchees;
for (int i = 0; i < watchees.size();) {
Entry e = watchees.get(i);
if (!e.thread.isAlive()) { // 非正常死亡
watchees.remove(i);
try {
e.task.run(); // 遗嘱中的清理操作
} catch (Throwable t) {
logger.warn("Thread death watcher task raised an exception:", t);
}
} else {
i ++;
}
}
}
细心的朋友可能会觉得watchees.add(e)
和watchees.remove(e)
会删除不同的entry
对象(watch和unwatch添加了两个entry对象),实际中并不会,这是因为entry
特殊的equals()
方法即isWatch
不参与相等比较:
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof Entry)) {
return false;
}
Entry that = (Entry) obj;
return thread == that.thread && task == that.task;
}
public int hashCode() {
return thread.hashCode() ^ task.hashCode();
}
至此,线程缓存PoolThreadCache
分析完毕。
相关链接:
网友评论