美文网首页
RocketMQ源码解读之Store

RocketMQ源码解读之Store

作者: 娆疆_蚩梦 | 来源:发表于2021-06-08 17:43 被阅读0次

    没有目的,就做不成任何事情;目的渺小,就做不成任何大事。

                                                                                        ——狄德罗

    大纲

    图示

    本节思考:

        >当topic数量增多到100+时,kafka的单个broker的TPS降低了1个数量级,而RocketMQ在海量topic的场景下,依然保持较高的TPS?

        >CommitLog的”随机读”对性能的影响?

        我们前面知道,RocketMQ是基于文件存储,所有消息的本体都保存在Commitlog上,消息的生产是顺序写,效率很高,但是消费的时候是基于主题的,一个主题的消息随机分布式在Commitlog上,所以这个是随机读,这个对RocketMQ有什么影响。

    图示

    RocketMQ对比Kafka

    图示

        Kafka 中文件的布局是以Topic/partition ,每一个分区一个物理文件夹,在分区文件级别实现文件顺序写,如果一个Kafka集群中拥有成百上千个主题,每一个主题拥有上百个分区,消息在高并发写入时,其IO操作就会显得零散(消息分散的落盘策略会导致磁盘IO竞争激烈成为瓶颈),其操作相当于随机IO,即Kafka在消息写入时的IO性能会随着topic 、分区数量的增长,其写入性能会先上升,然后下降。

        而RocketMQ在消息写入时追求极致的顺序写,所有的消息不分主题一律顺序写入commitlog文件,并不会随着topic和分区数量的增加而影响其顺序性。

        在消息发送端,消费端共存的场景下,随着Topic数的增加Kafka吞吐量会急剧下降,而RocketMQ则表现稳定。因此Kafka适合Topic和消费端都比较少的业务场景,而RocketMQ更适合多Topic,多消费端的业务场景。

    CommitLog之Message格式

    图示

    Store架构设计之消息发送

        整个存储设计层次非常清晰,大致的层次如下图:

    图示

        业务层,也可以称之为网络层,就是收到消息之后,一般交给SendMessageProcessor来分配(交给哪个业务来处理)。

        DefaultMessageStore,这个是存储层最核心的入口。

        另外还有一个重要的是CommitLog.

        以上就是三个核心类。

    1.Store层处理消息的入口

        这个存储层处理消息的过程就是一次RPC请求,所以我们找入口。当然还是由Broker启动

    initialize() 图示 registerProcessor()

        这里还是类似之前讲过的功能号的概念。

    线程池

    1.SendMessageProcessor.processRequest

        RocketMQ使用Netty处理网络,框架收到请求的处理就会进入processRequest

    processRequest() asyncProcessRequest() asyncSendMessage()

    2、DefaultMessageStore.processRequest

    asyncPutMessage()

    3、CommitLog.asyncPutMessage

    asyncPutMessage()

    3.1、存储到MappedFileQueue的MappedFile

    asyncPutMessage() appendMessage()

        这里就不详细讲了,无非就是数据的一些格式处理的东西。

    3.2、同步刷盘:GroupCommitService(独立的线程)

        刷盘是在commitlog的构造方法中就启动了独立的线程处理

    CommitLog() 同步刷盘

    3.3、异步刷盘:CommitRealTimeService/FlushCommitLogService(独立的线程)

    CommitLog() FlushRealTimeService CommitRealTimeService

    Store架构设计之消息消费

    图示

    CommitLog的”随机读”对性能的影响?

        RocketMQ中,所有的队列存储一个文件(commitlog)中,所以rocketmq是顺序写io,随机读。每次读消息时先读逻辑队列consumeQueue中的元数据,再从commitlog中找到消息体。增加了开销。

        那么在RocketMQ中是怎么优化的?

        1、本身无论是Commitlog文件还是Consumequeue文件,都通过MMAP内存映射。

        2、本身存储Commitlog采用写时复制的容器处理,实现读写分离,所以很大程度上可以提高一些效率。

    源码分析之堆外内存

        我们根据之前了解可以,一般情况下RocketMQ是通过MMAP内存映射,生产时消息写入内存映射文件,然后消费的时候再读。

        但是RocketMQ还提供了一种机制。我们来看下。

        TransientStorePool,短暂的存储池(堆外内存)。RocketMQ单独创建一个ByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。

        RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。同时因为是堆外内存,这么设计可以避免频繁的GC。

    1.开启条件及限制

    (1)开启位置broker中的配置文件:

    图示

    (2)在DefaultMessageStore. 

    DefaultMessageStore()构造方法中,也可以看到还有其他限制

    开启堆外内存缓冲区,必须是异步刷盘+主节点

    DefaultMessageStore() isTransientStorePoolEnable()

    2.TransientStorePool概要设计

    TransientStorePool类 图示

        这个地方的设计有点类似于连接池的设计,首先,构造方法中init方法用于构造堆外内存缓冲值,默认构造5个。

        borrowBufferf()借用堆外内存池ByteBuffer在创建MappedFile时就会进行设置。要注意,这里就会把堆外内存通过returnBuffer()赋给writeBuffer。

    init()

    3.与消息发送流程串联

        有了上面的知识,我们就可以确定,在MappedFile中,如果writeBuffer不为null要么就一定开启了堆外内存缓冲!!!

        再结合消息的发送流程。

        数据到了存储层,最终会调用MappedFile的appendMessagesInner()进行消息的存储。

    appendMessagesInner() 图示

        按照上图的流程,消息发送就有两条线。

        1、 走传统的MMAP内存映射,数据写mappedByteBuffer,然后通过flush刷盘。

        2、 走堆外内存缓冲区,数据先写writeBuffer,再通过commit提交到FileChannel中,最后再flush刷盘。

        以上两种方式,处理的都是基于bytebuffer的实现,所以都通过 put方法可以写入内存。

        所以对应前面讲的刷盘。

        你会发现为什么异步刷盘线程有两个。一个是针对的MMAP刷盘,一个是针对的堆外内存缓冲的提交刷盘。

        所以了堆外内存缓冲区一定是要异步、Commit的是针对堆外内存缓冲的提交。Flush的是针对MMAP的内存映射的处理。

    图示

        在CommitRealTimeService中最后调用到MappedFile的 commit0方法写入:

        具体的如下:

    CommitRealTimeService类 commit() commit() commit0()

    4.两种方式的对比

        (1)默认方式,Mmap+PageCache的方式,读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写(脏页面)。

        (2)堆外缓冲区,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(对于,DirectByteBuffer是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。

        所以使用堆外缓冲区的方式相对来说会比较好,但是肯定的是,需要消耗一定的内存,如果服务器内存吃紧就不推荐这种模式,同时的话,堆外缓冲区的话也需要配合异步刷盘才能使用。

    源码分析之ConsumeQueue

    1.消息发送时数据在ConsumeQueue的落地

    图示

        连续发送5条消息,消息是不定长,首先所有信息先放入 Commitlog中,每一条消息放入Commitlog的时候都需要上锁,确保顺序的写入。

        当Commitlog写成功了之后。数据再同步到ConsunmeQueue中。

        并且数据一条条分发,这个是一个典型的轮训。

        Queue Offset 代表一个Queue中的第几条消息。

        Logic Offset就是Queue Offset*20  因为每一条ConsumeQueue中的消息长度都是20.

        Physical Offset,这个是在 Commitlog中每一条消息偏移量。

        这种设计非常的巧妙:

        查找消息的时候,可以直按根据队列的消息序号,计算出索引的全局位置(比如序号2,就知道偏移量是20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息、这里面比较耗时两个操作就是分别找到索引和消息所在文件,这两次查找是差不多的,都可以抽象成:

        因为每个索引文件或者消息文件的长度的是固定的,对于每一组文件,都维护了一个由小到大有序的文件数组。查找文件的时候,直接通过计算即可获取文件在数组中的序号:

        文件在数组中的序号=(全局位置-第一个文件的文件名)/文件固定大小

        在通过序号在数组中获取数据的时间复杂度是0(1),二次查找文件的时间复杂度也是是:0(1)+0(1) =0 (1),所以消费时查找数据的时间复杂度也是O(1)。

    2.入口:ReputMessageService.doReput(独立线程)

    DefaultMessageStore. start()

    start()

    ReputMessageService.run()

    run() doReput() dispatch() putMessagePositionInfo() putMessagePositionInfoWrapper() putMessagePositionInfo() appendMessage()

    3.异步刷盘

    DefaultMessageStore() doFlush()

    我是娆疆_蚩梦,让坚持成为一种习惯,感谢各位大佬的:点赞收藏评论,我们下期见!


    上一篇:RocketMQ源码解读之Producer

    下一篇:RocketMQ源码解读之Consumer

    相关文章

      网友评论

          本文标题:RocketMQ源码解读之Store

          本文链接:https://www.haomeiwen.com/subject/atmheltx.html