Overview
This section walks through PooledByteBufAllocator: its static defaults, its constructor, the newHeapBuffer/newDirectBuffer allocation entry points, and how each thread gets bound to an arena through PoolThreadLocalCache.
Initialization
static {
    // Default page size: 8 KiB
    int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
    // Default depth of the binary tree inside each PoolChunk: 11
    int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);

    final Runtime runtime = Runtime.getRuntime();
    // The default number of arenas per ByteBufAllocator is twice the number of CPU cores,
    // the same default as the number of NioEventLoop threads.
    // In other words, each EventLoop can get its own arena, which avoids synchronization overhead.
    final int defaultMinNumArena = runtime.availableProcessors() * 2;
    final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
    // runtime.maxMemory() / defaultChunkSize is how many chunks the maximum heap could be split into.
    // Dividing by 2 means the arenas must not pin more than half of that memory;
    // dividing by 3 assumes each arena needs roughly three chunks.
    DEFAULT_NUM_HEAP_ARENA = Math.max(0,
            SystemPropertyUtil.getInt(
                    "io.netty.allocator.numHeapArenas",
                    (int) Math.min(
                            defaultMinNumArena,
                            runtime.maxMemory() / defaultChunkSize / 2 / 3)));
    DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
            SystemPropertyUtil.getInt(
                    "io.netty.allocator.numDirectArenas",
                    (int) Math.min(
                            defaultMinNumArena,
                            PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

    // Default per-thread cache sizes: tiny = 512, small = 256, normal = 64
    DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
    DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
    DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
    DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
            "io.netty.allocator.cacheTrimInterval", 8192);
}
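To make the default arithmetic concrete, here is a small standalone sketch (not Netty code) that reproduces the calculation: chunkSize = pageSize << maxOrder = 8192 << 11 = 16 MiB, and the arena count is capped so that the arenas (assumed to need about three chunks each) cannot pin more than half of the available memory.

public final class DefaultSizingSketch {
    public static void main(String[] args) {
        int pageSize = 8192;   // io.netty.allocator.pageSize default
        int maxOrder = 11;     // io.netty.allocator.maxOrder default
        int chunkSize = pageSize << maxOrder;                  // 8192 * 2^11 = 16 MiB
        Runtime runtime = Runtime.getRuntime();
        int defaultMinNumArena = runtime.availableProcessors() * 2;
        // Cap: all arenas together (about 3 chunks each) must stay under half of max memory
        long memoryCap = runtime.maxMemory() / chunkSize / 2 / 3;
        int numHeapArenas = (int) Math.max(0, Math.min(defaultMinNumArena, memoryCap));
        System.out.println("chunkSize = " + chunkSize + " bytes");
        System.out.println("numHeapArenas = " + numHeapArenas);
    }
}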
Constructor
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                              int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
    super(preferDirect);
    // Effectively a ThreadLocal variable: it binds a PoolThreadCache to each thread that uses the allocator
    threadCache = new PoolThreadLocalCache();
    this.tinyCacheSize = tinyCacheSize;
    this.smallCacheSize = smallCacheSize;
    this.normalCacheSize = normalCacheSize;
    // chunkSize = pageSize * 2^maxOrder = 16 MiB by default
    final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
    // Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize),
    // i.e. the number of trailing zero bits in pageSize (which must be a power of two).
    // For 8 KiB = 0b10000000000000, pageShifts = 13.
    int pageShifts = validateAndCalculatePageShifts(pageSize);

    // nHeapArena was computed in the static initializer above:
    // by default min(2 * CPU cores, the memory-based cap)
    if (nHeapArena > 0) {
        // Create a PoolArena array of length nHeapArena
        heapArenas = newArenaArray(nHeapArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
        for (int i = 0; i < heapArenas.length; i ++) {
            // Build each PoolArena.HeapArena ...
            PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
            // ... and store it in heapArenas
            heapArenas[i] = arena;
            metrics.add(arena);
        }
        heapArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        heapArenas = null;
        heapArenaMetrics = Collections.emptyList();
    }

    // The direct (off-heap) arenas are built the same way
    if (nDirectArena > 0) {
        directArenas = newArenaArray(nDirectArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
        for (int i = 0; i < directArenas.length; i ++) {
            PoolArena.DirectArena arena = new PoolArena.DirectArena(
                    this, pageSize, maxOrder, pageShifts, chunkSize);
            directArenas[i] = arena;
            metrics.add(arena);
        }
        directArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        directArenas = null;
        directArenaMetrics = Collections.emptyList();
    }
}
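The pageShifts computation is simply log2(pageSize). A quick self-contained check (illustrative only, not part of Netty):

public final class PageShiftsCheck {
    public static void main(String[] args) {
        int pageSize = 8192;   // must be a power of two
        // Position of the single set bit == number of trailing zero bits == log2(pageSize)
        int pageShifts = Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
        System.out.println(pageShifts);                              // 13, since 8192 == 1 << 13
        System.out.println(Integer.numberOfTrailingZeros(pageSize)); // same result: 13
    }
}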
Allocation
newHeapBuffer
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // Get the thread-bound PoolThreadCache and its heapArena,
    // i.e. look up the arena assigned to the current thread in its thread-local cache
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;
    ByteBuf buf;
    // If the thread has an arena, allocate directly from it
    if (heapArena != null) {
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // Otherwise fall back to an unpooled heap buffer
        buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // One extra optimization compared to the heap path: if Unsafe is available,
        // allocate the off-heap memory directly through it
        if (PlatformDependent.hasUnsafe()) {
            buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }
    return toLeakAwareBuffer(buf);
}
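As a usage sketch, these two entry points are what you reach when allocating from a pooled allocator. Both end in toLeakAwareBuffer(), so the returned buffers take part in leak detection and should be released back to the pool:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class PooledAllocationExample {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
        ByteBuf heap = alloc.heapBuffer(256);      // goes through newHeapBuffer
        ByteBuf direct = alloc.directBuffer(256);  // goes through newDirectBuffer
        try {
            heap.writeBytes(new byte[]{1, 2, 3});
            direct.writeInt(42);
        } finally {
            // release() hands the memory back to the arena / thread-local cache
            heap.release();
            direct.release();
        }
    }
}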
PoolThreadLocalCache
One question remains: the allocator creates all of these arenas when it is constructed, so how does an arena end up bound to a thread, and how does a thread decide which arena to bind to? Let's look at PoolThreadLocalCache.
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    // The allocator already created one PoolThreadLocalCache in its constructor.
    // The first time a thread accesses it, the thread's internal thread-local map triggers this callback.
    // In short: the allocator binds a thread-local threadCache to the calling thread,
    // and the value stored for that thread is whatever initialValue() returns.
    @Override
    protected synchronized PoolThreadCache initialValue() {
        // Pick the least-used arena from heapArenas and from directArenas,
        // and wrap the pair in a PoolThreadCache
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }

    @Override
    protected void onRemoval(PoolThreadCache threadCache) {
        threadCache.free();
    }

    private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
        if (arenas == null || arenas.length == 0) {
            return null;
        }
        PoolArena<T> minArena = arenas[0];
        // Simply walk over all of the allocator's arenas and pick the one
        // with the fewest thread caches currently bound to it
        for (int i = 1; i < arenas.length; i++) {
            PoolArena<T> arena = arenas[i];
            if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                minArena = arena;
            }
        }
        return minArena;
    }
}
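To illustrate the binding, here is a simplified model (plain ThreadLocal instead of FastThreadLocal, a bare counter instead of PoolArena, and the counter incremented directly in leastUsed rather than in a PoolThreadCache constructor) of how each thread picks the least-used arena exactly once and then sticks with it:

import java.util.concurrent.atomic.AtomicInteger;

public final class ArenaBindingModel {
    static final class Arena {
        final AtomicInteger numThreadCaches = new AtomicInteger();
    }

    private final Arena[] arenas;
    // Like PoolThreadLocalCache: the first get() on a thread runs leastUsed() and caches the result
    private final ThreadLocal<Arena> threadArena = ThreadLocal.withInitial(this::leastUsed);

    ArenaBindingModel(int nArenas) {
        arenas = new Arena[nArenas];
        for (int i = 0; i < nArenas; i++) {
            arenas[i] = new Arena();
        }
    }

    private Arena leastUsed() {
        Arena min = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            if (arenas[i].numThreadCaches.get() < min.numThreadCaches.get()) {
                min = arenas[i];
            }
        }
        min.numThreadCaches.incrementAndGet(); // this thread is now counted against the chosen arena
        return min;
    }

    Arena arenaForCurrentThread() {
        return threadArena.get();
    }
}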