RocketMq常用管理命令链接:https://gper.club/articles/7e7e7f7ff3g58gc3g69
Open Message协议
结构简单、解析快、有事务设计、有持久化设计
rocketmq架构图.png
NameServer的作用类似于zk,brocker启动之后会注册到nameserver上,并且每30秒发送一次心跳给nameserver也称服务续约,nameserver每隔10秒钟检查一次broker是否长时间没有发送心跳,默认超过120秒没有发送心跳broker会被nameserver剔除,producer和consumer通过NameServer获取Broker,每30秒拉取刷新一次Broker List,Broker集群节点分为master和slave节点
NameServer之间不通信,是如何保持Broker List一致的?
1、服务注册(Broker新增):Broker主动去每个nameserver续约,保证最终一致
2、服务剔除(Broker关闭或者宕机):每个nameserver主体去检查Broker是否长时间没有心跳,保证最终一致性
3、路由发现(客户端获取最新的Broker List,初始连接、后续连接):客户端(consumer和producer主动每隔30秒主动去拉取刷新Broker List)
消费模式
- 集群模式:消费者互斥的消费消息,一条消息只有一个消费者
- 广播模式:所有消费者都消费生产者发送的所有消息
rocketMq既支持pull也支持push的方式,但push的方式也是通过pull的方式实现的
Message Queue(不同于其他mq的地方)
作用:实现消息的分片存储(存储在不同broker上)(类似于kafaka的partition),同时确定producer和consumer应该同那么broker进行通信。
- 写队列的数量:决定Message Queue的数量(会在磁盘的consumequeue的对应topic下创建同等数量的目录)
- 读队列的数量:决定消费者有几个队列来消费Message Queue
通常情况下最好保证写队列的数量和读队列的数量相同
注:如果读队列数量小于写队列数量,会出现消息消费不到的情况
消费者MessageQueue对象三个重要属性 - topic指定是哪个topic的队列
- brokerName 指定broker
- queueId 一个topic下的队列编号
Message对象
- topic用于标记消息,producer和对应的consumer需要一致
- tags:用于过滤消息,consumer可以定义表达式来过滤同一topic下那些消息可以消费
- keys:索引,需要唯一,用于控制台检索消息
消费者消息消费失败返回:return ConsumeConcurrentlyStatus.RECONSUME_LETER,broker不更新消息偏移量offset
消费者消息消费成功返回:return ConsumeConcurrentlyStatus.RECONSUME_SUCCESS,broker更新消息偏移量offset
rokcetmq支持同步消息、异步消息、无序消息、有序消息、单向消息sendOneWay方式、单向顺序消息sendOneWayOrderly
同步:需要受到消息消费成功的ack,才发送下一条消息
异步:前一条消息未返回,依然发送下一条消息
oneway:单向发送消息,不关心消费情况,不会收的sendresult
RocketMq原理
- 生产者:如何选择Message Queue、如何发送顺序消息、事务消息、延时消息
- Broker:消息如何存储、文件如何清理、主从同步与故障转移
-
消费者: 如何实现负载、rebalance、重试与死信队列
1、自定义消息策略,可以去实现MessageQueueSelector接口返回Messagequeue对象即可
2、如何发送顺序消息:前提条件:1、生产者不能使用多线程(保证消息到底broker是有序的),2、写入broker必须顺序写入同一个broker中,3:消费者只能用一个线程消费顺序消息(多线程情况下消费者消费速率不同,可能导致无序)
生产者发送消息核心方法:processSendResponse,该方法发送消息到broker后必须有响应Response才能发送下一条消息
3、事务消息:在保证本地数据库和mq消息一致性的情况下,需要先操作数据库若数据库操作失败,直接回滚,(如果先操作mq,mq消费之后无法回滚)
生产者发送消息到Broker中,消息状态为未确认状态(消费者不可消费),两阶段提交,先处理本地数据库,再处理mq消息(本地成功,确认Broker消息;失败,丢弃消息;若本地无反馈,rocketmq会主动去获取本地数据是否成功(消息回查))
image.png
4、延时消息:msg.setDelayTimeLevel();
image.png
消息发送至broker临时存储在内存中,通过delayService(定时任务)定时消费
5、rocketmq消息存储在磁盘(commit log文件下)是怎么实现低延迟高吞吐量的?
page cache,cpu操作磁盘数据时先要将磁盘数据拷贝到内核缓冲区,每次拷贝一页(4k),page cache是内核空间的一个缓存用于提高操作效率(page cache有预读取的功能,如当前读取了某页数据,page cache会同时读取相临的页到缓存空间),rocketmq使用零拷贝,通过memory map实现零拷贝(内存映射,通过在用户空间建立内核空间的数据的映射,操作映射来直接操作内核空间的数据,免除了内核空间拷贝数据到用户空间,用户空间数据拷贝到内核空间)
6、文件如何清理
清理策略,过期文件需要清理(默认72小时)或者磁盘空间使用率达到85%直接清理(磁盘空间使用率超过90%会拒绝消息写入)
7、故障转移:当消费者消息消费失败,会通知Broker,broker会生成一个名字前面带retry的topic队列,重试消费,重试多次依然失败会将消息丢入死信队列(名字前带delq的topic队列),死信队列里的消息需要人为去干预处理
rocketmq架构
borker内master节点和slave节点是怎么关联到一起的 ?
1、集群的名字相同,
2、连接到相同的nameserver
3、在配置文件中:brokerId=0代表master,borkerId=1代表是slave
主从同步和刷盘类型
brokerRole(broker的角色)
ASYNC_MASTER主从异步复制(有可能导致消息丢失)(只要master节点消息写入成功就给生产者返回ack,不需要等待slave收到消息)
SYNC_MASTER主从同步双写(推荐)(需要master和slave都收到消息才给生产者返回ack)
flushDiskType(刷盘类型)(对于单个节点)
ASYNC_FLUSH 异步刷盘,(默认)先缓存消息,缓存成功后直接给生产者返回(批量从缓存刷盘,提高了效率)
SYNC_FLUSH 同步刷盘,每一条消息都写入成功,才返回给生产者ack
故障转移(Dledger)
基于raft协议实现的自动选举功能,当master节点挂了,slave节点可以重新选举master节点
网友评论