美文网首页
[Netty源码分析]ByteBuf(三)

[Netty源码分析]ByteBuf(三)

作者: 没意思先生1995 | 来源:发表于2018-08-30 23:09 被阅读0次
  1. 内存规格

    0->512B->8K->16M(tiny-->small-->normal-->huge)
    所有内存都以chunk(16M)为单位向操作系统申请,其中8K作为一个Page


    内存规格.png
  2. 命中缓存的分配逻辑
    相关数据结构:


    MemoryRegionCache.png
    MemoryRegionCache及相关内存大小.png
    /**
     * Field overview of a memory region cache, as excerpted by the article.
     *
     * <p>Fields from two Netty versions are listed side by side for comparison, so this
     * excerpt is illustrative and is not meant to compile as a single class.
     */
    private abstract static class MemoryRegionCache<T> {
        // The Netty 5 and Netty 4 layouts differ; the fields directly below are the Netty 5 ones.
        private final Entry<T>[] entries;
        private final int maxUnusedCached;
        private int head;
        private int tail;
        private int maxEntriesInUse;
        private int entriesInUse;
    
        // Netty 4
        private final int size;
        private final Queue<Entry<T>> queue;// queue of the Entry objects held by this cache
        private final SizeClass sizeClass;// the kind (size class) of this MemoryRegionCache
        private int allocations;
    }
    /**
     * Field overview of the per-thread cache, as excerpted by the article.
     *
     * <p>Each field is an array of {@link MemoryRegionCache}; there is a {@code byte[]} (heap)
     * and a {@code ByteBuffer} (direct) variant for each of the tiny, small and normal classes.
     */
    final class PoolThreadCache {
            // Hold the caches for the different size classes, which are tiny, small and normal.
            private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
            private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
            private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
            private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
            private final MemoryRegionCache<byte[]>[] normalHeapCaches;
            private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
    }
    分配流程:
    1. 找到对应Size的MemoryRegionCache
    /**
     * Allocates memory for {@code buf}, trying the thread cache before touching the arena.
     *
     * <p>The requested capacity is first normalized, then routed by size: tiny/small requests
     * try the thread cache, then the matching subpage pool, then {@code allocateNormal};
     * requests up to {@code chunkSize} try the thread cache and then {@code allocateNormal};
     * anything larger goes straight to {@code allocateHuge}.
     *
     * @param cache       the thread cache consulted before the arena itself
     * @param buf         the buffer to initialise with the allocated memory
     * @param reqCapacity the capacity requested by the caller (before normalization)
     */
    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                // Cache miss: remember which tiny subpage pool slot to look at next.
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                // Cache miss: remember which small subpage pool slot to look at next.
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }
            final PoolSubpage<T> head = table[tableIdx];
            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                // head.next == head means the list holds no subpage besides the head itself.
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            // No subpage available in the pool: fall back to allocateNormal under the arena lock.
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }
            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // Cache miss: allocate under the arena lock; the counter is updated inside the lock.
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }
    ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
    //举例PoolThreadCache的allocateTiny
    /**
     * Tries to serve a tiny-sized request from this thread cache.
     *
     * <p>Looks up the cache via {@code cacheForTiny(area, normCapacity)} and delegates to
     * {@code allocate}.
     *
     * @param area         the arena the request originates from
     * @param buf          the buffer to initialise
     * @param reqCapacity  the capacity requested by the caller
     * @param normCapacity the normalized capacity used to select the cache
     * @return the result of the delegated {@code allocate} call
     */
    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
    }
    ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
    //PoolThreadCache的allocate
    /**
     * Tries to initialise {@code buf} from the given cache and counts the attempt.
     *
     * <p>Once the attempt counter reaches {@code freeSweepAllocationThreshold} it is reset
     * to zero and {@code trim()} is invoked. A {@code null} cache is not counted.
     *
     * @param cache       the cache to allocate from, or {@code null} if none was found
     * @param buf         the buffer to initialise
     * @param reqCapacity the capacity requested by the caller
     * @return {@code true} if the cache handed out an entry, {@code false} otherwise
     */
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache != null) {
            final boolean served = cache.allocate(buf, reqCapacity);
            allocations++;
            if (allocations >= freeSweepAllocationThreshold) {
                // Threshold reached: start a new counting window and trim.
                allocations = 0;
                trim();
            }
            return served;
        }
        // No cache was found for this request, so nothing can be handed out.
        return false;
    }
    2. 从queue弹出一个Entry给ByteBuf初始化(Entry实际持有一个chunk和一个handle,通过handle可以在chunk里唯一定位一块内存)
    //MemoryRegionCache的allocate
    /**
     * Polls one entry from the queue and uses it to initialise {@code buf}.
     *
     * @param buf         the buffer to initialise
     * @param reqCapacity the capacity requested by the caller
     * @return {@code true} if an entry was available, {@code false} if the queue was empty
     */
    public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
        final Entry<T> cached = queue.poll();
        if (cached != null) {
            initBuf(cached.chunk, cached.handle, buf, reqCapacity);
            // The entry itself is no longer needed once the buffer is initialised.
            cached.recycle();
            // Plain increment is fine here: this is only ever called from the same thread.
            allocations++;
            return true;
        }
        return false;
    }
    ↓↓↓↓↓↓↓↓↓↓↓↓↓↓
    //SubPageMemoryRegionCache
    /**
     * Initialises {@code buf} by delegating to {@code chunk.initBufWithSubpage}.
     *
     * @param chunk       the chunk the cached memory belongs to
     * @param handle      the handle passed through to the chunk
     * @param buf         the buffer to initialise
     * @param reqCapacity the capacity requested by the caller
     */
    protected void initBuf(
            PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            chunk.initBufWithSubpage(buf, handle, reqCapacity);
    }
    ↓↓↓↓↓↓↓↓↓↓↓↓↓↓
    //PoolChunk
    /**
     * Initialises {@code buf} from a subpage, deriving the bitmap index from {@code handle}.
     *
     * @param buf         the buffer to initialise
     * @param handle      the handle from which the bitmap index is extracted
     * @param reqCapacity the capacity requested by the caller
     */
    void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
        final int idxInBitmap = bitmapIdx(handle);
        initBufWithSubpage(buf, handle, idxInBitmap, reqCapacity);
    }
    ↓↓↓↓↓↓↓↓↓↓↓↓↓↓
    /**
     * Initialises {@code buf} with the element that {@code handle} and {@code bitmapIdx}
     * refer to within one of this chunk's subpages.
     *
     * @param buf         the buffer to initialise
     * @param handle      the handle used to look up the subpage
     * @param bitmapIdx   the bitmap index taken from the handle; must be non-zero
     * @param reqCapacity the requested capacity; must not exceed the subpage's element size
     */
    private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
        assert bitmapIdx != 0;

        final int mapIdx = memoryMapIdx(handle);
        final PoolSubpage<T> page = subpages[subpageIdx(mapIdx)];
        assert page.doNotDestroy;
        assert reqCapacity <= page.elemSize;

        // Only the low 30 bits of bitmapIdx are used as the element index.
        final int elemIdx = bitmapIdx & 0x3FFFFFFF;
        // Run offset + position of the element inside the subpage + this chunk's own offset.
        final int bufOffset = runOffset(mapIdx) + elemIdx * page.elemSize + offset;
        buf.init(this, handle, bufOffset, reqCapacity, page.elemSize, arena.parent.threadCache());
    }
    //PooledByteBuf
    /**
     * Points this buffer at a region of {@code chunk} and resets its cached NIO buffer.
     *
     * @param chunk     the chunk that owns the memory; must not be {@code null}
     * @param handle    the handle identifying the region; must be non-negative
     * @param offset    stored as this buffer's offset
     * @param length    stored as this buffer's length
     * @param maxLength stored as this buffer's maximum length
     * @param cache     the thread cache associated with this buffer
     */
    private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        assert handle >= 0;
        assert chunk != null;

        // Backing chunk and everything derived from it.
        this.chunk = chunk;
        this.memory = chunk.memory;
        this.allocator = chunk.arena.parent;

        // Bookkeeping for the region and the cache associated with this buffer.
        this.handle = handle;
        this.cache = cache;
        this.offset = offset;
        this.length = length;
        this.maxLength = maxLength;

        // Drop any temporary NIO buffer left over from earlier use.
        this.tmpNioBuf = null;
    }
    
    3. 将弹出的Entry放到对象池进行复用(减少GC,减少对象创建)
    //MemoryRegionCache
    /**
     * Abbreviated excerpt of {@code allocate}, showing only the tail of the method:
     * the polled entry is recycled once it is no longer needed.
     */
    public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
        // (earlier part of the method omitted)
        entry.recycle();
        // allocations is not thread-safe which is fine as this is only called from the same thread all time.
        ++ allocations;
        return true;
    }
    ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
    //Entry
    /**
     * Clears this entry's references and hands it back for reuse.
     */
    void recycle() {
        // Reset the fields so the entry no longer points at a chunk or handle.
        chunk = null;
        handle = -1;
        recyclerHandle.recycle(this);// per the article: pushes this object onto the handle's stack
    }
    

相关文章

网友评论

      本文标题:[Netty源码分析]ByteBuf(三)

      本文链接:https://www.haomeiwen.com/subject/eqadwftx.html