内存分配是 netty 源码学习中的难点,也是 netty 高性能的关键之一。可以说这是 netty 的精髓。以下记录本人阅读源码的过程,因为能力有限,请读者抱着怀疑的态度观看,有不准确的地方还望指正。本文基于 netty 4.1.46 ,以 PooledByteBufAllocator 、Direct 、Unsafe 为讲解主线。
世间万物皆有始,下边让我们从一个 test 开始
public static void main(String[] args) {
int _1b = 1;
int _1k = 1024 * _1b;
PooledByteBufAllocator allocator = new PooledByteBufAllocator(true);
//申请 1b 内存, 此处 netty 会分配一个 16b 的连续的 byte,因为第一次分配, netty 向操作系统一次性申请 16M 内存
ByteBuf buffer = allocator.buffer(_1b);
//这里写不下 netty 会自动扩容
buffer.writeBytes("hello 田32143243242432432才".getBytes());
System.out.println(buffer.toString(Charset.forName("utf-8")));
//回收内存
buffer.release();
//再次申请 1b 内存,这里会复用上次申请到的 16b 内存
buffer = allocator.buffer(_1b);
//回收内存
buffer.release();
//分配一页内存,分配策略和分配 1b 内存完全不一样
buffer = allocator.buffer(8*_1k);
}
问题:
- 1、申请 1b 内存, 因为第一次分配, netty 向操作系统一次性申请 16M 内存,然后从中配出 16b 的连续的 byte,返回给用户。
* 向操作系统申请 16M 内存,是以什么方式存在的 byte[] 还是java.nio.ByteBuffer?
* 将一整块 byte[] 或 java.nio.ByteBuffer 如何切分? 如何能高效? - 2、回收内存
* 切分出来的内存如何回收
带着以上几个问题,正式开始学习
1、alloc 的 uml 图
image.png上图中相关类的用途:
- 1、PoolThreadCache
每一个线程都会有一个 PoolThreadCache 对象,负责初始化 PoolArena,并且缓存着回收后的内存对象。
2、PoolArena
内存分配的工具集合,缓存了页的碎片池 PoolSubpage<T>[] tinySubpagePools,按照PoolChunk 的使用率进行归类。
3、PoolChunk
管理一个 Chunk (从操作系统申请来的 16M 内存),如何划分page (8k 内存),为每个 page 分配一个详情对象 PoolSubpage。
4、PoolSubpage
此类用于将 page 拆分的工具,体现了 page 拆分情况,用于分配小于 8k (一个 page) 的内存。
5、MemoryRegionCache
用户缓存被回收后的内存,即调用了 buffer.release(); 方法的 buffer。
有了初步认识后,开始源码阅读。在源码阅读过程中可能还会出现更多的问题。能提出问题就是进步。
2、PooledByteBufAllocator 的初始化
(1)、获取一些可配置的参数,如果没有配置那么用默认值,这里展示一些重要的参数,篇幅原因其他部分省略了。
static {
//默认 8k 的页
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
DEFAULT_PAGE_SIZE = defaultPageSize;
//分配规则的 完全二叉树 的层级
int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
DEFAULT_MAX_ORDER = defaultMaxOrder;
//Chunk 的大小 默认为 16M = 8<<11
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
final Runtime runtime = Runtime.getRuntime();
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
//最大堆内内存分配工具Arena 数量
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(
defaultMinNumArena,
runtime.maxMemory() / defaultChunkSize / 2 / 3)));
//最大堆外内存分配工具Arena 数量
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numDirectArenas",
(int) Math.min(
defaultMinNumArena,
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
//不同规格内存类型的缓冲数量
//0 - 512b
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
//512b - 8k
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
//8k - 16m
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
}
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
//设置是否是 directBuffer 或 byte[] ,类型的内存分配
super(preferDirect);
//非常重要的 线程本地变量,内存分配都基于此引用
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
//pageSize 是每页的大小,maxOrder 是数的高度,因为树的叶子节点为 page
//所以个 chunkSize = pageSize * (1<<maxOrder)
// 默认 chunkSize 16M maxOrder 11层 pageSize 8k
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
checkPositiveOrZero(nHeapArena, "nHeapArena");
checkPositiveOrZero(nDirectArena, "nDirectArena");
checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
}
if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: power of two)");
}
//计算 1 位移多少位数等于 pageSize
int pageShifts = validateAndCalculatePageShifts(pageSize);
//堆内内存分配工具创建
if (nHeapArena > 0) {
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
//堆外内分配工具的创建
if (nDirectArena > 0) {
// nDirectArena 默认数量为 2 * cpu
// 因为在分配 EventLoop 的数量为 2 * cpu 所以此处就是希望,给每个线程分配一个 DirectArena,不用加锁
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
//创建内存区域 Arena
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}
看初始化的代码可以提出几个问题:
1、为啥 chunk 的大小默认为 16M
所有应用向操作系统OS内存申请是以 chunk (16m)为单位的。后续所有的操作都是在chunk中操作的。例如 要分配1m的内存,需要向操作系统申请一个chunk (16m),然后在chunk (16m)中取1m的内存。
2、为啥分配工具的数量为 2 倍逻辑cpu数量
因为在分配 EventLoop 的数量默认为 2 * cpu 所以此处就是希望,给每个线程分配一个 Arena,减少锁的竞争。
注意此处创建了 new PoolThreadLocalCache(useCacheForAllThreads); 分配内存时要从 PoolThreadLocalCache 对象中拿到一个线程本地副本变量。
然后看一下 Arena 的初始化方法,初始化了一些数据结构,为内存分配,和使用率汇总做准备。
//内存规格
enum SizeClass {
Tiny,
Small,
Normal
}
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
this.parent = parent;
//默认 8k = 8192
this.pageSize = pageSize;
//默认 11 因为 8*1024*(1<<11) = 16M = chunkSize
this.maxOrder = maxOrder;
//因为 pageSize 是2的整数倍数,所以 pageShifts 是1位移的位数,移动完成即可得到 pageSize
this.pageShifts = pageShifts;
//默认 16M = 16777216
this.chunkSize = chunkSize;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
subpageOverflowMask = ~(pageSize - 1);
//数组长度默认 32
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
//和下文的 MemoryRegionCache 算法差不多, PoolSubpage[1] = 16b PoolSubpage[2] = 32b .... PoolSubpage[31] = 512B
//在 Chunk 中的 allocateSubpage 创建 子page 时填充
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
numSmallSubpagePools = pageShifts - 9;
//数组长度默认 4
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
//PoolSubpage[1] = 512 PoolSubpage[2] = 1k .... PoolSubpage[3] = 4k
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
//创建按照Chnuk使用率分组的集合类
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
//通过双向链表方式进行连接
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
metrics.add(qInit);
metrics.add(q000);
metrics.add(q025);
metrics.add(q050);
metrics.add(q075);
metrics.add(q100);
chunkListMetrics = Collections.unmodifiableList(metrics);
}
注意:
smallSubpagePools 和 tinySubpagePools 用于,按照规格存储 page 拆分后的内存碎片的,也称为"子page"。
PoolChunkList : 用于 Chnuk 使用率分组的集合类。便于统计,和避免不必要的空间检测。如果调用了PooledByteBufAllocator.buffer (1) ,之后结构如图
3、PooledByteBufAllocator.buffer 获取一段连续的内存
@Override
public ByteBuf buffer(int initialCapacity) {
if (directByDefault) {
return directBuffer(initialCapacity);
}
return heapBuffer(initialCapacity);
}
directByDefault 变量就是初始化时候用户传入的参数,用来区分是否创建java.nio.ByteBuffer 对象。这边以 directBuffer() 方法为例
@Override
public ByteBuf directBuffer(int initialCapacity) {
return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}
DEFAULT_MAX_CAPACITY 是netty ByteBuff 最大可扩容的上界,默认为
Integer.MAX_VALUE。
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
//拿到线程局部缓冲 PoolThreadCache
PoolThreadCache cache = threadCache.get();
//拿到堆外预分配内存
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
这里用到了初始化的时候创建的 PoolThreadLocalCache 对象,如果是第一次调用 threadCache.get(); 那么会初始化 PoolThreadCache ,否则直接拿到第一次创建的对象。
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
//默认 useCacheForAllThreads 为 true
private final boolean useCacheForAllThreads;
@Override
protected synchronized PoolThreadCache initialValue() {
//netty 的 内存分配工具池中的 区域都是从 heapArenas 或 directArenas 中拿到的
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
final Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
final PoolThreadCache cache = new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
final EventExecutor executor = ThreadExecutorMap.currentExecutor();
if (executor != null) {
executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
}
return cache;
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
其中 leastUsedArena 方法是从directArenas 数组中拿到一个最少被使用过的 directArena。
然后看一下初始化 PoolThreadCache,
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
//开辟内存工具,netty 内存池的实现
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
//创建重复利用内存的 缓冲容器,如果 ByteBuf.release() 方法那么会填充此缓冲区
//创建 MemoryRegionCache 数组,数组大小和 PoolArena.tinySubpagePools 数量一致
//tinySubPageHeapCaches[1] =16B tinySubPageHeapCaches[2] = 32B ... 496B
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
//smallSubPageHeapCaches[1] = 512B smallSubPageHeapCaches[2] = 1K ... 4K
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
//normalHeapCaches[1] = 8K smallSubPageHeapCaches[2] = 16K smallSubPageHeapCaches[3] = 24K
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}
结构如图:
image.png
PoolThreadLocalCache 构造方法主要是创建了 MemoryRegionCache 他的作用是,重复利用内存的缓冲容器,如果 ByteBuf.release() 方法那么会填充此缓冲区。
private abstract static class MemoryRegionCache<T> {
// tiny N*16B
// small 512B 1K 2K 4K
// normal 8k 16k 32k
// 如果 32k 以上的节点是不进行缓存的
private final int size;
//存放ByteBuf ,Queue 里边的所有的元素大小固定,都等于 size
private final Queue<Entry<T>> queue;
//内存规格 tiny small normal
private final SizeClass sizeClass;
private int allocations;
MemoryRegionCache(int size, SizeClass sizeClass) {
//规格化,查找一个大于size并且是2指数 ,这里配置的已经是规格化了,只是为了保险
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
//创建持有这种规格的缓存队列
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
//分配单位
PoolChunk<T> chunk;
ByteBuffer nioBuffer;
//指向一个连续区域的内存 , 通过 handle 可以定位 PoolChunk 中的一块连续的内存
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
//表示 Entry 不指向任何一块内存
chunk = null;
nioBuffer = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
至此初始化话过程已经完成
网友评论