
Netty and Memory Allocation (2): PooledByteBufAllocator

Author: 奔跑地蜗牛 | Published 2019-05-29 19:25

    Introduction

    Since the JVM already manages memory, why does Netty need its own allocation scheme? Because the JVM mostly manages on-heap objects; for direct (off-heap) memory it only keeps track of the Java reference objects that point to it, not the memory itself. To release direct memory promptly, and to avoid the performance cost of repeatedly requesting memory from the OS via malloc(), Netty maintains a memory pool that reuses memory blocks, much as a connection pool reuses connections.
    Netty's memory management scheme is largely borrowed from jemalloc.

    Memory Management Approach

    Memory management is not so different from logistics. Memory blocks, like parcels, come in all sizes; blocks should be taken from somewhere nearby to avoid thread contention, just as couriers set up local pickup points to save customers time. Allocation and release therefore need to be handled as follows:
    Allocating memory:

    • Small blocks are requested frequently, so each thread gets a private cache (tcache) to avoid thread contention;
    • When the tcache can no longer satisfy a request, a common region shared by multiple threads is needed: the arena.
      An arena in turn classifies its blocks by size into small allocations and large allocations:
      • small allocation: requested often and in many different sizes, so several size classes are defined to match different requests without wasting space;
      • large allocation: requested rarely, so it is served in a single allocation to avoid repeated requests;
    • Huge blocks are allocated directly from the memory space and handed straight to the caller (a minimal sketch of this size-based routing follows these lists).

    Releasing memory:

    • Prefer to reuse recently used blocks and allocate from the lowest addresses, so that memory unused for a long time drifts toward the high addresses and can be reclaimed;
    • Reclaim blocks that have not been used for a long span of time.
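
    To make the tiering concrete, here is a minimal sketch of the size-based routing described above. It assumes Netty's default thresholds (tiny below 512 B, small below the 8 KiB page, normal up to the 16 MiB chunk); the class and method names are illustrative, not Netty's API:

        // Illustrative sketch only: routes a request the way the tiers above describe.
        // Thresholds assume Netty defaults: 512 B tiny limit, 8 KiB page, 16 MiB chunk.
        public final class SizeClassRouter {
            static final int TINY_LIMIT = 512;
            static final int PAGE_SIZE  = 8192;
            static final int CHUNK_SIZE = 16 * 1024 * 1024;

            static String route(int size) {
                if (size < TINY_LIMIT)  return "tiny   -> thread cache, then tinySubpagePools";
                if (size < PAGE_SIZE)   return "small  -> thread cache, then smallSubpagePools";
                if (size <= CHUNK_SIZE) return "normal -> thread cache, then a PoolChunk";
                return "huge   -> allocated directly, never pooled";
            }

            public static void main(String[] args) {
                for (int size : new int[] {100, 2048, 1 << 20, 32 * 1024 * 1024}) {
                    System.out.println(size + " B: " + route(size));
                }
            }
        }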

    PooledByteBufAllocator: the allocation implementation

    Its fields are as follows:

        private static final int DEFAULT_NUM_HEAP_ARENA; // default number of heap arenas
        private static final int DEFAULT_NUM_DIRECT_ARENA; // default number of direct arenas

        private static final int DEFAULT_PAGE_SIZE; // default page size: 8192 bytes (8 KiB)
        // DEFAULT_MAX_ORDER controls the chunk size: a chunk manages all of its pages as a
        // balanced binary tree, and the depth of that tree determines the chunk size.
        // The default is 11, so a chunk is pageSize * 2^11.
        private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
        private static final int DEFAULT_TINY_CACHE_SIZE; // default tiny cache size -> 512
        private static final int DEFAULT_SMALL_CACHE_SIZE; // default small cache size -> 256
        private static final int DEFAULT_NORMAL_CACHE_SIZE; // default normal cache size -> 64
        private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; // default max capacity of a cached buffer
        private static final int DEFAULT_CACHE_TRIM_INTERVAL; // default trim interval: after every 8192 allocations, rarely used cache entries are freed
        private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS; // whether every thread (not only FastThreadLocalThreads) gets a thread-local cache; default true
        private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT; // default direct memory cache alignment (0 = no alignment)

        private static final int MIN_PAGE_SIZE = 4096; // minimum page size
        private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); // a chunk may not exceed 2^30 bytes (1 GiB)
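
    As a quick check on the DEFAULT_MAX_ORDER comment, the chunk size is simply pageSize shifted left by maxOrder. A minimal standalone sketch of the arithmetic, assuming the default values:

        // Verifies the chunk-size arithmetic from the comments above (Netty defaults assumed).
        public final class ChunkSizeCheck {
            public static void main(String[] args) {
                int pageSize = 8192;                           // DEFAULT_PAGE_SIZE
                int maxOrder = 11;                             // DEFAULT_MAX_ORDER
                int chunkSize = pageSize << maxOrder;          // 8192 * 2^11 = 16 MiB
                int maxChunkSize = (int) (((long) Integer.MAX_VALUE + 1) / 2); // 2^30 = 1 GiB
                System.out.println(chunkSize);                 // 16777216
                System.out.println(chunkSize <= maxChunkSize); // true
            }
        }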
    

    They are initialized in the static block below:

    static {
            // default pageSize is 8192
            int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
            Throwable pageSizeFallbackCause = null;
            try { // validate that defaultPageSize is >= MIN_PAGE_SIZE (4096) and a power of two
                validateAndCalculatePageShifts(defaultPageSize);
            } catch (Throwable t) {
                pageSizeFallbackCause = t;
                defaultPageSize = 8192;
            }
            DEFAULT_PAGE_SIZE = defaultPageSize;
    
            // validate maxOrder and compute the chunk size
            int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
            Throwable maxOrderFallbackCause = null;
            try {
                validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
            } catch (Throwable t) {
                maxOrderFallbackCause = t;
                defaultMaxOrder = 11;
            }
            DEFAULT_MAX_ORDER = defaultMaxOrder;
    
            // Determine reasonable defaults for nHeapArena and nDirectArena.
            // Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
            final Runtime runtime = Runtime.getRuntime();
    
            /*
             * We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
             * number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
             * allocation and de-allocation needs to be synchronized on the PoolArena.
             *
             * See https://github.com/netty/netty/issues/3888.
             */
            final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
            final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
            // The number of PoolArenas defaults to the smaller of (2 * available processors) and
            // (max memory / chunkSize / 2 / 3): dividing by 2 keeps the pool below 50% of max
            // memory, and dividing by 3 leaves room for each PoolArena to hold 3 PoolChunks.
            // Override via io.netty.allocator.numHeapArenas / numDirectArenas.
            DEFAULT_NUM_HEAP_ARENA = Math.max(0,
                    SystemPropertyUtil.getInt(
                            "io.netty.allocator.numHeapArenas",
                            (int) Math.min(
                                    defaultMinNumArena,
                                    runtime.maxMemory() / defaultChunkSize / 2 / 3)));
            DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
                    SystemPropertyUtil.getInt(
                            "io.netty.allocator.numDirectArenas",
                            (int) Math.min(
                                    defaultMinNumArena,
                                    PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
    
            // cache sizes
            DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
            DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
            DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
    
            // 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
            // 'Scalable memory allocation using jemalloc'
            DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
                    "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
    
        // the allocation-count threshold after which cached entries that are not frequently used are freed
            DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
                    "io.netty.allocator.cacheTrimInterval", 8192);
    
            DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
                    "io.netty.allocator.useCacheForAllThreads", true);
    
            DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
                    "io.netty.allocator.directMemoryCacheAlignment", 0);
    }
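
    Every default above can be overridden through the io.netty.allocator.* system properties this block reads. A sketch of how that might look (the property names come from the code above; the values here are purely illustrative), keeping in mind that the properties must be set before PooledByteBufAllocator is first loaded, because the static initializer reads them only once:

        public final class AllocatorTuning {
            public static void main(String[] args) {
                // Must happen before PooledByteBufAllocator's static initializer runs.
                System.setProperty("io.netty.allocator.pageSize", "16384"); // 16 KiB pages
                System.setProperty("io.netty.allocator.maxOrder", "9");     // 16384 << 9 = 8 MiB chunks
                System.setProperty("io.netty.allocator.numDirectArenas", "4");

                // Touching the class now runs the static block with the values above.
                System.out.println(io.netty.buffer.PooledByteBufAllocator.DEFAULT);
            }
        }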
    

    The PooledByteBufAllocator constructor:

       public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                      int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                      boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
            super(preferDirect);
            threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
            this.tinyCacheSize = tinyCacheSize;
            this.smallCacheSize = smallCacheSize;
            this.normalCacheSize = normalCacheSize;
            chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
    // ... elided to save space ...
     int pageShifts = validateAndCalculatePageShifts(pageSize);
    
            if (nHeapArena > 0) {
                heapArenas = newArenaArray(nHeapArena);
                List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
                for (int i = 0; i < heapArenas.length; i ++) {
                    PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                            pageSize, maxOrder, pageShifts, chunkSize,
                            directMemoryCacheAlignment);
                    heapArenas[i] = arena;
                    metrics.add(arena);
                }
                heapArenaMetrics = Collections.unmodifiableList(metrics);
            } else {
                heapArenas = null;
                heapArenaMetrics = Collections.emptyList();
            }
        // initialize the direct (off-heap) arenas
            if (nDirectArena > 0) {
            directArenas = newArenaArray(nDirectArena); // create the arena array
                List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
            for (int i = 0; i < directArenas.length; i ++) { // initialize each arena
                    PoolArena.DirectArena arena = new PoolArena.DirectArena(
                            this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
                    directArenas[i] = arena;
                    metrics.add(arena);
                }
                directArenaMetrics = Collections.unmodifiableList(metrics);
            } else {
                directArenas = null;
                directArenaMetrics = Collections.emptyList();
            }
            metric = new PooledByteBufAllocatorMetric(this);
        }
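
    For completeness, a minimal usage sketch showing the allocator at work; PooledByteBufAllocator.DEFAULT is the shared instance built through this constructor:

        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.PooledByteBufAllocator;

        public final class PooledAllocDemo {
            public static void main(String[] args) {
                PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
                ByteBuf buf = alloc.directBuffer(1024); // served from one of the DirectArenas
                try {
                    buf.writeBytes(new byte[] {1, 2, 3});
                    System.out.println(buf.readableBytes()); // 3
                } finally {
                    buf.release(); // hands the memory back to the pool instead of freeing it
                }
            }
        }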
    

    PoolArena source analysis

    Its core fields are as follows:

    // subpage allocations are divided into three size classes
    enum SizeClass {
        Tiny,
        Small,
        Normal
    }
    // number of tiny subpage pools: 512 >>> 4 = 32 (one per 16-byte size step below 512)
    static final int numTinySubpagePools = 512 >>> 4;

    final PooledByteBufAllocator parent;
    // depth of the balanced binary tree with which each chunk manages its pages
    private final int maxOrder;
    // page size
    final int pageSize;
    // log2(pageSize), e.g. 13 for 8 KiB pages
    final int pageShifts;
    // chunk size
    final int chunkSize;
    final int subpageOverflowMask;
    // number of small subpage pools, i.e. power-of-two sizes from 512 up to pageSize/2:
    // 512, 1024, 2048 and 4096 for the default 8 KiB page
    final int numSmallSubpagePools;
    final int directMemoryCacheAlignment;
    final int directMemoryCacheAlignmentMask;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    // chunks with 50%-100% usage
    private final PoolChunkList<T> q050;
    // chunks with 25%-75% usage
    private final PoolChunkList<T> q025;
    // chunks with 1%-50% usage
    private final PoolChunkList<T> q000;
    // newly allocated chunks with usage up to 25% (never released back to the OS)
    private final PoolChunkList<T> qInit;
    // chunks with 75%-100% usage
    private final PoolChunkList<T> q075;
    // chunks with 100% usage
    private final PoolChunkList<T> q100;
    
        private final List<PoolChunkListMetric> chunkListMetrics;
    
        // Metrics for allocations and deallocations
        private long allocationsNormal;
        // We need to use the LongCounter here as this is not guarded via synchronized block.
        private final LongCounter allocationsTiny = PlatformDependent.newLongCounter();
        private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();
        private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();
        private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();
    
        private long deallocationsTiny;
        private long deallocationsSmall;
        private long deallocationsNormal;
    
        // We need to use the LongCounter here as this is not guarded via synchronized block.
        private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();
    
        // Number of thread caches backed by this arena.
        final AtomicInteger numThreadCaches = new AtomicInteger();
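
    The two subpage-pool arrays are indexed by normalized size. For illustration, a standalone sketch of the index math, consistent with the tinyIdx/smallIdx helpers in the Netty 4.1 sources:

        public final class SubpageIndexDemo {
            // Tiny sizes are multiples of 16 below 512, so the index is simply size / 16.
            static int tinyIdx(int normCapacity) {
                return normCapacity >>> 4; // 16 -> 1, 32 -> 2, ..., 496 -> 31
            }

            // Small sizes are powers of two from 512 upward; the index counts the
            // doublings above 512.
            static int smallIdx(int normCapacity) {
                int tableIdx = 0;
                int i = normCapacity >>> 10; // 512 -> 0, 1024 -> 1, 2048 -> 2, 4096 -> 3
                while (i != 0) {
                    i >>>= 1;
                    tableIdx++;
                }
                return tableIdx;
            }

            public static void main(String[] args) {
                System.out.println(tinyIdx(496));   // 31
                System.out.println(smallIdx(4096)); // 3
            }
        }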
    

    Its constructor:

    protected PoolArena(PooledByteBufAllocator parent, int pageSize,
              int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
            this.parent = parent;
            this.pageSize = pageSize;
            this.maxOrder = maxOrder;
            this.pageShifts = pageShifts;
            this.chunkSize = chunkSize;
            directMemoryCacheAlignment = cacheAlignment;
            directMemoryCacheAlignmentMask = cacheAlignment - 1;
            subpageOverflowMask = ~(pageSize - 1);
            tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
            for (int i = 0; i < tinySubpagePools.length; i ++) {
                tinySubpagePools[i] = newSubpagePoolHead(pageSize);
            }
    
            numSmallSubpagePools = pageShifts - 9;
            smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
            for (int i = 0; i < smallSubpagePools.length; i ++) {
                smallSubpagePools[i] = newSubpagePoolHead(pageSize);
            }
    
            q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
            q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
            q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
            q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
            q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
            qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
    
            q100.prevList(q075);
            q075.prevList(q050);
            q050.prevList(q025);
            q025.prevList(q000);
            q000.prevList(null);
            qInit.prevList(qInit);
    
            List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
            metrics.add(qInit);
            metrics.add(q000);
            metrics.add(q025);
            metrics.add(q050);
            metrics.add(q075);
            metrics.add(q100);
            chunkListMetrics = Collections.unmodifiableList(metrics);
        }
    

    From the code above, the PoolChunkList chain looks like this (the original post illustrates it with poolchunklist.png):

        qInit -> q000 <-> q025 <-> q050 <-> q075 <-> q100

    qInit's prev pointer refers to itself, so newly created chunks are never handed back to the OS even when their usage drops to 0; q000 has no prev, so a chunk there whose usage falls below 1% is freed.

    The allocation path looks like this:

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity); // normalize the requested size
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
                int tableIdx;
                PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity); // tiny: normalized size < 512
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // try the thread-local cache first
                        // was able to allocate out of the cache so move on
                        return;
                    }
                tableIdx = tinyIdx(normCapacity); // index into tinySubpagePools
                    table = tinySubpagePools;
                } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // small: try the thread-local cache first
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = smallIdx(normCapacity);
                    table = smallSubpagePools;
                }
    
                final PoolSubpage<T> head = table[tableIdx];
    
                /**
                 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
                 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
                 */
            synchronized (head) { // synchronize: the subpage list may be modified concurrently
                    final PoolSubpage<T> s = head.next;
                    if (s != head) {
                        assert s.doNotDestroy && s.elemSize == normCapacity;
                        long handle = s.allocate();
                        assert handle >= 0;
                        s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                        incTinySmallAllocation(tiny);
                        return;
                    }
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
    
                incTinySmallAllocation(tiny);
                return;
            }
        if (normCapacity <= chunkSize) { // normal allocation: up to a whole chunk
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // try the thread-local cache first
                    // was able to allocate out of the cache so move on
                    return;
                }
            synchronized (this) { // serialize allocation from this arena
                    allocateNormal(buf, reqCapacity, normCapacity);
                    ++allocationsNormal;
                }
        } else { // huge allocation: never pooled or cached by Netty
                // Huge allocations are never served via the cache so just call allocateHuge
                allocateHuge(buf, reqCapacity);
            }
        }
    
    

    How capacity normalization works:

     int normalizeCapacity(int reqCapacity) {
            if (reqCapacity < 0) {
                throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
            }
    
            if (reqCapacity >= chunkSize) {
                return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
            }
    
            if (!isTiny(reqCapacity)) { // >= 512
                // Doubled
                // Round up to the next power of two by bit smearing: after the
                // decrement, OR-ing the value with itself shifted right by 1, 2,
                // 4, 8 and 16 copies the highest set bit into every lower bit,
                // e.g. 001xxxxxxx -> 0011xxxxxx -> 001111xxxx -> 0011111111;
                // the final increment then yields 0100000000, the next power of
                // two. (Decrementing first keeps exact powers of two unchanged.)
                int normalizedCapacity = reqCapacity;
                normalizedCapacity --;
                normalizedCapacity |= normalizedCapacity >>>  1;
                normalizedCapacity |= normalizedCapacity >>>  2;
                normalizedCapacity |= normalizedCapacity >>>  4;
                normalizedCapacity |= normalizedCapacity >>>  8;
                normalizedCapacity |= normalizedCapacity >>> 16;
                normalizedCapacity ++;
    
                if (normalizedCapacity < 0) {
                    normalizedCapacity >>>= 1;
                }
                assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
    
                return normalizedCapacity;
            }
    
            if (directMemoryCacheAlignment > 0) {
                return alignCapacity(reqCapacity);
            }
    
            // Quantum-spaced
            if ((reqCapacity & 15) == 0) {
                return reqCapacity;
            }
    
            return (reqCapacity & ~15) + 16;
        }
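
    A few worked examples of this rounding, using a standalone copy of the two rules above (alignment disabled and requests below chunkSize assumed):

        public final class NormalizeDemo {
            // Standalone copy of the rounding above: tiny requests (< 512) round up to
            // a multiple of 16; everything else rounds up to the next power of two.
            static int normalize(int req) {
                if (req >= 512) {
                    int n = req - 1;
                    n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
                    return n + 1;
                }
                return (req & 15) == 0 ? req : (req & ~15) + 16;
            }

            public static void main(String[] args) {
                System.out.println(normalize(100));  // 112  (tiny: next multiple of 16)
                System.out.println(normalize(512));  // 512  (already a power of two)
                System.out.println(normalize(1000)); // 1024
                System.out.println(normalize(9000)); // 16384
            }
        }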
    
