美文网首页
netty源码分析(六) - ByteBuf - 5alloca

netty源码分析(六) - ByteBuf - 5alloca

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-19 20:22 被阅读0次

    概述

    通过前面两节的铺垫,对netty内存池结构有了大概了解,本节详细分析下具体的内存分配过程(其中涉及缓存MemoryRegionCache和对象池RECYCLER的部分先忽略,后面单独分析)

    1. PooledByteBufAllocator入口

    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;
        final ByteBuf buf;
        if (directArena != null) {
            //调用directArena  allocate
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        return toLeakAwareBuffer(buf);
    }
    

    入口为通过PooledByteBufAllocator创建byteBuf,前面章节已经分析过,这里不再分析

    2. PoolArena.allocate

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        //1. 模板方法创建ByteBuf对象
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        //2. 基于创建的ByteBuf进行分配
        allocate(cache, buf, reqCapacity);
        return buf;
    }
    

    2.1. 模板方法创建ByteBuf对象

    protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
        if (HAS_UNSAFE) {
            return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
        } else {
            return PooledDirectByteBuf.newInstance(maxCapacity);
        }
    }
    
    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        //从对象池中获取进行复用
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }
    

    主要是通过RECYCLER对象池对ByteBuf进行复用,没有可以复用的对象会创建新的ByteBuf并放到RECYCLER中(关于RECYCLER后续详细分析)


    2.2. 基于创建的ByteBuf进行分配

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        //1. 规范申请内存
        final int normCapacity = normalizeCapacity(reqCapacity);
        //是否tiny或small规格,判断capacity < pageSize
        if (isTinyOrSmall(normCapacity)) { // normCapacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            //判断normCapacity < 512
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                //从缓存中分配成功直接返回,否则继续去分配
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                //获取tinySubpagePools中下标即多大规格的tiny,即分配容量 / 16B
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                //从缓存中分配成功直接返回,否则继续去分配
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                //获取smallSubpagePools中下标即多大规格的small
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }
            //获取到对应规格tiny或small链表头节点head
            final PoolSubpage<T> head = table[tableIdx];
    
            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            //有两个地方可以操作链表(PoolChunk#allocateSubpage(int)和PoolChunk#free(long))即head节点可能被多个线程同时修改,需要进行同步操作
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                //不是头结点说明存在相同规格且有空闲内存的PoolSubpage在链表上可以被分配,不需要新创建page
                //直接拿出来分配,头结点初始化的时候next和prev指向自己
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    //4. 从poolSubpage上分配
                    long handle = s.allocate();
                    assert handle >= 0;
                    //5. 分配后重新初始化当前chunk
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
                    //tiny或small已分配个数加一
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            //可能会多线程同时分配,ps没有整个方法同步,减小锁范围
            synchronized (this) {
                //2. 普通分配(先分配page)
                allocateNormal(buf, reqCapacity, normCapacity, cache);
            }
            //tiny或small已分配个数加一
            incTinySmallAllocation(tiny);
            return;
        }
        //大于pagesize小于chunksize,直接以page为单位进行分配
        if (normCapacity <= chunkSize) {
            //通过缓存分配
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                //2. 普通分配(先分配page)
                allocateNormal(buf, reqCapacity, normCapacity, cache);
                //normal已分配个数加一
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            //3. huge的分配不会通过缓存进行,只需调用allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }
    

    大致流程:

    1. normalizeCapacity获取规范化后的内存大小,向上取最近的跟tinySubpagePools或smallSubpagePools或pagesize整数倍相匹配的大小(例如:1 > 16,17 > 32,1.5K > 2K,9K > 16K)

    2. 根据不同normCapacity采用不同分配策略:

      • normCapacity =< pageSize:获取到对应规格tiny或small链表头节点head,如果存在相同规格且有空闲内存的PoolSubpage在链表上可以被分配,不需要新创建page,直接在该page上分配long handle = s.allocate();,分配后重新初始化当前chunks.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);;如果不存在则进行常规分配allocateNormal(buf, reqCapacity, normCapacity, cache);
      • normCapacity =< chunkSize:直接进行常规分配allocateNormal(buf, reqCapacity, normCapacity, cache);
      • normCapacity > chunkSize:huge的分配不通过缓存进行,直接分配unpooled内存allocateHuge(buf, reqCapacity);

    先来分析allocateNormal

    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) {
        //1. 尝试在现有的chunk上分配
        if (q050.allocate(buf, reqCapacity, normCapacity, threadCache) ||
            q025.allocate(buf, reqCapacity, normCapacity, threadCache) ||
            q000.allocate(buf, reqCapacity, normCapacity, threadCache) ||
            qInit.allocate(buf, reqCapacity, normCapacity, threadCache) ||
            q075.allocate(buf, reqCapacity, normCapacity, threadCache)) {
            return;
        }
    
        //2. 创建一个chunk             8K         11          13            8K
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        //3. 在创建的chunk上分配
        boolean success = c.allocate(buf, reqCapacity, normCapacity, threadCache);
        assert success;
        //4. 新chunk添加到chunkList上
        qInit.add(c);
    }
    
    1. 按照使用率从小到大顺序尝试在PoolChunkList上现有的chunk上分配,分配成功直接返回
    2. 现有chunk上分配失败,创建一个新的chunk
    3. 在新创建的chunk上分配
    4. 新chunk添加到chunkList上

    3. PoolChunkList.allocate

    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) {
        //maxCapacity:该PoolChunkList上chunk最大可用容量
        if (normCapacity > maxCapacity) {
            // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
            // be handled by the PoolChunks that are contained in this PoolChunkList.
            return false;
        }
        //遍历PoolChunk
        for (PoolChunk<T> cur = head; cur != null; cur = cur.next) {
            //不为空就尝试分配,分配失败接着尝试在下一个chunk中分配
            if (cur.allocate(buf, reqCapacity, normCapacity, threadCache)) {
                //分配成功并且空闲率小于最小空闲率,则从当前链表移除,放到nextList中
                if (cur.freeBytes <= freeMinThreshold) {
                    remove(cur);
                    nextList.add(cur);
                }
                return true;
            }
        }
        return false;
    }
    
    1. 遍历当前PoolChunkList上节点PoolChunk
    2. 尝试在chunk上进行分配cur.allocate(buf, reqCapacity, normCapacity, threadCache),分配成功直接return true;
    3. 分配成功,如果当前chunk空闲率小于最小空闲率,则从当前链表移除,放到nextList(使用率更高的list)中

    4. PoolChunk.allocate

    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity, PoolThreadCache threadCache) {
        //一个唯一的值,根据叶子节点id,位图索引生成
        final long handle;
        // normCapacity >= pageSize
        if ((normCapacity & subpageOverflowMask) != 0) {
            //以pageSize为单位进行分配
            handle =  allocateRun(normCapacity);
        } else {
            //以siny或small为单位进行分配
            handle = allocateSubpage(normCapacity);
        }
        if (handle < 0) {
            return false;
        }
        ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
        //初始化PooledByteBuf
        initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
        return true;
    }
    
    1. normCapacity >= pageSize以页为单位进行分配handle = allocateRun(normCapacity);返回分配页的索引
    2. normCapacity < pageSize以siny或small为单位进行分配handle = allocateSubpage(normCapacity);,先分配page然后在page上分配tiny或small,返回叶子节点id和位图索引生成的为一值
    3. 初始化PooledByteBufinitBuf(buf, nioBuffer, handle, reqCapacity, threadCache);

    private long allocateRun(int normCapacity) {
        //确定需要在二叉树的 d 层开始节点匹配   pageShifts默认13
        int d = maxOrder - (log2(normCapacity) - pageShifts);
        //在二叉树中进行节点匹配
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }
        freeBytes -= runLength(id);
        return id;
    }
    
    1. 计算需要在二叉树哪一层进行节点分配,这里的容量都是pageSize及以上的,log2(normCapacity) - pageShifts 表示容量是页大小的2的多少倍,最大深度索引再减去这个,刚好是定位到页大小倍数的深度索引。例如:normCapacity=8192*2即两页大小2的14次方,则d = 10,即在第10层分配
    2. 在二叉树d层中进行节点匹配int id = allocateNode(d);,返回分配节点索引

    private int allocateNode(int d) {
        //从内存映射索引为1的开始即根节点开始
        int id = 1;
        //用于比较id所在深度索引是否小于d
        int initial = - (1 << d);
        //获取id内存映射的深度值,memoryMap[id],0或1或12
        byte val = value(id);
        //树节点完全分配val=12, 部分分配val=1,未分配val=0
        if (val > d) {
            return -1;
        }
        //深度优先遍历完所有深度索引小于d的可用的子结点,只有到id的深度索引是d的时候才会结束,
        //先找出对应的深度,然后从左到右看是否有内存可分配。
        while (val < d || (id & initial) == 0) {
            //左节点
            id <<= 1;
            //左节点对应深度
            val = value(id);
            //如果大于深度索引 即左节点不能用了
            if (val > d) {
                //异或位运算,获取右结点
                id ^= 1;
                //再取出对应深度索引值
                val = value(id);
            }
        }
        byte value = value(id);
        assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                value, id & initial, d);
        //memoryMap[id] = 12,即设置为不可用
        setValue(id, unusable);
        //更新父节点值
        updateParentsAlloc(id);
        //返回内存映射索引
        return id;
    }
    
    1. 深度优先遍历所有深度索引小于d的可用的子结点,只有id的深度索引是d的时候才会结束,找出对应的深度,然后判断左节点是否可用,不可用的话再取右节点,最终找到分配节点的索引即id
    2. 更新id节点树深度为12即标记为不可用
    3. 遍历父节点,更新父节点深度值为左右子节点中较小的那个

    private void updateParentsAlloc(int id) {
        //从id开始直到跟节点
        while (id > 1) {
            //获取父节点
            int parentId = id >>> 1;
            //获id节点的深度索引值
            byte val1 = value(id);
            //获取另一个节点的深度索引值,即是左节点就获取右节点,是右节点就获取左节点
            byte val2 = value(id ^ 1);
            //取最小的
            byte val = val1 < val2 ? val1 : val2;
            //设置父节点的深度索引值为子节点最小的那一个
            setValue(parentId, val);
            //继续遍历父节点
            id = parentId;
        }
    }
    
    1. 遍历从id节点开始直到根节点
    2. 设置父节点的深度索引值为左右子节点较小的那一个

    private long allocateSubpage(int normCapacity) {
        //找到当前规格内存normCapacity在subpagePools数组中索引,进而获取该索引内head节点
        PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
        //subpage只能分配在叶子节点
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        synchronized (head) {
            //结点分配
            int id = allocateNode(d);
            if (id < 0) {
                return id;
            }
    
            //this.subpages;   初始化PoolChunk时创建得一个大小为2048得数组,分别对应二叉树中2048个叶子节点
            final PoolSubpage<T>[] subpages = this.subpages;
            final int pageSize = this.pageSize;
            freeBytes -= pageSize;
            //id节点(在二叉树中节点)获取到subpages中索引
            int subpageIdx = subpageIdx(id);
            PoolSubpage<T> subpage = subpages[subpageIdx];
            if (subpage == null) {
                //创建一个subpage并放到subpages中
                subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
                subpages[subpageIdx] = subpage;
            } else {
                //什么时候subpage不为null???
                subpage.init(head, normCapacity);
            }
            //在page上进行分配
            return subpage.allocate();
        }
    }
    
    1. PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);找到当前规格内存normCapacity在subpagePools数组中索引,进而获取该索引内head节点。例如:normCapacity=32属于tiny规格,32 / 16 = 2,即取tinySubpagePools[2]中head节点
    2. int id = allocateNode(d);进行节点分配,上面已经分析过
    3. 获取PoolChunk中subpages数组中subpage(一般情况下为null),创建一个subpage并放到subpages中
    4. subpage.allocate();在subpage上进行分配

    5. PoolSubpage.allocate

    long allocate() {
        if (elemSize == 0) {
            return toHandle(0);
        }
        //numAvail:可用段数量,doNotDestroy:未被销毁,free时改为true
        if (numAvail == 0 || !doNotDestroy) {
            return -1;
        }
        //找到当前page中可分配内存段的bitmapIdx
        //bitmapIdx高26位表示在bitmap数组中位置索引,低6位表示64位long类型内位数索引(2^6 = 64)
        final int bitmapIdx = getNextAvail();
        //bitmap数组下标
        int q = bitmapIdx >>> 6;
        //long数种哪一位需要修改为1
        int r = bitmapIdx & 63;
        assert (bitmap[q] >>> r & 1) == 0;
        //将对应位置的q设置为1
        bitmap[q] |= 1L << r;
        //可用段为0后从链表中删除
        if (-- numAvail == 0) {
            removeFromPool();
        }
        //把两个int(32位)的索引 bitmapIdx 和 memoryMapIdx,合并成一个long类型
        return toHandle(bitmapIdx);
    }
    
    1. final int bitmapIdx = getNextAvail();找到当前page中可分配内存段的bitmapIdx
    2. bitmap[q] |= 1L << r;通过bitmapIdx定位到具体分配了哪个段并把代表该段的索引置为1表示不可用,bitmapIdx 32位int类型高26位表示在bitmap数组中位置索引,低6位表示64位long类型内位数索引(2^6 = 64);
    3. removeFromPool();可用段为0后从链表中删除该page
    4. toHandle(bitmapIdx);把两个int(32位)的索引 bitmapIdx 和 memoryMapIdx,合并成一个long类型返回

    private int getNextAvail() {
        //首次获取时nextAvail为0置为 -1,直接返回0,不需要通过findNextAvail()寻找
        //以后每次都需要通过findNextAvail()寻找???就为了少执行一次findNextAvail()吗?
        int nextAvail = this.nextAvail;
        if (nextAvail >= 0) {
            this.nextAvail = -1;
            return nextAvail;
        }
        return findNextAvail();
    }
    

    主要通过判断nextAvail决定是否需要通过findNextAvail();进行寻找,其实就是只有第一次执行时可以直接返回,后面每一次都要find,从这也可以看出netty代码对性能的最求简直丧心病狂


    private int findNextAvail() {
        //bitmap数组8个long类型节点
        final long[] bitmap = this.bitmap;
        //bitmapLength表示bitmap可用长度,例如16B时需要8个long,bitmapLength=8;32B时只需要4个long,bitmapLength=4
        final int bitmapLength = this.bitmapLength;
        for (int i = 0; i < bitmapLength; i ++) {
            //bits: bitmap数组i位置的long类型数
            long bits = bitmap[i];
            //bits 不是每一位都全是1,表示还有为0的可用段
            if (~bits != 0) {
                return findNextAvail0(i, bits);
            }
        }
        return -1;
    }
    
    1. bitmap: 8个long类型组成的位图数组;bitmapLength:bitmap的有效长度,不一定是8,例如16B时需要8个long,bitmapLength=8;32B时只需要4个long,bitmapLength=4
    2. 遍历bitmap中每一个long类型,判断long是不是全1,不是的话就说明该long类型表示的位图中还有未分配的段
    3. 有可用段时通过findNextAvail0(i, bits);找到索引直接返回

        //该page包含得段数量
        final int maxNumElems = this.maxNumElems;
        //要分配的起始索引,根据第i个位图,如果是0表示第一个long类型表示即0-63进行分配 1表示第二个long类型表示64-127分配
        //例如:i = 00000000000000000000000000000000; baseVal = 00000000000000000000000000000000      十进制:0
        //     i = 00000000000000000000000000000001; baseVal = 00000000000000000000000001000000      十进制:64
        //     i = 00000000000000000000000000000010; baseVal = 00000000000000000000000010000000      十进制:128
        final int baseVal = i << 6;
    
        for (int j = 0; j < 64; j ++) {
            //取出最低位,为0表示可用
            if ((bits & 1) == 0) {
                //相当于baseVal + j, baseVal相当于由于bitmap中索引跳过的值,加上位置序号j后的新的索引值
                //相当于baseVal是十位,j是个位的意思
                int val = baseVal | j;
                if (val < maxNumElems) {
                    return val;
                } else {
                    break;
                }
            }
            //位图右移,即从低位往高位
            bits >>>= 1;
        }
        return -1;
    }
    
    1. maxNumElems: page包含的段数量(pagesize/normalizeCapacity);
    2. final int baseVal = i << 6;:i是在位图数组中索引,左移6位相当于乘以64,可以理解为64进制的一位
    3. 从低位到高位遍历bits(bits >>>= 1;)的每一位,不为0表示该位不可用,继续查看下一位
    4. 为0表示可用,int val = baseVal | j;相当于baseVal + j,表示具体的段索引,小于maxNumElems直接返回,否则返回-1表示未找到可用段
    5. 举例说明:一个段大小为16B的page
      5.1 第1次分配16B大小
      • maxNumElems包含段数量为8192 / 16 = 512
      • 因为是第1次分配,位图数组中所有long都是0,因此 i = 0,baseVal = 0
      • 从低位到高位遍历第1个long类型的每一位,最低位就是0表示可用,val = 0 + 0 < 512,直接返回0
        5.2 第65次分配16B大小
      • maxNumElems包含段数量为8192 / 16 = 512
      • 1个long类型表示64位,因此bitmap中第一个long已经全部位数为1表示不可用,开始使用第2个long类型,因此 i = 1,baseVal = 64
      • 从低位到高位遍历第2个long类型的每一位,最低位就是0表示可用,val = 64 + 0 < 512,直接返回64

    private long toHandle(int bitmapIdx) {
        //把两个int(32位)的索引 bitmapIdx 和 memoryMapIdx,合并成一个long类型
        //加 0x4000000000000000L 是为了让高32位不全是0
        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
    }
    

    可以想一下如何把两个不同类型索引或两个int类型值同时返回?netty这里把两个int(32位)的索引 bitmapIdx 和 memoryMapIdx,合并成一个long类型,加 0x4000000000000000L 是为了让高32位不全是0,后面initBuf会用到


    PoolSubpage.allocate分析完了,主流程中还剩下一个PoolChunk.allocate中的initBuf方法,下面接着分析

    void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity, PoolThreadCache threadCache) {
        //(int) handle long类型直接强转成int类型,高32位被丢弃,返回低32位即memoryMapIdx
        int memoryMapIdx = memoryMapIdx(handle);
        //(int) (handle >>> Integer.SIZE);  又移32位,返回高32位即bitmapIdx
        int bitmapIdx = bitmapIdx(handle);
        //allocateRun以page为单位进行分配,bitmapIdx为0;
        if (bitmapIdx == 0) {
            byte val = value(memoryMapIdx);
            assert val == unusable : String.valueOf(val);
            buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
                    reqCapacity, runLength(memoryMapIdx), threadCache);
        } else {
            initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity, threadCache);
        }
    }
    
    1. memoryMapIdx(handle);:long类型直接强转成int类型,高32位被丢弃,返回低32位即memoryMapIdx
    2. bitmapIdx(handle);:右移32位,返回高32位即bitmapIdx
    3. allocateRun以page为单位进行分配,bitmapIdx为0,内存偏移量只精确到page级别
    4. 重点看下runOffset(memoryMapIdx) + offset其中runOffset(memoryMapIdx)表示page在chunk中偏移量pagesize的倍数,offset表示缓存行偏移量,通常为0
    5. allocateSubpage时高32位也可能为0即第一次分配时页上bitmapIdx就是为0,为了放置跟allocateRun混淆,前面在计算handle时加了个0x4000000000000000L。在subpage上再分配段,内存偏移量只精确到page中的段

    private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
                                    long handle, int bitmapIdx, int reqCapacity, PoolThreadCache threadCache) {
        assert bitmapIdx != 0;
    
        int memoryMapIdx = memoryMapIdx(handle);
        //拿到具体的subpage
        PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
        assert subpage.doNotDestroy;
        assert reqCapacity <= subpage.elemSize;
        //初始化PooledByteBuf
        //0x3FFFFFFF跟0x4000000000000000L对应,相当于还原真是的bitmapIdx
        buf.init(
            this, nioBuffer, handle,
            runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
                reqCapacity, subpage.elemSize, threadCache);
    }
    
    1. PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];subpages数组中拿到具体的subpage
    2. runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset分配的段相对整个chunk的偏移量,相比上面以页为单位进行分配增加了页内偏移量(bitmapIdx & 0x3FFFFFFF) * subpage.elemSize,其中0x3FFFFFFF跟之前0x4000000000000000L对应,这里目的时解出原始的bitmapIdx

    private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        assert handle >= 0;
        assert chunk != null;
        //该byteBuf属于哪个PoolChunk
        this.chunk = chunk;
        //该byteBuf申请的jdk底层数据结构,heap类型就是byte[],direct类型就是DirectByteBuffer
        memory = chunk.memory;
        //jdk nio 的ByteBuffer
        tmpNioBuf = nioBuffer;
        //分配器
        allocator = chunk.arena.parent;
        //PoolThreadCache
        this.cache = cache;
        //bitmapIdx + memoryMapIdx
        this.handle = handle;
        //缓存行偏移
        this.offset = offset;
        //申请的内存大小reqCapacity
        this.length = length;
        //规范化后的内存大小normalCapacity
        this.maxLength = maxLength;
    }
    

    到此allocate主流程的一个主分支分析完了

    相关文章

      网友评论

          本文标题:netty源码分析(六) - ByteBuf - 5alloca

          本文链接:https://www.haomeiwen.com/subject/leikmktx.html