美文网首页
ByteBufAllocator

ByteBufAllocator

作者: Pillar_Zhong | 来源:发表于2019-08-01 19:04 被阅读0次

概要

1564037439099.png image

初始化

static {
    // 默认的pagesize 8k
    int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);

    // 默认poolsubpage二叉树的深度11
    int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);

    final Runtime runtime = Runtime.getRuntime();

    // 每个ByteBufAllocator所能拥有的最小arena个数是cpu核心的2倍,跟NioEventLoop一样
    // 换句话说,每个EventLoop都有自己专属的Arena,节省了同步的开销
    final int defaultMinNumArena = runtime.availableProcessors() * 2;
    final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
    
    // runtime.maxMemory() / defaultChunkSize,当然是当前系统内存总共能切分多少个chunk
    // 除以2,相当于不能消耗超过系统内存的一半
    // 除以3,相当于3*arena=chunk,也就是每个arena至少需要3个chunk
    DEFAULT_NUM_HEAP_ARENA = Math.max(0,
            SystemPropertyUtil.getInt(
                    "io.netty.allocator.numHeapArenas",
                    (int) Math.min(
                            defaultMinNumArena,
                            runtime.maxMemory() / defaultChunkSize / 2 / 3)));
    DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
            SystemPropertyUtil.getInt(
                    "io.netty.allocator.numDirectArenas",
                    (int) Math.min(
                            defaultMinNumArena,
                            PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

    // 设置cachesize,tiny有512,small有256,normal有64
    DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
    DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
    DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);

    DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
            "io.netty.allocator.cacheTrimInterval", 8192);

}

构造方法

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                              int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
    super(preferDirect);
    // 可以看成是一个ThreadLocal变量,跟当前线程绑定
    threadCache = new PoolThreadLocalCache();
    this.tinyCacheSize = tinyCacheSize;
    this.smallCacheSize = smallCacheSize;
    this.normalCacheSize = normalCacheSize;
    // chunksize = pagesize*2^maxOrder = 16M 默认
    final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
    
    // Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize)
    // 也就是计算pagesize从尾部开始一共有多少位连续的0
    // 比如8k=10000000000000,那么pageShifts=13
    int pageShifts = validateAndCalculatePageShifts(pageSize);

    // 上面初始化的时候,nHeapArena已经知道了,最少是CPUCORE*2
    if (nHeapArena > 0) {
        // 这里先创建一个CPUCORE*2长度的PoolArena数组
        heapArenas = newArenaArray(nHeapArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
        for (int i = 0; i < heapArenas.length; i ++) {
            // 正式开始初始化PoolArena.HeapArena
            PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
            // 加到heapArenas里面
            heapArenas[i] = arena;
            metrics.add(arena);
        }
        heapArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        heapArenas = null;
        heapArenaMetrics = Collections.emptyList();
    }

    // 堆外的跟上面类似
    if (nDirectArena > 0) {
        directArenas = newArenaArray(nDirectArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
        for (int i = 0; i < directArenas.length; i ++) {
            PoolArena.DirectArena arena = new PoolArena.DirectArena(
                    this, pageSize, maxOrder, pageShifts, chunkSize);
            directArenas[i] = arena;
            metrics.add(arena);
        }
        directArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        directArenas = null;
        directArenaMetrics = Collections.emptyList();
    }
}

申请

newHeapBuffer

protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // 拿到线程绑定的PoolThreadCache的heapArena
    // 也就是先去线程本地缓存里找是否有所属的arena
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    ByteBuf buf;
    // 如果有的话,那么直接去找arena申请内存
    if (heapArena != null) {
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // 如果没有的话,那么去申请非池化的内存
        buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }    
    return toLeakAwareBuffer(buf);
}
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // 比heap多了一次优化,使用unsafe的方式直接去申请堆外内存
        if (PlatformDependent.hasUnsafe()) {
            buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }

    return toLeakAwareBuffer(buf);
}

PoolThreadLocalCache

这里有个问题是,既然Allocator初始化的时候新建了这么多的arena,那么它是怎样跟线程绑定的呢?而且线程是怎样决定绑定哪一个arena的呢?我们继续。

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {

    // 前面Allocator初始化的时候已经新建了一个PoolThreadLocalCache
    // 同时,它内部ThreadLocalMap初始化的时候会触发到这里
    // 总结下,这个Allocator在该线程绑定一个threadCache的本地变量
    // 而这个threadCache对应的value是initialValue的返回值
    @Override
    protected synchronized PoolThreadCache initialValue() {
        // 从heapArenas和directArenas分别选中一个,包装成PoolThreadCache返回
        // 最少被使用
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }

    @Override
    protected void onRemoval(PoolThreadCache threadCache) {
        threadCache.free();
    }

    private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
        if (arenas == null || arenas.length == 0) {
            return null;
        }
            
        PoolArena<T> minArena = arenas[0];
        // 很简单,遍历整个Allocator所拥有的所有arena,看谁绑定的线程数最少就用谁
        for (int i = 1; i < arenas.length; i++) {
            PoolArena<T> arena = arenas[i];
            if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                minArena = arena;
            }
        }

        return minArena;
    }
}

相关文章

网友评论

      本文标题:ByteBufAllocator

      本文链接:https://www.haomeiwen.com/subject/blncdctx.html