没有目的,就做不成任何事情;目的渺小,就做不成任何大事。
——狄德罗
大纲
图示本节思考:
>当topic数量增多到100+时,kafka的单个broker的TPS降低了1个数量级,而RocketMQ在海量topic的场景下,依然保持较高的TPS?
>CommitLog的”随机读”对性能的影响?
我们前面知道,RocketMQ是基于文件存储,所有消息的本体都保存在Commitlog上,消息的生产是顺序写,效率很高,但是消费的时候是基于主题的,一个主题的消息随机分布式在Commitlog上,所以这个是随机读,这个对RocketMQ有什么影响。
图示RocketMQ对比Kafka
图示Kafka 中文件的布局是以Topic/partition ,每一个分区一个物理文件夹,在分区文件级别实现文件顺序写,如果一个Kafka集群中拥有成百上千个主题,每一个主题拥有上百个分区,消息在高并发写入时,其IO操作就会显得零散(消息分散的落盘策略会导致磁盘IO竞争激烈成为瓶颈),其操作相当于随机IO,即Kafka在消息写入时的IO性能会随着topic 、分区数量的增长,其写入性能会先上升,然后下降。
而RocketMQ在消息写入时追求极致的顺序写,所有的消息不分主题一律顺序写入commitlog文件,并不会随着topic和分区数量的增加而影响其顺序性。
在消息发送端,消费端共存的场景下,随着Topic数的增加Kafka吞吐量会急剧下降,而RocketMQ则表现稳定。因此Kafka适合Topic和消费端都比较少的业务场景,而RocketMQ更适合多Topic,多消费端的业务场景。
CommitLog之Message格式
图示Store架构设计之消息发送
整个存储设计层次非常清晰,大致的层次如下图:
图示业务层,也可以称之为网络层,就是收到消息之后,一般交给SendMessageProcessor来分配(交给哪个业务来处理)。
DefaultMessageStore,这个是存储层最核心的入口。
另外还有一个重要的是CommitLog.
以上就是三个核心类。
1.Store层处理消息的入口
这个存储层处理消息的过程就是一次RPC请求,所以我们找入口。当然还是由Broker启动
initialize() 图示 registerProcessor()这里还是类似之前讲过的功能号的概念。
线程池1.SendMessageProcessor.processRequest
RocketMQ使用Netty处理网络,框架收到请求的处理就会进入processRequest
processRequest() asyncProcessRequest() asyncSendMessage()2、DefaultMessageStore.processRequest
asyncPutMessage()3、CommitLog.asyncPutMessage
asyncPutMessage()3.1、存储到MappedFileQueue的MappedFile
asyncPutMessage() appendMessage()这里就不详细讲了,无非就是数据的一些格式处理的东西。
3.2、同步刷盘:GroupCommitService(独立的线程)
刷盘是在commitlog的构造方法中就启动了独立的线程处理
CommitLog() 同步刷盘3.3、异步刷盘:CommitRealTimeService/FlushCommitLogService(独立的线程)
CommitLog() FlushRealTimeService CommitRealTimeServiceStore架构设计之消息消费
图示CommitLog的”随机读”对性能的影响?
RocketMQ中,所有的队列存储一个文件(commitlog)中,所以rocketmq是顺序写io,随机读。每次读消息时先读逻辑队列consumeQueue中的元数据,再从commitlog中找到消息体。增加了开销。
那么在RocketMQ中是怎么优化的?
1、本身无论是Commitlog文件还是Consumequeue文件,都通过MMAP内存映射。
2、本身存储Commitlog采用写时复制的容器处理,实现读写分离,所以很大程度上可以提高一些效率。
源码分析之堆外内存
我们根据之前了解可以,一般情况下RocketMQ是通过MMAP内存映射,生产时消息写入内存映射文件,然后消费的时候再读。
但是RocketMQ还提供了一种机制。我们来看下。
TransientStorePool,短暂的存储池(堆外内存)。RocketMQ单独创建一个ByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。
RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。同时因为是堆外内存,这么设计可以避免频繁的GC。
1.开启条件及限制
(1)开启位置broker中的配置文件:
图示(2)在DefaultMessageStore.
DefaultMessageStore()构造方法中,也可以看到还有其他限制
开启堆外内存缓冲区,必须是异步刷盘+主节点
DefaultMessageStore() isTransientStorePoolEnable()2.TransientStorePool概要设计
TransientStorePool类 图示这个地方的设计有点类似于连接池的设计,首先,构造方法中init方法用于构造堆外内存缓冲值,默认构造5个。
borrowBufferf()借用堆外内存池ByteBuffer在创建MappedFile时就会进行设置。要注意,这里就会把堆外内存通过returnBuffer()赋给writeBuffer。
init()3.与消息发送流程串联
有了上面的知识,我们就可以确定,在MappedFile中,如果writeBuffer不为null,要么就一定开启了堆外内存缓冲!!!
再结合消息的发送流程。
数据到了存储层,最终会调用MappedFile的appendMessagesInner()进行消息的存储。
appendMessagesInner() 图示按照上图的流程,消息发送就有两条线。
1、 走传统的MMAP内存映射,数据写mappedByteBuffer,然后通过flush刷盘。
2、 走堆外内存缓冲区,数据先写writeBuffer,再通过commit提交到FileChannel中,最后再flush刷盘。
以上两种方式,处理的都是基于bytebuffer的实现,所以都通过 put方法可以写入内存。
所以对应前面讲的刷盘。
你会发现为什么异步刷盘线程有两个。一个是针对的MMAP刷盘,一个是针对的堆外内存缓冲的提交刷盘。
所以了堆外内存缓冲区一定是要异步、Commit的是针对堆外内存缓冲的提交。Flush的是针对MMAP的内存映射的处理。
图示在CommitRealTimeService中最后调用到MappedFile的 commit0方法写入:
具体的如下:
CommitRealTimeService类 commit() commit() commit0()4.两种方式的对比
(1)默认方式,Mmap+PageCache的方式,读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写(脏页面)。
(2)堆外缓冲区,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(对于,DirectByteBuffer是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。
所以使用堆外缓冲区的方式相对来说会比较好,但是肯定的是,需要消耗一定的内存,如果服务器内存吃紧就不推荐这种模式,同时的话,堆外缓冲区的话也需要配合异步刷盘才能使用。
源码分析之ConsumeQueue
1.消息发送时数据在ConsumeQueue的落地
图示连续发送5条消息,消息是不定长,首先所有信息先放入 Commitlog中,每一条消息放入Commitlog的时候都需要上锁,确保顺序的写入。
当Commitlog写成功了之后。数据再同步到ConsunmeQueue中。
并且数据一条条分发,这个是一个典型的轮训。
Queue Offset 代表一个Queue中的第几条消息。
Logic Offset就是Queue Offset*20 因为每一条ConsumeQueue中的消息长度都是20.
Physical Offset,这个是在 Commitlog中每一条消息偏移量。
这种设计非常的巧妙:
查找消息的时候,可以直按根据队列的消息序号,计算出索引的全局位置(比如序号2,就知道偏移量是20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息、这里面比较耗时两个操作就是分别找到索引和消息所在文件,这两次查找是差不多的,都可以抽象成:
因为每个索引文件或者消息文件的长度的是固定的,对于每一组文件,都维护了一个由小到大有序的文件数组。查找文件的时候,直接通过计算即可获取文件在数组中的序号:
文件在数组中的序号=(全局位置-第一个文件的文件名)/文件固定大小
在通过序号在数组中获取数据的时间复杂度是0(1),二次查找文件的时间复杂度也是是:0(1)+0(1) =0 (1),所以消费时查找数据的时间复杂度也是O(1)。
2.入口:ReputMessageService.doReput(独立线程)
DefaultMessageStore. start()
start()ReputMessageService.run()
run() doReput() dispatch() putMessagePositionInfo() putMessagePositionInfoWrapper() putMessagePositionInfo() appendMessage()3.异步刷盘
DefaultMessageStore() doFlush()我是娆疆_蚩梦,让坚持成为一种习惯,感谢各位大佬的:点赞、收藏和评论,我们下期见!
网友评论