美文网首页NettyJava学习笔记程序员
自顶向下深入分析Netty(十)--PoolThreadCach

自顶向下深入分析Netty(十)--PoolThreadCach

作者: Hypercube | 来源:发表于2017-08-03 20:25 被阅读807次

    1.PoolThreadCache

    JEMalloc分配算法文中,将PoolThreadCache类比为同城仓库,可以就近提取中小型货物。本文将详细介绍PoolThreadCache的细节和实现,在Netty中,其内部结构可见下图:

    PoolThreadCache

    这里,新引入一个数据类型MemoryRegionCache,其内部是一个ByteBuf队列。每个节点是一个ByteBuf的说法并不准确,切确的说,是不再使用的ByteBuf待释放的内存空间,可以再次使用这部分空间构建ByteBuf对象。根据分配请求大小的不同,MemoryRegionCache可以分为Tiny,Small,Normal三种。为了更方便的根据请求分配时的大小找到满足需求的缓存空间,每一种MemoryRegionCache又根据规范化后的大小依次组成数组,Tiny、Small、Normal的数组大小依次为32、4、12。

    分配请求分类
    其中ByteBuf队列的长度是有限制的,Tiny、Small、Normal依次为512、256、64。为了更好的理解,举例子如下:
    16B  -- TinyCache[1]  -- (Buf512-...-Buf3-Buf2-Buf1)
    32B  -- TinyCache[2]  -- ()
    496B -- TinyCache[31] -- (Buf2-Buf1)
    512B -- SmallCache[0] -- (Buf256-...-Buf3-Buf2-Buf1)
    8KB  -- NormalCache[0] - (Buf64 -...-Buf3-Buf2-Buf1)
    

    在线程缓存中,待回收的空间根据大小排列,比如,最大空间为16B的ByteBuf被缓存时,将被置于数组索引为1的MemoryRegionCache中,其中又由其中的队列存放该ByteBuf的空间信息,队列的最大长度为512。也就是说,16B的ByteBuf空间可以缓存512个,512B可以缓存256个,8KB可以缓存64个。
    明白了这些,开始分析PoolThreadCache的细节实现,首先看成员变量:

        // 各类型的Cache数组
        private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
        private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
        private final MemoryRegionCache<byte[]>[] normalHeapCaches;
        private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
        private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
        private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
        
        // 用于计算normal请求的数组索引 = log2(pageSize)
        private final int numShiftsNormalDirect;
        private final int numShiftsNormalHeap;
        
        private int allocations;    // 分配次数
        private final int freeSweepAllocationThreshold; // 分配次数到达该阈值则检测释放
    
        private final Thread deathWatchThread; // 线程结束观察者
        private final Runnable freeTask;
    

    在构造方法中省略同类型Cache数组的构造,代码如下:

        PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                        int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                        int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
            this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
            this.heapArena = heapArena;
            this.directArena = directArena;
            if (directArena != null) {
                numShiftsNormalDirect = log2(directArena.pageSize);
                normalDirectCaches = createNormalCaches(
                        normalCacheSize, maxCachedBufferCapacity, directArena);
    
                directArena.numThreadCaches.getAndIncrement();
            } else {
                normalDirectCaches = null;
                numShiftsNormalDirect = -1;
            }
            
            if ( normalDirectCaches != null) {
                freeTask = (Runnable) () -> {free0();};
                deathWatchThread = Thread.currentThread();
                // 观察线程每隔一个周期检测当前线程是否存活
                ThreadDeathWatcher.watch(deathWatchThread, freeTask);
            } else {
                freeTask = null;
                deathWatchThread = null;
            }
        }
    

    缓存数组的构建方法如下:

        private static <T> MemoryRegionCache<T>[] createNormalCaches(
                int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
            if (cacheSize > 0) {
                int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
                int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
    
                MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
                for (int i = 0; i < cache.length; i++) {
                    cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
                }
                return cache;
            } else {
                return null;
            }
        }
    

    其中的参数maxCachedBufferCapacity为缓存Buf的最大容量,因为Normal的ByteBuf最大容量为16MB,且默认缓存64个,这是巨大的内存开销,所以设置该参数调节缓存Buf的最大容量。比如设置为16KB,那么只有16KB和8KB的ByteBuf缓存,其他容量的Normal请求就不缓存,这样大大减小了内存占用。在Netty中,该参数的默认值为32KB。
    为了更好的理解回收内存进而再分配的过程,先介绍PoolThreadCache中的数据结构,首先看MemoryRegionCache,它的成员变量如下:

        private final int size; // 队列长度
        private final Queue<Entry<T>> queue; // 队列
        private final SizeClass sizeClass; // Tiny/Small/Normal
        private int allocations; // 分配次数
    

    其中Entry的成员变量如下:

        final Handle recyclerHandle; // 回收该对象
        PoolChunk<T> chunk; // ByteBuf之前分配所属的Chunk
        long handle = -1; // ByteBuf在Chunk中的分配信息
    

    此处重点分析MemoryRegionCache的构造方法:

        MemoryRegionCache(int size, SizeClass sizeClass) {
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }
    

    这里使用了一个MPSC(Multiple Producer Single Consumer)队列即多个生产者单一消费者队列,之所以使用这种类型的队列是因为:ByteBuf的分配和释放可能在不同的线程中,这里的多生产者即多个不同的释放线程,这样才能保证多个释放线程同时释放ByteBuf时所占空间正确添加到队列中。

    明白了这些,开始分析ByteBuf的回收过程,当一个ByteBuf不再使用,arena首先调用如下方法尝试缓存:

        boolean add(PoolArena<?> area, PoolChunk chunk, long handle, 
                        int normCapacity, SizeClass sizeClass) {
            // 在缓存数组中找到符合的元素
            MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
            if (cache == null) {
                return false;
            }
            return cache.add(chunk, handle);
        }
        
        private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
            switch (sizeClass) {
            case Normal:
                return cacheForNormal(area, normCapacity);
            case Small:
                return cacheForSmall(area, normCapacity);
            case Tiny:
                return cacheForTiny(area, normCapacity);
            default:
                throw new Error();
            }
        }
        
        private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
            // normCapacity >>> 4, 即16B的索引为1
            int idx = PoolArena.tinyIdx(normCapacity);  
            if (area.isDirect()) {
                return cache(tinySubPageDirectCaches, idx);
            }
            return cache(tinySubPageHeapCaches, idx);
        }
    

    实质的缓存操作为MemoryRegionCacheadd()方法,代码如下:

        public final boolean add(PoolChunk<T> chunk, long handle) {
            Entry<T> entry = newEntry(chunk, handle);
            boolean queued = queue.offer(entry);
            if (!queued) {
                // 队列已满,不缓存,立即回收entry对象进行下一次分配
                entry.recycle();
            }
    
            return queued;
        }
        
        private static Entry newEntry(PoolChunk<?> chunk, long handle) {
            Entry entry = RECYCLER.get(); // 从池中取出entry对象
            entry.chunk = chunk;
            entry.handle = handle;
            return entry;
        }
    

    以上便是PoolThreadCache的回收过程,接下来分析分配过程,以Tiny请求分配为例:

        boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, 
                                int reqCapacity, int normCapacity) {
            return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
        }
        
        private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
            if (cache == null) {
                return false;   // 缓存数组中没有
            }
            boolean allocated = cache.allocate(buf, reqCapacity); // 实际的分配
            // 分配次数达到整理阈值
            if (++ allocations >= freeSweepAllocationThreshold) {
                allocations = 0;
                trim(); // 整理
            }
            return allocated;
        }
    

    实质的分配依然在MemoryRegionCache中,代码如下:

        public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
            Entry<T> entry = queue.poll();  // 从队列头部取出
            if (entry == null) {
                return false;
            }
            // 在之前ByteBuf同样的内存位置分配一个新的`ByteBuf`对象
            initBuf(entry.chunk, entry.handle, buf, reqCapacity);
            entry.recycle(); // entry对象回收利用
    
            ++ allocations; // 该值是内部量,和上一方法不同
            return true;
        }
        
        protected void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, 
                    int reqCapacity) {
            chunk.initBufWithSubpage(buf, handle, reqCapacity);
        }
    

    分配正好是回收的逆过程,接下来再分析释放过程,代码如下:

        void free() {
            if (freeTask != null) {
                assert deathWatchThread != null;
                ThreadDeathWatcher.unwatch(deathWatchThread, freeTask);
            }
            free0();
        }
        
        private void free0() {
            int numFreed = free(tinySubPageDirectCaches) + ... +
                    free(normalHeapCaches);
    
            if (directArena != null) {
                directArena.numThreadCaches.getAndDecrement();
            }
    
            if (heapArena != null) {
                heapArena.numThreadCaches.getAndDecrement();
            }
        }
    

    与分配和回收类似,稍有不同的是:释放需要遍历Cache数组,对每一个MemoryRegionCache执行free(),代码如下:

        public final int free() {
            return free(Integer.MAX_VALUE);
        }
    
        private int free(int max) {
            int numFreed = 0;
            for (; numFreed < max; numFreed++) {
                Entry<T> entry = queue.poll();
                if (entry != null) {
                    freeEntry(entry);
                } else {
                    return numFreed; // 队列中所有节点都被释放
                }
            }
            return numFreed;
        }
        
        private  void freeEntry(Entry entry) {
            PoolChunk chunk = entry.chunk;
            long handle = entry.handle;
    
            entry.recycle(); // 回收entry对象
            chunk.arena.freeChunk(chunk, handle, sizeClass); // 释放实际的内存空间
        }
    

    在分配过程还有一个trim()方法,当分配操作达到一定阈值(Netty默认8192)时,没有被分配出去的缓存空间都要被释放,以防止内存泄漏,核心代码如下:

        // 内部类MemoryRegionCache
        public final void trim() {
            // allocations 表示已经重新分配出去的ByteBuf个数
            int free = size - allocations;  
            allocations = 0;
    
            // 在一定阈值内还没被分配出去的空间将被释放
            if (free > 0) {
                free(free); // 释放队列中的节点
            }
        }
    

    也就是说,期望一个MemoryRegionCache频繁进行回收-分配,这样allocations > size,将不会释放队列中的任何一个节点表示的内存空间;但如果长时间没有分配,则应该释放这一部分空间,防止内存占据过多。Tiny请求缓存512个节点,由此可知当使用率超过 512 / 8192 = 6.25% 时就不会释放节点。
    虽然PoolThreadCache名称中含有Thread,但到目前为止的分析,与线程没有一点关系。的确如此,使他们产生联系的正是下一节的主题:PoolThreadLocalCache

    2.PoolThreadLocalCache

    线程缓存,直接想到的应该是ThreadLocal,Netty更进一步,使用改进的FastThreadLocal。首先看类签名:

        final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache>
    

    这里不介绍FastThreadLocal的原理,可简单认为是JDK原生的ThreadLocal的改进版,效率更高。当请求得到线程本地变量时,会调用initialValue()方法,代码如下:

        protected synchronized PoolThreadCache initialValue() {
            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    
            // useCacheForAllThreads是全局变量,是否全部线程使用缓存
            // 或者线程是FastThreadLocalThread即Netty中的IO线程
            if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
                return new PoolThreadCache(
                        heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                        DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
            }
            // 其他情况下不使用缓存
            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
        }
    

    当设置全局变量useCacheForAllThreads为true(默认情况true)时或者线程是FastThreadLocalThread的子类(IO线程)时,才会启用缓存。
    其中的leastUsedArena()方法,使得线程均等使用Arena,代码如下:

        private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
            if (arenas == null || arenas.length == 0) {
                return null;
            }
            
            // 寻找使用缓存最少的arena
            PoolArena<T> minArena = arenas[0];
            for (int i = 1; i < arenas.length; i++) {
                PoolArena<T> arena = arenas[i];
                if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                    minArena = arena;
                }
            }
    
            return minArena;
        }
    

    每次都寻找被最少线程使用的Arena分配新的线程,这样每个Arena都能均等分到线程数,从而平均Arena的负载。
    当线程生命周期结束时,调用onRemoval()方法进行一些清理操作,代码如下:

        protected void onRemoval(PoolThreadCache threadCache) {
            threadCache.free(); // 释放线程缓存中未分配的空间
        }
    

    3.ThreadDeathWatcher

    当线程意外结束时,如果没有释放未分配的空间,将会造成内存泄漏。为了以防万一,Netty采用一个Watcher线程观察使用缓存的线程,当线程意外结束时,确保内存一定被释放。出于性能考虑,Netty并没有使用一个常驻的后台Watcher线程,而是当有需要观察的线程时,才启动Watcher线程;当所有观察任务结束,Watcher线程亦结束。当然,这会增加一些代码的复杂度。
    首先分析成员变量,由于后台只需要一个Watcher线程,所以变量都被定义为static

        // 被观察的线程
        private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<Entry>();
        private static final Watcher watcher = new Watcher(); // 观察任务
        private static final AtomicBoolean started = new AtomicBoolean(); // 观察线程是否启动
        private static volatile Thread watcherThread; // 观察线程
    

    其中的Entry表示被观察的线程及其缓存释放时需要执行的清理操作,其中的变量如下:

        final Thread thread; // 被观察线程
        final Runnable task; // 清理缓存操作
        final boolean isWatch; // 是否需要被观察
    

    PoolThreadCache的构造方法中最后调用ThreadDeathWatcher.watch()方法观察该调用线程,实现代码如下:

        public static void watch(Thread thread, Runnable task) {
            if (!thread.isAlive()) {
                throw new IllegalArgumentException("thread must be alive.");
            }
            schedule(thread, task, true);
        }
        
        // entry.isWatch = isWatch
        private static void schedule(Thread thread, Runnable task, boolean isWatch) {
            pendingEntries.add(new Entry(thread, task, isWatch)); // 加入等待队列
    
            if (started.compareAndSet(false, true)) {
                // started为false则启动观察线程
                Thread watcherThread = threadFactory.newThread(watcher);
                watcherThread.start();
                ThreadDeathWatcher.watcherThread = watcherThread;
            }
        }
    

    可见实现很简单,将需要观察的线程加入到等待队列,当没有观察线程时启动观察线程即可。对应地,当含有缓存的线程主动释放缓存时会调用unwatch()方法,代码如下:

        public static void unwatch(Thread thread, Runnable task) {
            schedule(thread, task, false);
        }
    

    只是设置isWatch,其余与watch如出一辙。下面分析关键的Watcher任务,类方法和成员变量如下:

        private static final class Watcher implements Runnable {
            private final List<Entry> watchees = new ArrayList<Entry>();
        }
    

    由于在watch()unwatch()方法中使用了isWatch判断是否需要观察,此处的watchees中只包含需要观察的线程。核心的run()方法如下:

        public void run() {
            for (;;) {
                fetchWatchees(); // 得到需要观察的线程
                notifyWatchees(); // 线程异常结束执行清理操作
    
                // 再次执行,防止notify触发新的观察线程
                fetchWatchees();
                notifyWatchees();
    
                try {
                    Thread.sleep(1000); // 周期1秒
                } catch (InterruptedException ignore) {
                    // Ignore the interrupt; do not terminate until all tasks are run.
                }
    
                // 没有需要被观察的线程则结束该观察线程
                if (watchees.isEmpty() && pendingEntries.isEmpty()) {.
                    boolean stopped = started.compareAndSet(true, false);
                    assert stopped;
                    
                    if (pendingEntries.isEmpty()) {
                        break; // 没有任务则退出
                    }
    
                    if (!started.compareAndSet(false, true)) {
                        break; // 此时watch()方法启动新的观察线程,该线程退出
                    }
                }
            }
        }
    

    可见,观察线程以1秒为周期检测使用缓存的线程是否非正常死亡,如果非正常死亡则执行遗嘱中的清理操作。检测过程和清理过程如下:

        private void fetchWatchees() {
            for (;;) {
                Entry e = pendingEntries.poll();
                if (e == null) {
                    break;
                }
    
                if (e.isWatch) {
                    watchees.add(e); // 需要观察
                } else {
                    watchees.remove(e); // 正常原因死亡,不需要遗嘱
                }
            }
        }
        
        private void notifyWatchees() {
            List<Entry> watchees = this.watchees;
            for (int i = 0; i < watchees.size();) {
                Entry e = watchees.get(i);  
                if (!e.thread.isAlive()) { // 非正常死亡
                    watchees.remove(i); 
                    try {
                        e.task.run(); // 遗嘱中的清理操作
                    } catch (Throwable t) {
                        logger.warn("Thread death watcher task raised an exception:", t);
                    }
                } else {
                    i ++;
                }
            }
        }
    

    细心的朋友可能会觉得watchees.add(e)watchees.remove(e)会删除不同的entry对象(watch和unwatch添加了两个entry对象),实际中并不会,这是因为entry特殊的equals()方法即isWatch不参与相等比较:

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
    
            if (!(obj instanceof Entry)) {
                return false;
            }
    
            Entry that = (Entry) obj;
            return thread == that.thread && task == that.task;
        }
        
        public int hashCode() {
            return thread.hashCode() ^ task.hashCode();
        }
    

    至此,线程缓存PoolThreadCache分析完毕。
    相关链接:

    1. JEMalloc分配算法
    2. PoolArena
    3. PoolChunk
    4. PoolChunkList
    5. PoolSubpage

    相关文章

      网友评论

        本文标题:自顶向下深入分析Netty(十)--PoolThreadCach

        本文链接:https://www.haomeiwen.com/subject/mzeslxtx.html