RocketMQ架构图

NameServer
NameServer是相当于zookeeper,用来管理Broker集群信息的,竟然管理消息队列的集群信息就需要知道各个broker的地址以及topic的分布
NameServer的作用
- 记录broker集群中机器的状态,能够知道broker是否可用
- 记录topic的信息,包括有哪些queue,在哪些broker中
- 生产者和消费者可以通过NameServer拿到topic和broker的信息,同时可以让客户端感知borker不可用
NameServer的特性
- NameServer集群中各个节点是不通信的,某个时间点数据不完全一样
NameServer的作用简单描述为,Borker启动的时候向所有NameServer配置,生产者向队列中发消息的时候从NameServer中获取Borker的地址列表,根据负载算法来进行选择发送。
来拆分下这段话,提出问题,看RocketMQ是如何解决的
Borker如何向NameServer通信,通信提交的数据有哪些?
- 各个Broker和NamerServer是在启动的时候建立的长链接(使用netty),NameServer可以配置启动参数,broker启动后每30s向NameServer中发送心跳包心跳包中会包括broker的信息
- NameServer中存储的信息包括
//Topic路由信息,消息发送时根据路由表负载均衡
HashMap<String,List<QueueData>> topicQueueTable;
//Broker基础信包括brokerName,集群名称,主备broker地址
HashMap<String,BrokerData> brokerAddrTable;
//Broker集群信息,存储集群中所有Broker名称
HashMap<String,Set<String> clusterAddrTable;
//Broker状态信息,NameServer每次收到心跳会替换该信息
HashMap<String,BrokerLiveInfo> brokerLiveTable;
//BrokerFilterServer列表,用户类消息过滤
HashMap<String,List<String>> fiterServerTable;
class QueueData{
String brokerName;
int readQueueNums;
int writeQueueNums;
int perm;//读写权限
int topicSynFlag;
}
class BrokerData{
String cluster;
String brokerName;
//key:borkerID(id为0代表Master,大于0代表Slave),value:broker的真实地址
HashMap<Long,String> brokerAddrs;
}
class BrokerLiveInfo{
Long lastUpdateTimestamp;
DataVersion dataVersion;
Channel channel;
String haServerAddr;
}


NamerServer是如何判断Broker宕机的?
- NameServer启动时会启动一个线程,每10s扫描borkerLiveTable状态表,如果BrokerLive中的lastUpdateTimestamp距离超过120s则认为Broker失效,移除该Broker关闭链接,同时更新保存的信息
- Broker正常关闭的时候会主动通知NameServer,执行下线操作
- NameServer启动的时候会启动一个线程每10分钟会打印一次KV的配置信息
生产不存在的主题是如何返回信息,自动创建主题的?
不存在的主题NameServer会返回不存在,自动创建主题是producer获取默认的主题进行发送,和broker的交互(NameServer对于主题有就返回,没有就返回没有,自动创建主题是系统主题,broker端的处理)
Producer
Producer顾名思义,就是将消息正常的发送到服务器。知道发送的结果以便后续的处理
Producer有几种发送方式?
- producer发送方式有三种,可靠的同步发送,可靠的异步发送,单向发送
- 一个Topic分多个Queue,消息最后存储是在Queue中,消息发送本质是指明把哪条消息发送到哪个queue中,如果只是指明发送的消息,会通过负载自动算出发送的Queue进行发送
- 创建topic时指定读写队列的数量会决定实际队列的数量,比如集群中两台机器,创建topic的读写队列数量为8.那么总共会有16个队列。每次生产的时候会从16个队列中随机一个进行写入,同理代表最多可以有16个消费者来消费。(在broker中topic信息就是这么记录的,提交给nameserver的时候会告诉有几个读写队列之后负责均衡)
Producer如何根据topic获取到Broker的地址?
- 生产者消费者都是在MQClientInstance中注册的(是客户端与Broker,NameServer打交道的网络通道)。一个JVM中生产和消费者获取到的MQClientInstance实例是同一个
- 客户端会先通过缓存查找,查找不到使用MQClientInstance从NameServer中查找数据,并将结果缓存到本地
- 每30s会从NameServer拉取数据更新一次缓存
发送的消息有哪些属性?
- 主要是Message类的属性包括
class Message {
String topic
int flag;
Map properties;
byte[] data
}
- topic:消息所属的主题,消息Flag(RocketMq不做处理),扩展属性,消息体
- 扩展属性(properties)包括,tag:消息TAG用于消息的过滤,keys:Message的索引多个用空格隔开可以根据这些Key快速定位消息,waitStoreMsgOK:消息发送时是否等消息存储完成后再返回,delayTimeLevel:消息延迟级别用户定时消息或消息重试
- 为什么没有Queue的信息,不能直接指明Queue发消息吗?Queue的信息是如果要直接指明发送队列的时候调用发送方法的另一个参数
Broker宕机Producer如何知道?
- 只用在更新缓存的时候才会知道Broker宕机,从本地缓存中移除
正在向Broker发数据的生产者发送失败怎么处理?
两种方式
- 不启用broker故障延迟机制(默认)
首先选择轮询选择一个队列所在的broker,进行发送,如果broker不可用发送失败,则重新选择队列的broker,不包括失败的broker(producer必须开启重试机制才可以)
//参数为上次发送失败的brokerName
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index= this . sendWhichQueue.getAndincrement();
for (inti= O; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++)% this.messageQueueList.size();
if (pos < 0)
pos = O;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAnd工ncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = O;
return this.messageQueueList.get(pos);
}
- 启用故障延迟机制
在发送到broker时,由于broker不可用导致的消息发送失败,会将该broker不可用记录同时计算出不可用的时间,在选择broker中的时候进行isAvaible的判断。
生产不存在的主题是如何返回信息,自动创建主题的?
当主题不存在时,producer会尝试获取默认的系统主题,如果开始可以自动创建主题的时候,NameServer会返回默认主题的信息(tbw102),producer会更新本地路由信息将默认的topic换成查询的topic,当发送消息到broker的时候会创建对应的topic
由于broker创建完成后会将topic信息同步到nameServer中去,producer心跳NameServer会更新topic信息导致不是所有的broker都会加入创建的主题中。
数据存储
数据存储,是Broker接收到数据后进行消息的持久化,之后的消费可以从存储上找到对应的数据
Topic存储
topic存储是config目录下topics.json,会记录topic的读写队列数量,上报nameserver的时候也是这么上报,所以不会存在队列找不到的情况(如果是直接读consumequeue下的目录来解析的话就会存在队列不存在的情况)
存储内容如:

Broker是怎么存储数据的?
RocketMQ主要存储的文件包括,Commitlog文件,ConsumeQueue文件,IndexFile文件。
- Commitlog:存储所有topic的所有消息,每条消息长度不固定,头四个字节存储消息长度
- ConsumeQueue:队列中的queue,存储消息在CommitLog存储的位置,消息的大小,和其他信息,固定长度20字节,所以可以直接根据offset算消息的位置
-
IndexFile:索引文件,会存储hash对应的消息的位置
image.png

消息在发送到broker中的时候首先会将消息全量存储到CommitLog中,然后异步将消息所在的地址存入ConsumeQueue中,以及Key和位置存入到IndexFile中去
Broker是怎么查找数据的?
简单例子:比如要获取topicA,queue0的offset为100的数据,首先100*20算出consumeQueue文件中消息所在的位置,然后解析20个字节算出CommitLog所在的位置,最后从CommitLog中找到消息信息
Broker存储的数据结构是什么?

ConsumeQueue:

IndexFile:

Broker存储刷盘如何处理?
首先Borker的文件操作都是通过Mappedfile来进行操作的,相当于直接无需内存拷贝直接操作内核中的文件信息。可以根据位置直接定位到文件的具体位置进行读写
- 同步刷盘
- 异步刷盘
注:文件删除策略是10s调度一次清理线程清除过期文件。
消费者
消费者,生产者生产完消息后,消费者可以拿到消息进行消费。
消费者是怎么获取Broker地址的?
和生产一样都会注册MQClientInstance从里面链接NameServer进行获取
一个消费者组只能消费一个topic,因为在队列分配的时候会根据组名称获取总共有多少个消费者然后根据这个数量来进行分区分配,所以如果同一个消费者组消费不同的分区时,会导致分区分配错误
消费者是怎么获取消费消息的?
- 分为推和拉模式(本质都是拉模式)
- 也有消费者组的概念
- 消费者组之间有集群模式(一条消息只被消费一次)和广播模式(一条消息被所有消费者消费)
消费者是怎么存储消费位置的?
广播模式(数据存储在本地):
<storeDir>/.rocketmq_offsets/<clientId>/<group>/offsets.json
集群模式(数据存储在服务端):
<rootPath>/config/consumerOffset.json
数据内容:
{
"offsetTable":{
//key的格式topic@group
"test@benchmark_consumer_61":{
0:5280,1:5312,2:5312,3:5312
}
}
}
消费者怎么提交位移?
消费者拉到一批数据后将数据放入processqueue中,之后异步的处理消息,同时再次发送pullRequest从broker拉取消息,再放入processqueue中(每次拉取时会判断,如果processqueue大于1000条,或者数据量大于100M时会等待一段时间在拉取),processqueue根据offset会将消息封装一个treemap,每次异步线程消费完之后将treemap中的消息移除,同时取treemap中第一个元素,如果treeMap为空则提交最大的offset进行位移提交
(
firstEntry()来获取未处理的消息中的最小位移进行提交,所以
1、先消费完offset比较大的消息也获取的是未消费的offset不会造成消息丢失,等消费完小的offset自然就会提交已经消费过的offset,但可能重复消费
2、先消费完最小的offset就会直接提交位移,不会造成重复消费
3、先消费完最大的offset,只有等所有消息都消费完,treeMap为空时获取最大的offset进行提交,也不会重复消费
)
位移提交是在内存中存储,每10ms进行broker的位移提交,在shutdown和重平衡时也会进行brooker端位移的提交
其他
读写队列
RocketMQ的读写队列一般设置为一样。比如,一个Topic读队列8个,写队列4个,代表这个Topic总共会有8个队列,其中只有前4个队列生产者会生产消息,后面的4个队列消费者是不会拿到消息的。同样,如果一个Topic写队列8个,读队列4,代表总共有8个队列,其中生产者会向8个队列中生产,但是消费者只会从前4个队列消费,其他四个队列中的消息是不会被消费的。
slave节点
slave会开启线程进行从master节点消息的拉取同步。只是备份无切换功能
延迟队列的实现

目前rocketmq支持固定时间的延迟,比如,1小时,1分钟,1天等。不同的延迟时间放入不同的queue中,所以会有一个队列特性,先放入的一定是先到消费时间的
消息处理就是判断消息有没有设置延迟级别,如果设置的话,根据设置的不同的级别放入不同的定时队列中,同时存储时间到之后的实际的存储topic,如果没有设置延迟级别,直接将消息投递到对应的业务Topic中去

新的消费者组消费老topic消费位置
问题发现:
rocketmq新的消费者组默认位置为CONSUME_FROM_MAX_OFFSET,但是发现新创建的topic发送几条消息后,在启动新的消费者组还可以 消费到启动前发送的消息
原因:
该队列在Broker端的最小偏移量为0,消息并且都在内存中(pageCache),说的更直白点,consumequeue/topicName/queueNum的第一个消息消费队列文件为00000000000000000000,并且消息其对应的消息缓存在Broker端的内存中(pageCache),其返回给消费端的偏移量为0,故会从0开始消费,而不是从队列的最大偏移量处开始消费。
消费端客户端代码:
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
case CONSUME_FROM_LAST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
如果返回的偏移量大于等于0,则直接使用该offset,这个也能理解,大于等于0,表示查询到有效的消息消费进度,从该有效进度开始消费,但我们要特别留意lastOffset为0是什么场景,因为返回0,并不会执行CONSUME_FROM_LAST_OFFSET(语义)。
如果lastOffset为-1,表示当前并未存储其有效偏移量,可以理解为第一次消费,如果是消费组重试主题,从重试队列偏移量为0开始消费;如果是普通主题,则从队列当前的最大的有效偏移量开始消费,即CONSUME_FROM_LAST_OFFSET语义的实现。
所以需要看为什么会返回offset的值为0?
服务端端查询offset代码:
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
if (offset >= 0) {
//1
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
//2
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
}
}
return response;
}
代码@1:如果消息消费进度文件中存储该队列的消息进度,其返回的offset必然会大于等于0,则直接返回该偏移量该客户端,客户端从该偏移量开始消费。
代码@2:如果未从消息消费进度文件中查询到其进度,offset为-1。则首先获取该主题、消息队列当前在Broker服务器中的最小偏移量(@4)。如果小于等于0(返回0则表示该队列的文件还未曾删除过)并且其最小偏移量对应的消息存储在内存中而不是存在磁盘中,则返回偏移量0,这就意味着ConsumeFromWhere中定义的三种枚举类型都不会生效,直接从0开始消费
网友评论