美文网首页
kafka 内存管理 BufferPool

kafka 内存管理 BufferPool

作者: 不存在的里皮 | 来源:发表于2020-06-10 10:38 被阅读0次

    闫文亮304 Kafka-生产者-BufferPool

    先看注释

    /**
     * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
     * particular it has the following properties:
     * <ol>
     * <li>There is a special "poolable size" and buffers of this size are kept in a free list and recycled
     * <li>It is fair. That is all memory is given to the longest waiting thread until it has sufficient memory. This
     * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple
     * buffers are deallocated.
     * </ol>
     */
    

    结合代码可知,BufferPool负责ByteBuffer的申请和释放。
    BufferPool会维持一组大小为poolableSize的ByteBuffer,便于快速申请/归还这个大小的ByteBuffer。该机制是由free空闲链表维持的。
    对于非poolableSize的ByteBuffer,其申请和释放都委托给JVM
    BufferPool的内存申请是"公平的",永远优先满足先申请的线程,再满足后申请的。这样能防止死锁和饥饿。该机制是由waiters条件队列保障的。

    内存管理

    BufferPool将内存视为三个部分:

    • 已被分配出去的ByteBuffer。它们的大小可能各异,可以是或不是poolableSize
    • 维持在free空闲链表的ByteBuffer(free会维持它们的引用)。每一个的大小都是poolableSize。
      • 申请这个大小的ByteBuffer时,从free中取出即可
      • 归还这个大小的ByteBuffer时,放回free
    • 未分配空闲内存[1]。这块内存在JVM中,是空闲的。用一个数字nonPooledAvailableMemory代表。它的回收是交给gc的。
      • 申请不是poolableSize大小的ByteBuffer时,调用ByteBuffer.allocate(size)
      • 归还不是poolableSize大小的ByteBuffer时,调用者解除对该ByteBuffer的引用,然后nonPooledAvailableMemory增加这个大小即可,其回收交给gc

    waiters条件队列

    维持了一个Condition队列,每个线程在申请内存不足时,会阻塞于生成的一个Condition并进入此队。

    private final Deque<Condition> waiters;
    

    比如此处,在allocate方法中,没有足够内存:


    所以,队列中每一个Condition代表一个因内存不足而阻塞的线程,当有ByteBuffer释放时,取出队首的Condition,调用signal将对应线程唤醒即可。

    allocate

    根据要分配的内存大小有不同的行为。

    如果要分配的内存size等于poolableSize,从free取出一块即可。如果free没有就等待。

    如果size不等于poolableSize,需要从非池化内存分配:

    • 如果有足够的空闲内存,也就是空闲非池化内存+空闲链表总和 >= size[2],那就能一次性分配。
      1. 先逐个释放free中的ByteBuffer,直到有足够的nonPooledAvailableMemory为止
      2. 直接分配size大小的内存
    • 如果没有足够空闲内存,就需要边等待,边分配一部分。
      1. 执行以下循环,直到accumulated>=size:
        1. 等待一定时间(在等待期间,可能有新的内存块插入free,也可能有新的非池化内存,使nonPooledAvailableMemory增加),若超时就抛出内存不足异常,否则返回时未超时,说明有新的空闲内存了。
        2. 逐个释放free中的ByteBuffer,直到有足够nonPooledAvailableMemory为止。期间会扣除nonPooledAvailableMemory的份额,加到accumulated上。但我们并不申请内存,只是把这块份额预留出来。
      2. 循环退出时,要么预留出了足够的内存,申请即可;要么是等待超时,则归还预留的份额(这里和代码的理解不同)


    对归还的份额有疑问:


    归还size?

    deallocate

    根据要释放的内存大小有不同的行为。

    • 如果大小为poolableSize,直接挂进size即可
    • 如果不是,把大小增加到nonPooledAvailableMemory。调用者解除该ByteBuffer的引用,然后交给gc回收即可。


    对代码关于非池化内存的回收行为有疑问:



    1. 代表所有剩下的空闲内存,totalMemory中,没有维持在free内的整块内存,都是未分配空闲内存。

    2. nonPooledAvailableMemory + freeListSize >= size

    相关文章

      网友评论

          本文标题:kafka 内存管理 BufferPool

          本文链接:https://www.haomeiwen.com/subject/bmwetktx.html