美文网首页Java
Netty源码_内存管理(jemalloc3)

Netty源码_内存管理(jemalloc3)

作者: wo883721 | 来源:发表于2021-11-21 18:45 被阅读0次

    Netty 是一个高性能的网络应用程序框架,主要就是进行数据的交互,所以必须有一个高效的内存分配器。
    内存分配器的功能就两个:

    • 用户申请内存时,分配给它内存块。
    • 用户主动释放内存时,回收这个内存块。

    一般我们的做法是:

    • 先申请一个较大的内存块。
    • 当用户申请内存时,从这个内存块中,分割符合申请内存大小的内存块给用户。
    • 用户主动释放内存时,再将这个内存块回收。

    但是这么做有个问题,因为用户申请内存的大小各不相同,分配的内存块大小就不一样,回收以后就是各种尺寸的内存碎片。

    • 例如,我们有一个20大小的总内存块,分配给用户两个大小为5 内存块,和一个内存为 4 内存块,两个内存为2 内存块;
    • 之后都回收了,就有两个为 5,一个为4,三个为2内存碎片
    • 这个时候在申请内存为 6 的内存块时,发现没有办法分配了。

    为了解决这个问题,能够高效地进行内存分配,就要使用内存分配算法了。

    • Netty 4.1.45版本之前使用的是 jemalloc3 算法来进行内存分配的;
    • 而在4.1.45版本之后使用的是 jemalloc4 算法来进行内存分配的。
    • 本篇文章我们先介绍 jemalloc3 算法实现。

    一. 划分内存规格

    产生内存碎片最主要的原因就是因为用户申请的内存大小不一样。

    那么如果用户申请的内存大小都一样,那么不就没有内存碎片了么。

    想法虽然是好的,但是明显是不可能的,因为程序运行过程中,需要的内存本来就是不同的。
    那么我们就换一个思路,虽然不能要求申请的内存大小都一样,但是可以提前划分好不同规格的内存,然后根据请的内存大小不同,分配不同规格的内存快。

    jemalloc3_内存规格.png

    如上图所示,jemalloc3 一共将内存分为四种类型:

    内存规格 描述
    Tiny 微小规格内存块,容量从16B496B 一共31 个内存规格,每个规格容量相差16B
    Small 小规格内存块,容量从512B4KB 一共4 个内存规格,每个规格容量相差一倍
    Normal 正常规格内存块,容量从8KB16MB 一共11 个内存规格,每个规格容量相差一倍
    Huge 巨大内存块,不会放在内存管理中,直接内存中申请

    因此就可以根据用户申请的内存大小,直接对应规格的内存块。

    • 例如申请 40B, 那么就分配 48B 规格的内存块,虽然有 8B 的字节被浪费了,但是避免了内存碎片的产生。
    • 你会发现从Small 开始,每个规格内存块相差都是一倍,这就可以导致 50% 的内存浪费;例如我们申请 513B 大小,那么只能分配1KB 规格的内存块。这个是 jemalloc3 算法的缺陷,只能使用 jemalloc4 算法进行改进,以后我们会说到。

    二. 内存规格算法实现

    内存规格的划分作用和意义我们已经了解了,那么怎么实现它呢?
    Netty 中使用 PoolChunk 来进行内存分配:

    • PoolChunk 先申请一大块内存memory(可以是字节数组,也可以是DirectByteBuffer),大小就是chunkSize(16MB)。
    • 我们知道 Normal 规格最小内存块是 pageSize(8KB) 容量,那么就要能记录最小 Normal 规格内存块使用情况。
    • TinySmall 规格内存块小于 pageSize 大小,可以使用一个最小 Normal 规格内存块来分配多个 TinySmall 规格内存块。
    内存规格算法实现.png

    如图所示:

    • PoolChunk 使用一个满二叉树(用数组实现)来记录内存块的分配使用情况。

      • 因为chunkSize == 16MB,且 pageSize == 8KB,那么树的深度depth 一共 12 层(从011)。
      • 根据不同深度,就可以获得不同大小的内存块,例如最底层即11层所有节点对应的内存块大小就是8KB
    • 使用数组来实现这个满二叉树。

      • 这里有两个数组 memoryMapdepthMap,大小都是4096。做了特殊处理,下标0 这个位置没有任何意义,从下标 1 开始。
      • depthMap 的值表示当前下标对应在二叉树中的层数。例如下标为1的值是 0,表示第 0 层;下标为 6 的值是 2,表示第 2 层;下标为 2048 的值是 11,表示第 11 层。
      • memoryMap 的值表示当前这个节点能分配的内存块大小。刚开始时和depthMap 的值是一样的,但是当它的子节点被分配了,那么值就会变。例如刚开始时,下标为 4 的值是 2,表示能分配 4MB 内存块大小;如果它的一个子节点被分配了,那么它的值就会变成 3,表示只能分配 2MB 内存块大小。
    • 使用 bitmap 数据记录TinySmall规格内存使用情况

      • 最底层的内存块可以在分成 TinySmall规格小内存块。
      • 一旦在最底层的内存块分配了一个 TinySmall规格小内存块,那么这个最底层的内存块就表示被使用了,而且这个内存块只能分配刚分配那个大小的规格的小内存块,直到它被回收(即由它分配的小内存快都被释放),进行重新分配,那么可以分配其他大小的规格的小内存块。即由第一次分配的规格大小来决定。
      • 通过bitmap 位图数组来记录,已经在最底层的内存块上分配了那些小内存块。因为最小内存块大小是16B,而最底层的内存块大小是8KB,因此最多可以分512块;一个 long 类型有64 位二进制数,所以最多需要8long 类型就可以记录。
      • 通过 bitmapIdx 的值,可以得到在bitmap 位图数组中的那一个long 类型的那一位。通过 bitmapIdx >>> 6 (即除以64) 得到bitmap 位图数组的下标;通过 bitmapIdx & 63(即整除64 的余数)得到占据long 类型那一位。
    • 通过 handle 来记录偏移量和内存块大小

      • 32 位用来记录 bitmapIdx,从前面介绍 bitmapIdx的值很小的,最大值就是 64 * 8。最高位肯定是0,次高位(0x4000000000000000L)其实是用来记录是不是TinySmall类型规格。
      • 32 位用来记录 memoryMapIdx
      • 如果是 Normal规格,高32 位的值肯定是0;通过memoryMapIdxdepthMap数组获取对应层数,这样就能得到内存块大小了;根据 memoryMapIdx 可以计算在当前这一层的偏移值。例如 memoryMapIdx = 2050,那么是第11 层,大小就是8KB;偏移值就是 2050 - 2048 = 2,那么偏移量就是 16KB;因此我们就在偏移量16KB处分割一块8KB大小的内存块给用户使用。
      • 如果是TinySmall规格,那么肯定是在最底层,先通过memoryMapIdx 计算偏移值,得到偏移量,然后得到这个最底层内存块分割成小内存的大小,再根据bitmapIdx值得到在这个最底层内存块上的偏移量,最后就能得到最终偏移量和分割内存块大小了。

    三. 源码实现

    3.1 PoolSubpage

    3.1.1 初始化

        PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
            this.chunk = chunk;
            this.memoryMapIdx = memoryMapIdx;
            this.runOffset = runOffset;
            this.pageSize = pageSize;
            // 因为 long 类型是8个字节,64位二进制数;
            // 而 Tiny 类型最小容量都是 16 个字节。
            // 所以 bitmap 位图数组最大长度就是  pageSize / 16/ 64
            bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
            init(head, elemSize);
        }
    
        void init(PoolSubpage<T> head, int elemSize) {
            doNotDestroy = true;
            // 当前这 PoolSubpage 只会分配 elemSize 大小容量的内存
            this.elemSize = elemSize;
            if (elemSize != 0) {
                // PoolSubpage 一共可以分配多少块这个容量的内存
                maxNumElems = numAvail = pageSize / elemSize;
                nextAvail = 0;
                // 无符号右移6位,也就是除以64,因为一个 long 有64个二进制位
                bitmapLength = maxNumElems >>> 6;
                // 如果 maxNumElems 不能整除 64,那么就要将 bitmapLength 加一
                if ((maxNumElems & 63) != 0) {
                    bitmapLength ++;
                }
    
                for (int i = 0; i < bitmapLength; i ++) {
                    bitmap[i] = 0;
                }
            }
            // 添加到 PoolArena 中对应尺寸容量的PoolSubpage链表中
            addToPool(head);
        }
    
    • 刚开始创建的时候,主要是创建 bitmap 位图数组,数组长度就是 pageSize >>> 10,即除以64位二进制数,和最小Tiny类型规格都是16 个字节。
    • init(...) 初始化方法,刚创建的时候或者PoolSubpage被回收重新使用的时候调用。
    • 确定当前PoolSubpage分配内存块大小elemSize
    • 计算最多分配多少这个大小的内存块。
    • 计算真实 bitmap位图数组长度bitmapLength
    • 将这个PoolSubpage添加到PoolArena中对应尺寸容量的PoolSubpage链表中,这样就不需要需要查找,加快内存块分配速度。

    3.1.2 分配内存块

        /**
         * 返回子页面内存分配的位图索引
         * 使用 long 类型每个二进制位数`0`或 `1` 来记录这块内存有没有被分配过,
         * 因为 long 是8个字节,64位二进制数,所以可以表示 64 个内存块分配情况。
         */
        long allocate() {
            if (elemSize == 0) {
                return toHandle(0);
            }
    
            if (numAvail == 0 || !doNotDestroy) {
                return -1;
            }
    
            // 得到下一个可用的位图 bitmap 索引
            final int bitmapIdx = getNextAvail();
            // 除以 64 得到的整数,即 bitmap[] 数组的下标
            int q = bitmapIdx >>> 6;
            // 与 64 的余数,即占据的 long 类型的位数
            int r = bitmapIdx & 63;
            // 必须是 0, 表示这一块内存没有被分配
            assert (bitmap[q] >>> r & 1) == 0;
            // 将r对应二进制位设置为1,表示这一位代表的内存块已经被分配了
            bitmap[q] |= 1L << r;
    
            if (-- numAvail == 0) {
                // 如果可分配内存块的数量numAvail为0,
                // 那么就要这个 PoolSubpage 从 PoolArena 中
                // 对应尺寸容量的PoolSubpage链表中移除。
                removeFromPool();
            }
    
            // 使用 long类型的高32位储存 bitmapIdx 的值,即使用 PoolSubpage 中那一块的内存;
            // 低32位储存 memoryMapIdx 的值,即表示使用那一个 PoolSubpage
            return toHandle(bitmapIdx);
        }
    
        private long toHandle(int bitmapIdx) {
            // 虽然我们使用高 32 为表示 bitmapIdx,但是当bitmapIdx = 0 时,
            // 就无法确定是否表示 bitmapIdx 的值。
            // 所以这里就 0x4000000000000000L | (long) bitmapIdx << 32,那进行区分。
            // 放心  bitmapIdx << 32 是不可能超过 0x4000000000000000L
            return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
        }
    

    方法流程:

    • 如果没有可用内存块了,就直接返回 -1
    • 通过 getNextAvail() 方法,获取下一个能分配的内存块的位图索引bitmapIdx
    • 根据位图索引bitmapIdxbitmap 位图数组中对应二进制位设置为1,表示已经被分配了。
    • 如果分配之后没有内存块了,就将这个PoolSubpagePoolArena中对应尺寸容量的PoolSubpage链表中删除,因为已经不能分配了。
    • 通过 toHandle(bitmapIdx) 返回 handle 值。

    3.1.3 回收内存块

     /**
         * 返回 true,表示这个 PoolSubpage 还在使用,即上面还有其他小内存块被使用;
         * 返回 false,表示这个 PoolSubpage 上面分配的小内存块都释放了,可以回收整个 PoolSubpage。
         */
        boolean free(PoolSubpage<T> head, int bitmapIdx) {
            if (elemSize == 0) {
                return true;
            }
            // 得到位图 bitmap 中的下标
            int q = bitmapIdx >>> 6;
            // 得到使用 long 类型中那一位
            int r = bitmapIdx & 63;
            // 必须不能是 0, 表示这个 bitmapIdx 对应内存块肯定是在被使用
            assert (bitmap[q] >>> r & 1) != 0;
            // 将r对应二进制位设置为0,表示这一位代表的内存块已经被释放了
            bitmap[q] ^= 1L << r;
    
            // 将 bitmapIdx 设置为下一个可以使用的内存块索引,
            // 因为刚被释放,这样就不用进行搜索来查找可用内存块索引。
            setNextAvail(bitmapIdx);
    
            if (numAvail ++ == 0) {
                // 如果可分配内存块的数量numAvail从0开始增加,
                // 那么就要重新添加到 PoolArena 中对应尺寸容量的PoolSubpage链表中
                addToPool(head);
                return true;
            }
    
            if (numAvail != maxNumElems) {
                return true;
            } else {
                // 子页面未使用(numAvail == maxNumElems)
                if (prev == next) {
                    // 如果 prev == next,即 subpage 组成的链表中没有其他 subpage,不能删除它
                    return true;
                }
    
                // 如果 prev != next,即 subpage 组成的链表中还有其他 subpage,那么就删除它
                doNotDestroy = false;
                removeFromPool();
                return false;
            }
        }
    
    • 根据位图索引bitmapIdxbitmap 位图数组中对应二进制位设置为0,表示已经被释放了。
    • 调用 setNextAvail(bitmapIdx),加快下一次分配内存块的速度,不需要重新查找了。
    • 最后再处理一下 PoolArena中对应尺寸容量的PoolSubpage链表。

    3.2 PoolChunk

    3.2.1 分配内存块

    3.2.1.1 allocate 方法

    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
            final long handle;
            if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
                //  >= pageSize,即 Normal 规格类型内存块,通过 allocateRun 方法分配
                handle =  allocateRun(normCapacity);
            } else {
                // 分配 Tiny 和 Small 规格类型内存块
                handle = allocateSubpage(normCapacity);
            }
    
            if (handle < 0) {
                // 小于 0, 说明当前PoolChunk都被分配完了
                return false;
            }
            ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
            // 使用 handle 来初始化 池化缓存区PooledByteBuf
            initBuf(buf, nioBuffer, handle, reqCapacity);
            return true;
        }
    
    • 通过 allocateRun(...) 方法,分配 Normal 规格类型内存块;通过 allocateSubpage(normCapacity)方法,分配TinySmall 规格类型内存块。
    • 通过 initBuf(...) 方法,使用申请的内存块handle 来初始化池化缓存区PooledByteBuf

    3.2.1.2 allocateRun 方法

        private long allocateRun(int normCapacity) {
            /**
             * 默认情况下,maxOrder=11,pageShifts=13
             * normCapacity肯定是大于或者等于pageSize,即 log2(normCapacity) >= pageShifts
             *
             * d 表示在第几层可以分配这个尺寸容量normCapacity 的内存块。
             * 最底层,即11层,最多只能分配 pageSize尺寸容量的内存块。
             */
            int d = maxOrder - (log2(normCapacity) - pageShifts);
            /**
             * 得到尺寸容量normCapacity 内存块的索引id
             */
            int id = allocateNode(d);
            if (id < 0) {
                // 小于 0, 说明当前PoolChunk都被分配完了
                return id;
            }
            freeBytes -= runLength(id);
            return id;
        }
    
    • 通过 allocateNode(d) 方法获取 memoryMapIdx 的值。
    • 减少当前 PoolChunk 可用内存字节数freeBytes

    3.2.1.3 allocateNode 方法

         private int allocateNode(int d) {
            // d 代表层数,其实也代表需要的内存容量,(1 << (maxOrder - d)) * pageSize
            // 当 d 和 maxOrder 相等,即需要内存容量就是 pageSize
            int id = 1;
            /**
             * 例如 d = 11
             * 那么 1 << d   就是 100000000000
             *  - (1 << d)  就是 1111111111111111111111111111111111111111111111111111100000000000
             *  所以 initial的作用就是用来快速判断某个值是不是小于 (1 << d)
             */
            int initial = - (1 << d); // has last d bits = 0 and rest all = 1
            byte val = value(id);
            if (val > d) { // unusable
                return -1;
            }
            /**
             * val < d 表示当前节点id对应的内存容量还大于 d 对应的内存容量,继续寻找。
             * (id & initial) == 0 只有当 id < (1 << d) 时成立,保证 id 是 d层的。
             */
            while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
                id <<= 1;
                val = value(id);
                if (val > d) {
                    /**
                     * val > d,表示从 id 节点对应的内存容量已经不足要求内存块的大小了,
                     * 但是它能走到这一个判断,说明 id 节点的父节点对应的内存容量是可以满足内存块的大小的;
                     * 那一定是因为 id 节点兄弟节点对应内存容量能满足内存块的大小。
                     *
                     * id ^= 1 就是得到 id 节点的兄弟节点
                     */
                    id ^= 1;
                    val = value(id);
                }
            }
            byte value = value(id);
            assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                    value, id & initial, d);
            // 将当前 memoryMap 的下标id 的值设置为 unusable,
            // 表示它已经被使用了, unusable = maxOrder + 1
            setValue(id, unusable); // mark as unusable
            // 更新这个 id 之上所有父节点的值,
            // 因为id 节点被使用了,那么它之上所有父节点代表的内存容量都收到影响。
            updateParentsAlloc(id);
            return id;
        }
    
        private void updateParentsAlloc(int id) {
            // 通过循环更新所有父节点的值。
            while (id > 1) {
                int parentId = id >>> 1;
                byte val1 = value(id);
                byte val2 = value(id ^ 1);
                // 寻找父节点对应子节点中较小的值
                byte val = val1 < val2 ? val1 : val2;
                setValue(parentId, val);
                id = parentId;
            }
        }
    
    • 现在满二叉树中,在d 对应的那层中寻找还没有被分配的节点(从左到右寻找),返回这个节点的下标值(即memoryMapIdx)。
    • 通过 setValue(id, unusable) 方法,将这个节点值设置成unusable,表示这个节点已经被分配了。
    • 通过 updateParentsAlloc(id) 方法更新父节点可分配的内存大小。
    • 因为 d 层有个节点被分配了,那么这个节点的父节点以及父节点的父节点等,它们的可分配的内存大小就和它们未分配的那个子节点大小一样了。

    3.2.1.4 allocateNode 方法

        private long allocateSubpage(int normCapacity) {
            // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
            // This is need as we may add it back and so alter the linked-list structure.
            // 得到 PoolArena 中对应尺寸容量的PoolSubpage链表头
            PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
            // 因为 Tiny 或者 Small类型容量都小于 pageSize,
            // 所以它们肯定只使用最低层一个 PoolSubpage
            int d = maxOrder;
            synchronized (head) {
                // 寻找没有被分配的 PoolSubpage 索引id
                int id = allocateNode(d);
                if (id < 0) {
                    // 小于 0, 说明当前PoolChunk都被分配完了
                    return id;
                }
    
                final PoolSubpage<T>[] subpages = this.subpages;
                final int pageSize = this.pageSize;
    
                // 当前PoolChunk 可用字节数
                freeBytes -= pageSize;
    
                // 得到 this.subpages 的下标
                int subpageIdx = subpageIdx(id);
                // 需要容量normCapacity的内存就从这个 subpage 中分配
                // 分配之后,这个 subpage 就只能存放这个尺寸容量normCapacity的内存
                // 这里的 subpage 肯定是没有分配过内存的,
                // 因为通过 allocateNode(d) 找到的肯定是没有分配过内存的,
                PoolSubpage<T> subpage = subpages[subpageIdx];
                if (subpage == null) {
                    // 创建 PoolSubpage 实例,构造方法中会调用 subpage.init(head, normCapacity) 方法
                    subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
                    subpages[subpageIdx] = subpage;
                } else {
                    // 这个 PoolSubpage 之前被用过,但是被释放了。
                    // 初始化, 将这个 PoolSubpage 能分配的容量尺寸就是 normCapacity
                    // 再将这个 PoolSubpage 添加到 PoolArena 对应容量尺寸 PoolSubpage<T> 数组中
                    // 这样下次再请求这个尺寸的内存时,直接从 PoolSubpage<T> 数组中找到这个 PoolSubpage,
                    // 进行内存分配
                    subpage.init(head, normCapacity);
                }
                // 在PoolSubpage上进行内存分配
                return subpage.allocate();
            }
        }
    
    • 通过 allocateNode(d) 方法在最底层寻找未分配的内存块。
    • 减少当前 PoolChunk 可用内存字节数 freeBytes
    • 初始化一个 PoolSubpage, 通过它的subpage.allocate() 方法进行TinySmall 规格类型内存块分配。

    3.2.1.5 initBuf 方法

        void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
            // 因为 handle 高32位表示 bitmapIdx, 低32位表示 memoryMapIdx
            int memoryMapIdx = memoryMapIdx(handle);
            int bitmapIdx = bitmapIdx(handle);
            // 因为 0x4000000000000000L | (long) bitmapIdx << 32,
            // 所以 bitmapIdx == 0时,一定是 Normal 类型
            if (bitmapIdx == 0) {
                byte val = value(memoryMapIdx);
                // 肯定是被使用状态
                assert val == unusable : String.valueOf(val);
                // runOffset(memoryMapIdx) 表示在当前这个 PoolChunk 的字节偏移量
                // runLength(memoryMapIdx) 这个内存块容量
                buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
                        reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());
            } else {
    
                initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity);
            }
        }
    
        private int runOffset(int id) {
            // depth(id) 得到id对应层数
            // id ^ 1 << depth(id) 就是这个id节点在这一层的偏移值
            // 例如 id = 2049,depth(id) 就是 11,1 << depth(id) 就是 2048
            // 2049 ^ 2048 = 1
            int shift = id ^ 1 << depth(id);
            // 偏移量shift乘以 id对应内存块容量runLength(id),
            // 就得到最后的字节偏移量
            return shift * runLength(id);
        }
    
        private int runLength(int id) {
            // represents the size in #bytes supported by node 'id' in the tree
            // 节点id对应的内存块大小,单位是字节,一个字节就是8位bits
            // log2ChunkSize 是 chunkSize 的log2 的对数,
            // 而 chunkSize = pageSize * maxOrder, depth(id) 就是求id节点对应的层数,最低层就是maxOrder,
            // 所以id节点在最底层,那么depth(id)就是maxOrder,那么结果值就是 pageSize。
            return 1 << log2ChunkSize - depth(id);
        }
    

    初始化 Normal 规格的 PooledByteBuf, 通过 runOffset(memoryMapIdx) 方法计算偏移量,通过 runLength(memoryMapIdx) 方法计算内存块大小。

    3.2.1.6 initBuf 方法

        private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
                                        long handle, int bitmapIdx, int reqCapacity) {
            assert bitmapIdx != 0;
    
            int memoryMapIdx = memoryMapIdx(handle);
    
            // 通过 memoryMapIdx 找到 PoolSubpage
            PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
            assert subpage.doNotDestroy;
            // 容量必须小于或等于 PoolSubpage 对应的块容量elemSize
            assert reqCapacity <= subpage.elemSize;
    
            // runOffset(memoryMapIdx) 表示这个 PoolSubpage 在当前这个 PoolChunk 的字节偏移量;
            // (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize
            // 就是表示这个 Tiny或者Small类型内存块在 中PoolSubpage 偏移量。
            buf.init(
                this, nioBuffer, handle,
                runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
                    reqCapacity, subpage.elemSize, arena.parent.threadCache());
        }
    

    初始化 TinySmall 规格类型的 PooledByteBuf, 内存块大小通过 PoolSubpageelemSize 获取,偏移量要增加 (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize 的值。

    3.2.2 回收内存块

        void free(long handle, ByteBuffer nioBuffer) {
            int memoryMapIdx = memoryMapIdx(handle);
            int bitmapIdx = bitmapIdx(handle);
    
            if (bitmapIdx != 0) { // free a subpage
                // bitmapIdx != 0 说明它是一个Tiny 或者 Small类型
                // 通过 memoryMapIdx 找到对应的PoolSubpage
                PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
                assert subpage != null && subpage.doNotDestroy;
    
                // 获取 PoolArena 中对应尺寸容量的PoolSubpage链表头
                PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
                synchronized (head) {
    
                    // 释放PoolSubpage中 bitmapIdx 对应那一个内存块
                    if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                        // 如果 free(...) 方法返回true,说明这个PoolSubpage还在被使用,
                        // 不能被回收,那么直接返回
                        return;
                    }
                }
            }
            /**
             * 运行到这里,
             * 要么它是一个 Normal 类型内存块,那就释放这个内存块。
             * 要么它是一个 Tiny 或者 Small类型,但是它对应 PoolSubpage 那一块内存块都被释放了,这里就释放它
             */
            freeBytes += runLength(memoryMapIdx);
            // 将  memoryMapIdx 对应节点设置回原来值,又可以进行内存块分配了
            setValue(memoryMapIdx, depth(memoryMapIdx));
            // 因为子节点内存块释放,更新父节点的可分配容量
            updateParentsFree(memoryMapIdx);
    
            if (nioBuffer != null && cachedNioBuffers != null &&
                    cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
                cachedNioBuffers.offer(nioBuffer);
            }
        }
    
    • 如果是TinySmall 规格类型的内存块,那么就要使用 PoolSubpagefree(...) 方法释放内存块。如果返回 true,表示这个 PoolSubpage 还在使用,直接返回;如果返回 false,表示这个 PoolSubpage 不在使用,可以被回收了,就要回收对应的整个内存块。
    • 增加当前 PoolChunk 可用内存字节数 freeBytes
    • 通过 setValue(...) 方法,将 memoryMapIdx 对应节点值改成初始层数值,表示这个节点有可以分配了。
    • 通过 updateParentsFree(memoryMapIdx) 方法,更新父节点的节点值,因为子节点内存块被释放,那么父节点可分配内存大小变了。
        private void updateParentsFree(int id) {
            int logChild = depth(id) + 1;
            while (id > 1) {
                // 父节点
                int parentId = id >>> 1;
                // 左子节点
                byte val1 = value(id);
                // 右子节点
                byte val2 = value(id ^ 1);
                // 子节点的标准值
                logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
    
                if (val1 == logChild && val2 == logChild) {
                    // 左子节点和右子节点都是标准值,说明两个子节点都是空闲的
                    // 那么父节点的容量就是两倍
                    setValue(parentId, (byte) (logChild - 1));
                } else {
                    // 取左子节点和右子节点中较大的容量,也是val比较小的值。
                    byte val = val1 < val2 ? val1 : val2;
                    setValue(parentId, val);
                }
    
                id = parentId;
            }
        }
    
    • 如果左子节点和右子节点都是标准值,说明两个子节点都是空闲的,么父节点的容量就是两倍。
    • 如果不是,那么就取左子节点和右子节点中较小值,即可分配内存大小更大。

    3.3 PoolArena

    3.3.1 allocate 方法

        PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
            PooledByteBuf<T> buf = newByteBuf(maxCapacity);
            allocate(cache, buf, reqCapacity);
            return buf;
        }
    

    先通过 newByteBuf(maxCapacity) 方法,创建对应类型的池化缓存区PooledByteBuf,然后调用allocate(cache, buf, reqCapacity) 方法进行内存分配。

      private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
            final int normCapacity = normalizeCapacity(reqCapacity);
            if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
                // 如果容量小于 pageSize 值,即是 Tiny 或者 Small类型
                int tableIdx;
                PoolSubpage<T>[] table;
                boolean tiny = isTiny(normCapacity);
                if (tiny) { // < 512
                    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                        // 先从当前线程缓存中获取,如果能得到,直接返回
                        return;
                    }
                    // 得到Tiny 类型索引,因为 Tiny 类型是每个相隔16,所以索引就是 normCapacity >>> 4
                    tableIdx = tinyIdx(normCapacity);
                    table = tinySubpagePools;
                } else {
                    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                        // 先从当前线程缓存中获取,如果能得到,直接返回
                        return;
                    }
                    // 得到Small 类型索引,每个Small 类型是成倍扩展的,即 512 1024 2048 4096, 小于 pageSize 的大小
                    tableIdx = smallIdx(normCapacity);
                    table = smallSubpagePools;
                }
    
                // 得到符合容量尺寸的头PoolSubpage
                final PoolSubpage<T> head = table[tableIdx];
    
                /**
                 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
                 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
                 */
                synchronized (head) {
                    // 使用 synchronized 防止并发
                    final PoolSubpage<T> s = head.next;
                    if (s != head) {
                        // 有这种尺寸容量的 PoolSubpage, 容量必须是 normCapacity
                        assert s.doNotDestroy && s.elemSize == normCapacity;
                        // 从 PoolSubpage 中分配内存
                        long handle = s.allocate();
                        assert handle >= 0;
                        // 将分配的
                        s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                        incTinySmallAllocation(tiny);
                        return;
                    }
                }
                synchronized (this) {
                    // 运行到这里,表示目前没有这个尺寸的 PoolSubpage,
                    // 那么从PoolChunk 中分配
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
    
                // 增加计数
                incTinySmallAllocation(tiny);
                return;
            }
            if (normCapacity <= chunkSize) {
                if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                    ++allocationsNormal;
                }
            } else {
                // Huge类型的内存块肯定不会缓存在当前线程中,直接调用 allocateHuge 分配
                allocateHuge(buf, reqCapacity);
            }
        }
    

    这个方法看起来很复杂,但是其实逻辑很简单:

    • 通过 normalizeCapacity(reqCapacity) 方法来将用户申请内存大小转成规格化大小,例如 18 就变成 32; 990 就变成 1024
    • 根据 Tiny,Small,NormalHuge 不同类型,进行不同内存块分配。
    • 对于 TinySmall 规格类型:
      • 先从线程缓存PoolThreadCache 中获取,如果获取到,就直接返回,获取不到就继续下面步骤。
      • 再通过tinySubpagePoolssmallSubpagePools 进行快速分配内存块,如果 tinySubpagePools 中有对应的 PoolSubpage,那么就直接分配,如果没有,那么继续下面步骤。
      • 通过 allocateNormal(buf, reqCapacity, normCapacity) 方法进行内存块的分配。
    • 对于Normal 规格类型:
      • 先从线程缓存PoolThreadCache 中获取,如果获取到,就直接返回,获取不到就继续下面步骤。
      • 通过 allocateNormal(buf, reqCapacity, normCapacity) 方法进行内存块的分配。
    • 对于Huge 规格类型:

      这种规格是没有线程缓存的,所以直接通过 allocateHuge(buf, reqCapacity) 方法进行内存块的分配。

    3.3.2 normalizeCapacity 方法

        int normalizeCapacity(int reqCapacity) {
            if (reqCapacity < 0) {
                throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
            }
    
            // 大于 chunkSize,表示是一个 Huge 类型
            if (reqCapacity >= chunkSize) {
                // 是否需要进行内存对齐
                return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
            }
    
            if (!isTiny(reqCapacity)) { // >= 512
                // Doubled
    
                // 得到与 reqCapacity 最近的 2的幂数,
                // 如果 reqCapacity 就是2的幂数,那么就是它自己
                int normalizedCapacity = reqCapacity;
                // 先减一,防止 reqCapacity 就是 2的幂数,导致结果值是 reqCapacity 的两步
                normalizedCapacity --;
                normalizedCapacity |= normalizedCapacity >>>  1;
                normalizedCapacity |= normalizedCapacity >>>  2;
                normalizedCapacity |= normalizedCapacity >>>  4;
                normalizedCapacity |= normalizedCapacity >>>  8;
                normalizedCapacity |= normalizedCapacity >>> 16;
                normalizedCapacity ++;
    
                if (normalizedCapacity < 0) {
                    normalizedCapacity >>>= 1;
                }
                assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
    
                //
                return normalizedCapacity;
            }
    
            // 小于 512 数,Tiny 类型的数,是否需要进行内存对齐
            if (directMemoryCacheAlignment > 0) {
                return alignCapacity(reqCapacity);
            }
    
            // 能够被 16 整除,那么就直接返回
            if ((reqCapacity & 15) == 0) {
                return reqCapacity;
            }
    
            // 结果值是 16 的倍数
            return (reqCapacity & ~15) + 16;
        }
    
    • 对于 Huge 规格类型,只考虑是否需要进行内存对齐,即需要的内存块大小必须是某个数倍数;这个数必须是 2 的幂数。例如内存对齐数directMemoryCacheAlignment16,那么内存块大小必须能整除 16,也就是低四位都是 0
    • SmallNormal 规格类型,它们相隔都是1倍,那么只需要寻找最近的 2 的幂数就行了。
    • Tiny 规格类型,最小值是16,因此只需要16的倍数就可以了。

    3.3.3 allocateNormal 方法

        private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
            // 从 PoolChunkList 中分配内存
            if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
                q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
                q075.allocate(buf, reqCapacity, normCapacity)) {
                return;
            }
    
            // 如果没有从 PoolChunkList 中分配内存,
            // 那么就要新创建 PoolChunk 对象,
            // 默认情况下 pageSize=8192 maxOrder=11 pageShifts=13 chunkSize=16777216
            PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
            boolean success = c.allocate(buf, reqCapacity, normCapacity);
            assert success;
            // 将新创建的 PoolChunk 添加到 qInit 中
            qInit.add(c);
        }
    
    • 先从 PoolChunkList 中寻找可用 PoolChunk 进行内存分配。
    • 找不到,那么就创建新的 PoolChunk 实例。
    • 通过 PoolChunkallocate(buf, reqCapacity, normCapacity) 方法进行内存分配;这个上面已经介绍。
    • 最后将这个 PoolChunk 添加到 PoolChunkList 中。

    3.3.4 allocateHuge 方法

        private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
            // 创建 Huge 类型的PoolChunk,不会放在内存池中 unpooled = true
            PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
            activeBytesHuge.add(chunk.chunkSize());
            buf.initUnpooled(chunk, reqCapacity);
            allocationsHuge.increment();
        }
    

    Huge 规格的内存块是不会进入内存池的。

    3.3.5 释放内存块

        void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
            if (chunk.unpooled) {
                // 非池中内存块,直接回收
                int size = chunk.chunkSize();
                destroyChunk(chunk);
                activeBytesHuge.add(-size);
                deallocationsHuge.increment();
            } else {
                SizeClass sizeClass = sizeClass(normCapacity);
                // 根据不同内存规格,将回收的内存块优先放入线程缓存中
                if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
                    // cached so not free it.
                    return;
                }
    
                // 线程缓存已经满了,那么就释放内存块
                freeChunk(chunk, handle, sizeClass, nioBuffer);
            }
        }
    
     void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer) {
            final boolean destroyChunk;
            synchronized (this) {
                switch (sizeClass) {
                case Normal:
                    ++deallocationsNormal;
                    break;
                case Small:
                    ++deallocationsSmall;
                    break;
                case Tiny:
                    ++deallocationsTiny;
                    break;
                default:
                    throw new Error();
                }
                // 调用 PoolChunkList 方法进行内存块释放,需要改变 PoolChunkList 中的一些值
                destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
            }
            if (destroyChunk) {
                // destroyChunk not need to be called while holding the synchronized lock.
                destroyChunk(chunk);
            }
        }
    
    • Huge 规格类型的内存块直接释放。
    • Tiny,SmallNormal 规格类型的内存块,优先放入线程缓存中,如果对应的线程缓存已经满了,那么才释放。

    相关文章

      网友评论

        本文标题:Netty源码_内存管理(jemalloc3)

        本文链接:https://www.haomeiwen.com/subject/kcoptrtx.html