消息存储
何时存储消息
分布式队列因为有高可靠性的要求,所以数据要进行持久化存储
- MQ收到一条消息后,需要向生产者返回一个ACK响应,并且将消息存储起来
- MQ推送一条消息给消费者后,等待消费者的ACK响应,需要将消息标记为已消费。如果没有标记为消费,MQ会不断的尝试往消费者推送这条消息
- MQ需要定期删除一些过期的消息,这样才能保证服务一直可用
消息存储介质
RocketMQ采用的是类似于Kafka的文件存储
,也就是直接用磁盘文件来保存消息,而不需要借助MySQL这一类索引工具
磁盘保存文件效率如何
- 磁盘如果使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度。但是磁盘随机写的速度大概只有100k/s,和顺序写的性能相差6k倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。
RocketMQ的消息用顺序写,保证了消息存储的速度。
- RocketMQ采用的是零拷贝技术加速了文件读写,使用内存映射的方式
-
什么是零拷贝
- 零拷贝是指计算机执行操作时,CPU不需要先将数据从某处内存复制到一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。
- 零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,从而有效地提高数据传输效率
- 零拷贝技术减少了用户进程地址空间和内核地址空间之间因为上下文切换而带来的开销
消息存储结构
RocketMQ消息的存储分为三个部分
- CommitLog: 存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G,以第一条消息的偏移量为文件名
- ConsumerQueue: 存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog
- IndexFile: 为了消费查询提供了一种通过key或时间区间查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送和消费消息的主流程
消费存储结构
-
RocketMQ有几个重要文件
- abort: 这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作
- checkpoint: 数据存盘检查点
- config/*.json: 这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者做配置、消费者组消息偏移量等一些信息
消息刷盘机制
RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失。同时这样才可以让存储的消息可以超出内存的限制。RocketMQ为了提高性能,会尽量保证磁盘的顺序写。消息在写入磁盘时,有两种磁盘的方式: 同步刷盘和异步刷盘
同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是: 消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态
异步刷盘
在返回写成功状态时,消息可能只是写入了内存的PAGECACHE,写操作的返回快,吞吐量大。当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
配置方式
刷盘方式是通过Broker配置文件里的flushDiskType
参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH
中的一个
小结
- 异步刷盘吞吐量很高,但可能会丢消息,同步刷盘确保每条消息都会刷盘成功(安全性高),但吞吐量肯定比异步差很多
消息主从复制
如果Broker以一个集群的方式部署,会有一个master节点和多个slave节点,消息需要从master复制到slave上,而消息复制的方式分为同步复制和异步复制
同步复制
- 同步复制是等master和slave都写入消息成功后才反馈给客户端写入成功的状态
- 在同步复制下,如果master节点故障,slave上有全部的数据备份,这样容易恢复数据。但是
同步复制会增大数据写入的延迟,降低系统的吞吐量
异步复制
- 异步复制是只要master写入消息成功,就反馈给客户端写入成功状态。然后再异步将消息复制给slave节点。
在异步复制下,系统拥有较低的延迟和较高的吞吐量。
但是如果master节点故障,而有些数据没有完成复制就会造成数据丢失。
配置方式
- 消息复制方式是通过Broker配置文件的
brokerRole
参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE
三个之中的一个
负载均衡
- Producer发送消息时,
默认会轮询
目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的Queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的Broker上 - 同时生产者在发送消息时,可以指定一个
MessageQueueSelector
。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序
Consumer负载均衡
- Consumer也是以MessageQueue为单位来进行负载均衡。分为
集群模式和广播模式
集群模式
- 在集群消费模式下,每条消息只需要投递到订阅这个Topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条MessageQueue
- 而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配。
- 内置的分配的算法共有6种,分别对应
AllocateMessageQueueStrategy
下的6种实现类,可以在Consumer中直接set来指定。默认情况下使用的是最简单的平均分配策略
- AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起
-
AllocateMessageQueueAveragely(默认)
: 平均分配。将所有MessageQueue平均分配给每一个消费 - AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue
- AllocateMessageQueueByConfig: 不分配,直接指定一个MessageQueue列表。类似于广播模式,直接指定所有队列
- AllocateMessageQueueByMachineRoom: 按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置
- AllocateMessageQueueConsistentHash: 一致性哈希策略。只需要指定一个虚拟节点数,是用一个哈希环的算法,虚拟节点是为了让哈希数据再换上分布更为均匀
广播模式
- 广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分配到所有的Queue
消息重试
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:
- 返回Action.ReconsumerLater(
推荐
) - 返回null
- throw new Exception()
重试消息如何处理
重试的消息会进入一个%RETRY% + ConsumerGroup
的队列中
RocketMQ默认允许每条消息最多重试16
次,每次重试的间隔时间如下
重试时间跟延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别
messageDelayLevel
= 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
重试次数
- 如果消息重试16次后仍然失败,消息将不再投递,转为进入
死信队列
- 一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的
- 关于这个重试次数,RocketMQ可以进行定制。例如通过consumer.setMaxReconsumeTimes(20),将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时
死信队列
- 当一条消息消费失败,RocketMQ就会自动进行消息重试。如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者对应的一种特殊队列
死信队列
- 死信队列的名称:
%DLQ% + ConsumerGroup
死信队列的特征
- 一个死信队列对应一个ConsumerGroup,而不是对应某个消费者实例
- 如果一个ConsumerGroup没有产生私信队列,RocketMQ就不会为其创建相应的私信队列
- 一个死信队列包含了这个ConsumerGroup里的所有死信消息,而不区分该消息属于哪个Topic
- 死信队列中的消息不会再被消费者正常消费
- 死信队列的有效期跟正常消息相同。默认3天,对应的broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
注意
-
一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,
一般需要人工去查看死信队列中的消息
,对错误原因进行排查。然后堆死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃 -
默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者都无法读取。这是因为这些默认的死信队列,他们的
权限被设置成了禁读。
需要手动将死信队列的权限配置成可读可写
才能被消费
消息幂等
MQ中的三种实现语义
- 最多一次(at most once): 每条消息最多只会被消费一次
- 至少一次(at least once): 每条消息至少会被消费一次
- 刚刚好一次(exactly once): 每条消息都只会确定的消费一次
应用场景
- 最多一次(at most once): 最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证
- 至少一次(at least once): RocketMQ也有同步发送、事务消息等很多方式能够保证
- 刚刚好一次(exactly once): 是MQ最理想也是最难保证的一种语义,需要有非常精细的设计才行。
RocketMQ只能保证最多一次(at most once),保证不了刚刚好一次(exactly once).所以,使用RocketMQ时,需要由业务系统自行保证消息的幂等性
消息幂等的必要性
网络不稳定的情况下,RocketMQ的消息可能会出现重复,主要概括一下场景
- 发送时重复
- 投递时消息重复
- 负载均衡是消息重复
处理方式
- 在RocketMQ中是无法保证每个消息只被投递一次的,所以要在业务自行来保证消息消费的幂等性
- 要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageID,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据
-
MessageId是无法保证全局唯一的,
也会有冲突的情况,所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID,而这个业务标识可以使用Message的key来进行传递 -
注意:
在老版本的RocketMQ中,一条消息无论重试多少次,在这些重试消息的MessageId始终都是一样的,但是在4.7.1版本中,每次重试MessageId都会重建
作者:枫度柚子
链接:https://juejin.cn/post/7099439980195512334
来源:稀土掘金
网友评论