PoolArena

作者: Pillar_Zhong | 来源:发表于2019-08-01 19:08 被阅读0次

    关键属性

    // tiny数组的大小, 32
    static final int numTinySubpagePools = 512 >>> 4;
    // 所属的PooledByteBufAllocator
    final PooledByteBufAllocator parent;
    // chunk二叉树的最大高度, 默认11
    private final int maxOrder;
    // pagesiz, 默认1 << 13 = 8192
    final int pageSize;
    // pageShifts=log(pageSize),默认13
    final int pageShifts;
    // 默认16M
    final int chunkSize;
    // ~(pageSize - 1) = 11111111111111111110000000000000
    final int subpageOverflowMask;
    // pageShifts - 9 = 4
    final int numSmallSubpagePools;
    // tiny数组
    private final PoolSubpage<T>[] tinySubpagePools;
    // small数组
    private final PoolSubpage<T>[] smallSubpagePools;
    

    构造方法

    image image
    protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
        ...
        // 初始化tinySubpagePools
        // 创建长度为32的PoolSubpage数组
        tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
        for (int i = 0; i < tinySubpagePools.length; i ++) {
            // 给每个元素都初始化一个head
            tinySubpagePools[i] = newSubpagePoolHead(pageSize);
        }
    
        // 初始化numSmallSubpagePools
        numSmallSubpagePools = pageShifts - 9;
        // 创建长度为4的PoolSubpage数组
        smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
        for (int i = 0; i < smallSubpagePools.length; i ++) {
            // 给每个元素都初始化一个head
            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
        }
    
        ...chunklist...
    }
    
    private PoolSubpage<T>[] newSubpagePoolArray(int size) {
        return new PoolSubpage[size];
    }
    
    private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
        PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
        head.prev = head;
        head.next = head;
        return head;
    }
    
    • tinySubpagePools:用于分配小于512字节的内存,默认长度为32,因为内存分配最小为16,每次增加16,直到512,区间[16,512)一共有32个不同值
    • smallSubpagePools:用于分配大于等于512字节的内存,默认长度为4, 每次容量翻倍

    定位

    既然tinySubpagePools或smallSubpagePools设计成不同elemsize组成的,那么怎么根据我请求的内存大小来去定制具体的head呢?

    findSubpagePoolHead

    PoolSubpage<T> findSubpagePoolHead(int elemSize) {
        int tableIdx;
        PoolSubpage<T>[] table;
        // 如果申请的空间是属于tiny的部分
        if (isTiny(elemSize)) { // < 512
            // 根据elemsize除去16可以很容易拿到tinySubpagePools对应大小的head
            tableIdx = elemSize >>> 4;
            table = tinySubpagePools;
        } else {
            // 否则去smallSubpagePools里面申请
            tableIdx = 0;
            // 这里很巧妙的来计算small的对应elemsize的head
            // 512-1023之间的数字>>>10的结果为0
            // 1024-2047之间的数字>>>10的结果为1
            // 2048-4095之间的数字>>>10的结果为10
            // 4096-8195之间的数字>>>10的结果为100
            // 将以上的结果右移一位直到等于0,所用的次数正好是数组的下标
            elemSize >>>= 10;
            while (elemSize != 0) {
                elemSize >>>= 1;
                tableIdx ++;
            }
            table = smallSubpagePools;
        }
    
        return table[tableIdx];
    }
    

    分配

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        // 对申请的内存大小做标准化
        final int normCapacity = normalizeCapacity(reqCapacity);
        // 如果小于pagesize,8k
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            // 如果小于512,属于tiny的范围
            if (tiny) { // < 512
                // 去本地缓存申请tiny空间,如果能成功,直接返回
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                // 拿到tinySubpagePools中该容量所属的位置
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            // 否则属于small的范围
            } else {
                // 去本地缓存申请small空间,如果能成功,直接返回
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                // 拿到smallSubpagePools中该容量所属的位置
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }
    
            // 拿到对应的head
            final PoolSubpage<T> head = table[tableIdx];
    
            // head同步处理,
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                // head的next有PoolSubpage存在,说明之前已经分配过内存了,看看能不能复用这段内存
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    // 申请空间,拿到内存地址handle
                    long handle = s.allocate();
                    // 申请成功
                    assert handle >= 0;
                    // 将这段申请的内存空间与ByteBuf绑定
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
    
                    if (tiny) {
                        allocationsTiny.increment();
                    } else {
                        allocationsSmall.increment();
                    }
                    return;
                }
            }
            // 到这里,说明没有缓存可用,那么去内存池去获取
            allocateNormal(buf, reqCapacity, normCapacity);
            return;
        }
        // 如果超过page但还没有到chunksize,那么直接新建chunk
        if (normCapacity <= chunkSize) {
            // 去本地缓存申请normal空间
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // 直接去内存池去获取
            allocateNormal(buf, reqCapacity, normCapacity);
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            // 直接去申请Unpooled内存空间
            allocateHuge(buf, reqCapacity);
        }
    }
    

    allocateNormal

    private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        // 在poolchunklist中看有没有chunk能进行分配,当然了,刚开始,这些里面都是没有的。
        // 只有在实际生成chunk后才会加到这些list里面。
        if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
            q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
            q075.allocate(buf, reqCapacity, normCapacity)) {
            ++allocationsNormal;
            return;
        }
        
        // 新增chunk
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        // 在chunk里申请内存空间
        long handle = c.allocate(normCapacity);
        ++allocationsNormal;
        // 如果申请成功
        assert handle > 0;
        // 与Bytebuf绑定
        c.initBuf(buf, handle, reqCapacity);
        // 将该chunk加到qInit的list。
        qInit.add(c);
    }
    

    标准化

    int normalizeCapacity(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
        }
        // 如果超过chunksize,直接返回
        if (reqCapacity >= chunkSize) {
            return reqCapacity;
        }
        
        // 因为整个chunk只有tiny是16B递增的。其他都是翻倍递增,也就是以2为幂的数
        // 只有当超过tiny,也就是>=512, 才需要计算
        if (!isTiny(reqCapacity)) { // >= 512
            // Doubled
    
            // 这里来计算离给定reqCapacity最近的下一个2的幂次方
            // 比如给定8888,会返回16K
            int normalizedCapacity = reqCapacity;
            normalizedCapacity --;
            normalizedCapacity |= normalizedCapacity >>>  1;
            normalizedCapacity |= normalizedCapacity >>>  2;
            normalizedCapacity |= normalizedCapacity >>>  4;
            normalizedCapacity |= normalizedCapacity >>>  8;
            normalizedCapacity |= normalizedCapacity >>> 16;
            normalizedCapacity ++;
    
            if (normalizedCapacity < 0) {
                normalizedCapacity >>>= 1;
            }
    
            return normalizedCapacity;
        }
    
        // Quantum-spaced
        // 到这里说明是tiny的范围,如果已经是16的倍数了,那么满足条件,直接返回。
        if ((reqCapacity & 15) == 0) {
            return reqCapacity;
        }
        // 否则标准化成16的倍数
        return (reqCapacity & ~15) + 16;
    }
    

    重分配

    void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
        if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }
    
        int oldCapacity = buf.length;
        if (oldCapacity == newCapacity) {
            return;
        }
    
        PoolChunk<T> oldChunk = buf.chunk;
        long oldHandle = buf.handle;
        T oldMemory = buf.memory;
        int oldOffset = buf.offset;
        int oldMaxLength = buf.maxLength;
        int readerIndex = buf.readerIndex();
        int writerIndex = buf.writerIndex();
    
        allocate(parent.threadCache(), buf, newCapacity);
        // 如果新的容量比之前的大,那么将之前内存的内容复制到新的内存空间里面
        if (newCapacity > oldCapacity) {
            memoryCopy(
                    oldMemory, oldOffset,
                    buf.memory, buf.offset, oldCapacity);
        // 如果是缩容的操作,且能放下部分read的数据,那么将read的数据复制过去
        // 当新容量不足时,允许截取
        } else if (newCapacity < oldCapacity) {
            if (readerIndex < newCapacity) {
                if (writerIndex > newCapacity) {
                    writerIndex = newCapacity;
                }
                memoryCopy(
                        oldMemory, oldOffset + readerIndex,
                        buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
            } else {
                readerIndex = writerIndex = newCapacity;
            }
        }
    
        buf.setIndex(readerIndex, writerIndex);
        
        // 释放老的内存空间
        if (freeOldMemory) {
            free(oldChunk, oldHandle, oldMaxLength, buf.cache);
        }
    }
    

    相关文章

      网友评论

          本文标题:PoolArena

          本文链接:https://www.haomeiwen.com/subject/jencdctx.html