美文网首页Java 杂谈
从RocketMQ消息持久化设计看磁盘性能瓶颈的突破!

从RocketMQ消息持久化设计看磁盘性能瓶颈的突破!

作者: 90299fbffdea | 来源:发表于2019-05-09 21:28 被阅读4次

分布式消息队列通常有高可靠性的要求,所以消息数据是需要持久化存储的。那么以什么方式来进行持久化是一个值得商榷的问题。

从存储方式和效率来看,文件系统 > KV存储 > 关系型数据库,直接操作文件系统自然是最快的一种存储方式,但是仅仅如此就可以了吗?

当然不是,在无数的过往学习中,磁盘IO性能拖累系统性能是众所周知的。那么RocketMQ是怎么解决呢?

各位看官且待我慢慢道来。

存储架构设计

首先我们回忆一下,假如现在有一个字你不认识,然后你手上正巧有一本汉语言辞典,请问该怎么做才能以最快的速度查到这个字?

凡是上过小学的人,应该都不会从汉语言辞典第一页开始一页一页的查找。

作为优秀小学毕业生的我们,肯定是先通过偏旁检索到这个字,然后根据检索上这个字的页码,到汉语言辞典里对应的页码中去找到这个字,于是你就知道它读什么了。

大道相通。

RocketMQ在文件系统中,把所有的消息都存在了同一个文件中,这就像一本厚厚的汉语言辞典,作为消费者,想要做到最大效率的实时消费,说白了就是要快速定位到这个消息在文件中的位置,肯定不能从文件偏移量0开始向下查找。

一张图顶几百字:

image

消息存储设计原理图

RocketMQ主要存储文件有三个,分别是:

  • CommitLog:消息存储文件,所有的消息存在这里;
  • ConsumeQueue:消费队列文件,消息在存储到CommitLog后,会将消息所在CommitLog偏移量、大小、tag的hashcode异步转发到消费队列存储,供消费者消费,其类似于数据库的索引文件,存储的是指向物理存储的地址,每个topic下的每个Message Queue都有一个对应的ConsumeQueue文件;
  • Index:索引文件,消息在存储到CommitLog后,会将消息key与消息所在CommitLog偏移量转发到索引文件存储,供消息查询。

从原理图中,我们可以看出消息的生产与消费进行了分离,Producer端发送消息最终写入的是CommitLog,Consumer端先从ConsumeQueue读取持久化消息的起始物理位置偏移量offset、大小size和消息Tag的HashCode值,再从CommitLog中进行读取待拉取消费消息的真正实体内容部分。

上面说了消费者如何快速定位到消息位置,使消费者可以高效的消费,那么下面我们说说RocketMQ中如何做到消息存储的高效性。

我们先思考一个问题,假如你是印刷厂的老板,你如何才能快速印刷出一本完整没有错误的汉语言辞典呢?

答案很简单,从第一页开始,按照顺序一页一页的印刷,不要跳页印刷,更不要随机印刷。

正如我们的磁盘写入一样,据某某调查研究表明,高性能磁盘在顺序写入的时候,速度基本可以堪比内存的写入速度,但是磁盘随机写入的时候,性能瓶颈非常明显,速度会比较慢。

所以RocketMQ采用了全部消息都存入一个CommitLog文件中,并且对写操作加锁(putMessageLock),保证串行顺序写入消息,避免磁盘竟争导致IO WAIT增高,大大提高写入效率。

我们可以用一个更详细的图来说明:

image

生产者按顺序写入CommitLog,消费者通过顺序读取ConsumeQueue进行消费,这里有一个地方需要注意,虽然消费者是按照顺序读取ConsumeQueue,但是并不代表它就是按照顺序读取消息,因为根据ConsumeQueue中的起始物理位置偏移量offset读取消息真实内容,在并发量非常高的情况下,实际上是随机读取CommitLog,而随机读取文件带来的性能开销影响还是比较大的,所以在这里,RocketMQ利用了操作系统的pagecache机制,批量从磁盘读取,作为cache存在内存中,加速后速的读取速度。

存储文件

我们打开RocketMQ在磁盘上持久化的目录(store目录下),便可以很直观的看到CommitLog,ConsumeQueue,Index三个文件夹。(其中config文件夹中是运行期间一些配置信息,而abort,checkpoint我会在后续的文章中讲述它们的作用,关注“IT一刻钟”吧,不要在犹豫中错过了重要内容!)

image

store目录

CommitLog文件夹中的内容

image

可以看到每个文件1G大小,以该文件中第一个偏移量为文件名,偏移量小于20位用0补齐。如图所示,第一个文件的初始偏移量为9663676416,第二个文件的初始偏移量为10737418240。

CommitLog文件内部存储逻辑是,每条消息的前4个字节存储该条消息的总长度(包含长度信息本身),随后便是消息内容。如图所示:

image

消息的长度=消息长度信息(4字节)+ 消息内容长度。

实现消息查找的步骤:

1.消费者从消费队列中获取到某个消息的偏移量offset与长度size;

2.根据偏移量offset定位到消息所在的commitLog物理文件;

3.用偏移量与文件长度取模,得到消息在这个commitLog文件内部的偏移量;

4.从该偏移量取得size长度的内容返回即可。

<bi class="ql-align-justify" style="box-sizing: border-box; text-align: justify; display: block;">注:如果只是根据消息偏移量查找消息,则首先找到文件内偏移量,然后读取前4个字节获取消息的实际长度,然后读取指定的长度。</bi>

这里有一个比较巧妙的设计,CommitLog文件并不是每次生成一个,然后写满之后再创建下一个,而是有一个预分配的机制。

即,CommitLog创建过程是把下一个文件的路径、下下个文件的路径以及文件大小作为参数封装到AllocateRequest对象并添加到队列中,后台运行的AllocateMappedFileService服务线程会不停地run,只要请求队列里存在请求对象,就会去创建下个CommitLog,同时还会将下下个CommitLog预先创建并保存至请求队列中等待下次获取时直接返回,不用再次因为等待CommitLog创建分配而产生时间延迟。

ConsumeQueue文件夹中的内容

image

对于消费者来说,最关心的莫过于某个主题下的所有消息,但是在RocketMQ中,不同主题下的消息都交错杂糅在同一个文件里,想要提高查询速度,必须要构建类似于搜索索引的文件,于是就有了消费队列ConsumeQueue文件。

从实际物理存储来说,ConsumeQueue对应每个Topic和QueuId下面的文件,在上图中,00000000000012000000就是在主题为sim-online-orders,QueueId为1下的ConsumeQueue文件。单个文件大小约5.72M,每个文件由30W条数据组成,每个文件默认大小为600万个字节,即每条数据20个字节。当一个ConsumeQueue类型的文件写满了,则写入下一个文件。

ConsumeQueue文件内部存储逻辑如图:

image

包含消息在commitLog文件的偏移量,消息长度,消息tag的HashCode。

单个ConsumeQueue文件可以看作是ConsumeQueue条目数组,其下标是ConsumeQueue的逻辑偏移量。

消息消费队列是RocketMQ为消息订阅构建的索引文件,目的在于提高主题与消息队列检索消息的速度。

Index文件夹中的内容

image

RocketMQ为了通过消息Key值查询消息真正的实体内容,引入了Hash索引机制。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引。

我们先来看看Index索引文件的内部存储逻辑:

image

IndexFile包含三个部分:IndexHead,Hash槽,Index条目。

1.IndexHead,包含40个字节,记录一些统计信息:

beginTimestamp:该索引文件中包含消息的最小存储时间。

endTimestamp:该索引文件中包含消息的最大存储时间。

beginPhyoffset:该索引文件中包含消息的最小物理偏移量(commitlog文件偏移量)。

endPhyoffset:该索引文件中包含消息的最大物理偏移量(commitlog文件偏移量)。

hashslotCount:hashslot个数,并不是hash槽使用的个数,在这里意义不大。

indexCount:Index条目列表当前已使用的个数,Index条目在Index条目列表中按顺序存储。

2.Hash槽,默认500万个槽,每个槽位存储着该消息key的HashCode所对应的最新Index条目的下标数。

3.Index条目列表,默认一个索引文件包含2000万个条目:

hashcode:key的HashCode。

phyoffset:消息对应的物理偏移量。

timedif:该消息存储时间与第一条消息的时间戳的差值,小于0该消息无效。

preIndexNo:该HashCode上一个条目的Index索引,当出现hash冲突时,构建的链表结构。

大家看懂了这个数据结构没有?设计的真是精妙。

如果没有理解,我给大家画个图,来体会一下这个数据结构的精妙:

首先根据key的HashCode对槽数取模,得到槽位,然后将相应的数据按顺序存入到Index条目中,同时将条目数存回对应的槽内。

image

如果遇到Hash冲突,Index条目会通过pre index no构建链表结构:

image

如图第二个槽位冲突,第5条index条目的pre index no存储原来的第二条序号。

其实就是HashMap的变形结构。

通过以上结构便可以用消息的key快速定位到消息内容。

内存映射

如果说以上内容是RocketMQ通过优化数据结构的方式来提高分布式消息队列的性能,那么这里便是通过操作系统底层来优化性能。

在Linux中,操作系统分为“用户态”和“内核态”,普通的标准IO操作文件时,首先从磁盘将数据复制到内核态内存,接着从内核态内存复制到用户态内存,完成读取操作,然后从用户态内存复制到网络驱动的内核态内存,最后从网络驱动的内核态内存复制到网卡中进行传输,完成写出操作。

这个全过程中涉及到四次复制,可以说效率是可见的低。

于是,在RocketMQ中,通过Java中的MappedByteBuffer(mmap方式)实现“零拷贝”,省去了向用户态的内存复制,提高了消息存储和网络发送的速度。

这里我们说一说什么是mmap内存映射技术。

mmap技术可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。当发生缺页中断时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小一般需要限制在1.5~2G以下),采用mmap的方式读/写效率和性能都非常高。如图:

image

使用Mmap的限制:

a.Mmap映射的内存空间释放的问题:由于映射的内存空间本身就不属于JVM的堆内存区(Java Heap),因此其不受JVM GC的控制,卸载这部分内存空间需要通过系统调用 unmap()方法来实现。然而unmap()方法是FileChannelImpl类里实现的私有方法,无法直接显示调用。RocketMQ中的做法是,通过Java反射的方式调用“sun.misc”包下的Cleaner类的clean()方法来释放映射占用的内存空间;

b.MappedByteBuffer内存映射大小限制:因为其占用的是虚拟内存(非JVM的堆内存),大小不受JVM的-Xmx参数限制,但其大小也受到OS虚拟内存大小的限制。一般来说,一次只能映射1.5~2G 的文件至用户态的虚拟内存空间,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了;

c.使用MappedByteBuffe的其他问题:会存在内存占用率较高和文件关闭不确定性的问题;

突破性能瓶颈的处理方法有哪些?

1.简单高效的数据结构,提高检索速度;

2.磁盘的顺序写入,避免无序io竞争,提高消息存储速度;

3.预分配机制,降低文件处理等待时间;

4.依赖pagecache机制,批量从磁盘读取消息并加载到缓存,提高读取速度;

5.内存映射机制,较少用户态内核态之间的复制次数,提高处理效率。

总结

作为一个开发者,有一个学习的氛围跟一个交流圈子特别重要!

欢迎大家进我的技术交流群【Java高级互联网架构:964357187】点击进入,不管你是小白还是大牛欢迎入驻 ,群内有阿里京东架构师为你答疑解惑,还会分享BAT、阿里面试题、面试经验,讨论技术, 大家一起交流学习成长!

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!


u=1804430498,1207561382&fm=26&gp=0.jpg

相关文章

  • 从RocketMQ消息持久化设计看磁盘性能瓶颈的突破!

    分布式消息队列通常有高可靠性的要求,所以消息数据是需要持久化存储的。那么以什么方式来进行持久化是一个值得商榷的问题...

  • 面试官上来就问:Java 进程中有哪些组件会占用内存?

    不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久...

  • 3、Kafka/flume概要

    七、Kafka ·kafka是一个分布式消息系统。具有高性能、持久化、多副本备份、横向扩展能力。将消息保存在磁盘中...

  • RabbitMQ指南(三)

    一、存储机制 无论是持久化的消息,还是非持久化的消息都是可以被写入到磁盘中的。持久化的消息在达到队列的时候被写入磁...

  • RocketMQ刷盘策略

    我们都知道RocketMQ的消息是持久化到文件的,具体的消息的刷盘策略是什么,是发送一条消息就直接持久化到文件中吗...

  • RocketMQ源码分析:Broker概述+同步消息发送原理与高

    1、Broker概述 Broker 在 RocketMQ 架构中的角色,就是存储消息,核心任务就是持久化消息,生产...

  • 「Kafka深度解析」快速入门

    Kafka特性 顺序读写的方式访问磁盘,从而避免随机读写磁盘导致的性能瓶颈2.支持批量读写消息,并且会对消息进行批...

  • Kafka设计

    持久化 文件系统 Kafka在消息的存储和缓存中重度依赖文件系统。 磁盘如果合理使用,性能可以非常高。 在一个6张...

  • RocketMQ源代码笔记(一):文件存储

    0.前言 RMQ对于消息持久化的方式是顺序写到本地磁盘文件,相对于持久化到远程的数据库或者KV来说,往本地磁盘文件...

  • 2 kafka的特性

    消息持久化和缓存Kafka高度依赖文件系统来存储和缓存消息。一般的人都认为“磁盘是缓慢的”,这使得人们对“持久化结...

网友评论

    本文标题:从RocketMQ消息持久化设计看磁盘性能瓶颈的突破!

    本文链接:https://www.haomeiwen.com/subject/fzagoqtx.html