美文网首页
store模块阅读1:TransientStorePool

store模块阅读1:TransientStorePool

作者: 赤子心_d709 | 来源:发表于2017-09-25 19:31 被阅读317次

说明

类名意为短暂的存储池

用于池化管理多个ByteBuffer对象,进行借与还的操作

池化管理可以参考我的文集 commons-pool2

源码

public class TransientStorePool {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final int poolSize;//池的大小有多少,默认5
    private final int fileSize;//每个commitLog文件大小,默认1G
    private final Deque<ByteBuffer> availableBuffers;//双端队列记录可用的buffers
    private final MessageStoreConfig storeConfig;//存储配置

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMapedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * It's a heavy init method.
     * 初始化函数,分配poolSize个fileSize的堆外空间
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);//虚拟机外内存中分配的空间

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }

    //销毁availableBuffers中所有buffer数据
    public void destroy() {
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }

    //用完了之后,返还一个buffer,对buffer数据进行清理
    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }

    //借一个buffer出去
    public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }

    //剩余可用的buffers数量
    public int remainBufferNumbs() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
}

在知道什么是池化管理的技术基础上,这个代码就比较好理解
注意几个点

1.涉及Pointer和LibC,NativeLong的东西,没有深入了解
2.分配的ByteBuffer是堆外的

问题

availableBuffers为什么需要时Deque

单向队列就够用哦

相关文章

网友评论

      本文标题:store模块阅读1:TransientStorePool

      本文链接:https://www.haomeiwen.com/subject/ycidextx.html