美文网首页
PoolThreadCache

PoolThreadCache

作者: Pillar_Zhong | 来源:发表于2019-08-01 17:51 被阅读0次

    Netty-PoolThreadCache

    概要

    PoolThreadCache顾名思义,就是跟线程绑定的cache。

    PoolThreadCahche是Netty内存管理中能够实现高效内存申请和释放的一个重要原因,Netty会为每一个线程都维护一个PoolThreadCache对象,当进行内存申请时,首先会尝试从PoolThreadCache中申请,如果无法从中申请到,则会尝试从Netty的公共内存池中申请.

    image

    关键属性

    // 从Alloctor选中的heaparena
    final PoolArena<byte[]> heapArena;
    // 从Alloctor选中的directarena
    final PoolArena<ByteBuffer> directArena;
    
    // tiny/small/normal的caches
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
    
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;
    
    private int allocations;
    
    private final Thread thread = Thread.currentThread();
    private final Runnable freeTask = new Runnable() {
        @Override
        public void run() {
            free0();
        }
    };
    

    初始化

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
       
        // TODO
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        this.heapArena = heapArena;
        this.directArena = directArena;
        // directArena处理
        if (directArena != null) {
            // tinySubPageHeapCaches数组缓存的是[16B, 32B, … , 496B]大小的内存块, 
            // 每个元素对应的缓存queue个数不能超过512个
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            // smallSubPageHeapCaches数组长度为4(如上图所示), 
            // 依次缓存[512K, 1024k, 2048k, 4096k]大小的缓存, 
            // 每个的元素对应的缓存queue个数不能超过256个                
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
            
            numShiftsNormalDirect = log2(directArena.pageSize);
            // normalDirectCaches依次缓存[8k, 16k, 32k]大小的缓存
            // 每个的元素对应的缓存queue个数不能超过64个
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);
            
            // 如果上面初始化完成,那么numThreadCaches加一,代表directArena有新的线程来关联
            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        // 同direct
        if (heapArena != null) {
            // Create the caches for the heap allocations
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
    
            numShiftsNormalHeap = log2(heapArena.pageSize);
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);
    
            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }
    
        
        ThreadDeathWatcher.watch(thread, freeTask);
    }
    

    createNormalCaches

    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0) {
            // 一个chunk按道理最大能撑到16M,但是如果只是作为线程的本地缓存,16M*64显然不必要,
            //也极大浪费,而这里normal限定为8k,16k,32k就可以了,不至于浪费
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            // 当然了需要3个位置来放置这些长度
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
    
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                // NormalMemoryRegionCache代表两个意思,每个不同位置代表不同的normalsize
                // 到时候申请空间,代表是去chunk申请,而不是subpage
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }
    

    createSubPageCaches

    // tiny 32*512 0-512
    // small 4*256 512-8192
    // 逻辑同normal
    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0) {
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }
    

    MemoryRegionCache

    概述

    1564047989450.png 1564048025752.png

    从跟踪queue的add来看,可以得到一个结论。PoolThreadCache并不是自己闷着头将不同规格大小的cache初始化完,对外提供内存。而是去收集上层中不用的内存空间,保存在这里,以便下次复用。如果你不这么做,那么这些内存会被回收掉,也是一种浪费。在高并发的场景,内存的开销会很大。能复用一点再好不过了。

    关键属性

    // 队列能保存的最大元素个数
    private final int size;
    // 队列保存了可复用的内存空间
    private final Queue<Entry<T>> queue;
    // tiny or small or normal
    private final SizeClass sizeClass;
    // 堆外分配过多少次了
    private int allocations;
    

    分配

    public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
        // 如果当前队列没有可用的Entry,那么直接返回false
        Entry<T> entry = queue.poll();
        if (entry == null) {
            return false;
        }
        // 如果有的话,直接将buf与该entry进行绑定,这样也达到内存复用的目的
        initBuf(entry.chunk, entry.handle, buf, reqCapacity);
        entry.recycle();
        
        // 如果成功复用,那么分配次数加一
        ++ allocations;
        return true;
    }
    

    清理

    public final void trim() {
        int free = size - allocations;
        allocations = 0;
    
        // We not even allocated all the number that are
        if (free > 0) {
            free(free);
        }
    }
    

    定位

    这里的目的是根据arena和请求的空间大小来决定我要去拿到哪种规格, 多大的内存空间。

    而这些在PoolThreadCache初始化的时候就已经规划好,下面就是具体定位的逻辑。

    定位成功的话,拿到对应的MemoryRegionCache,而它里面queue队列中就是你需要的内存。假如可以复用的话。

    private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        // 拿到normCapacity对应的idex
        int idx = PoolArena.tinyIdx(normCapacity);
        // 去拿到对应的tiny的MemoryRegionCache
        if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }
    
    private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.smallIdx(normCapacity);
        if (area.isDirect()) {
            return cache(smallSubPageDirectCaches, idx);
        }
        // 去拿到对应的small的MemoryRegionCache
        return cache(smallSubPageHeapCaches, idx);
    }
    
    private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
        if (area.isDirect()) {
            int idx = log2(normCapacity >> numShiftsNormalDirect);
            return cache(normalDirectCaches, idx);
        }
        int idx = log2(normCapacity >> numShiftsNormalHeap);
        // 去拿到对应的normal的MemoryRegionCache
        return cache(normalHeapCaches, idx);
    }
    
    private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
        // 可以看到,假如你传入的idx不合适的话,说明你申请的空间大小不符合cache的条件
        // 那么返回null,代表cache中没有你需要的空间
        if (cache == null || idx > cache.length - 1) {
            return null;
        }
        return cache[idx];
    }
    

    分配

    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
    }
    
    boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
    }
    
    boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
    }
    
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            // no cache found so just return false here
            return false;
        }
        // 去cache里面看有没有可重用的空间
        boolean allocated = cache.allocate(buf, reqCapacity);
        // 每对外分配了freeSweepAllocationThreshold次数后,做trim
        if (++ allocations >= freeSweepAllocationThreshold) {
            allocations = 0;
            // 清理空间
            trim();
        }
        return allocated;
    }
    
    

    清理

    从上面分配可知,当cache复用了一定次数的后,需要对cache里面的空间做一次清理。Netty认为,隔了这么久的时间里面,如果有Entry还没有人认领,那么这些空间可能使用率会指数级递减。与其放着占着内存,不如全部释放掉。

    void trim() {
        // 每种规格的cache都需要清理
        trim(tinySubPageDirectCaches);
        trim(smallSubPageDirectCaches);
        trim(normalDirectCaches);
        trim(tinySubPageHeapCaches);
        trim(smallSubPageHeapCaches);
        trim(normalHeapCaches);
    }
    
    private static void trim(MemoryRegionCache<?>[] caches) {
        if (caches == null) {
            return;
        }
        for (MemoryRegionCache<?> c: caches) {
            trim(c);
        }
    }
    
    private static void trim(MemoryRegionCache<?> cache) {
        if (cache == null) {
            return;
        }
        // 调用MemoryRegionCache的清理
        cache.trim();
    }
    
    // MemoryRegionCache
    public final void trim() {
        // 计算还能存多少个Entry,也就是free
        int free = size - allocations;
        allocations = 0;
    
        // We not even allocated all the number that are
        if (free > 0) {
            free(free);
        }
    }
    
    private int free(int max) {
        int numFreed = 0;
        // 极端情况下,上一步可以得到free个内存空间等待释放,也就是queue全满的情况
        for (; numFreed < max; numFreed++) {
            // 从队列中取出一个Entry
            Entry<T> entry = queue.poll();
            if (entry != null) {
                freeEntry(entry);
            } else {
                // all cleared
                return numFreed;
            }
        }
        return numFreed;
    }
    
    private  void freeEntry(Entry entry) {
        PoolChunk chunk = entry.chunk;
        long handle = entry.handle;
    
        // entry回收
        entry.recycle();
        
        // 底层chunk去释放
        chunk.arena.freeChunk(chunk, handle, sizeClass);
    }
    
    

    相关文章

      网友评论

          本文标题:PoolThreadCache

          本文链接:https://www.haomeiwen.com/subject/xumcdctx.html