消息路由
在RocketMQ的系统架构里,由于服务器端(Broker
)会根据实时压力实施弹性扩缩容等发生变动,客户端为了做负载均衡,就需要有注册中心来提供Broker
的信息:
![](https://img.haomeiwen.com/i10995913/fcebe7d537d53817.png)
注册中心的作用是及时发现Broker服务器的变化,并将存活的Broker
信息返回给客户端做负载均衡。
获取Topic
获取路由信息函数
// DefaultMQProducerImpl#tryToFindTopicPublishInfo
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
发送消息前,必须先从注册中心里获取Broker
服务器信息,包括Topic
、队列、IP,然后采取负载均衡算法发送消息。
常见的负载均衡算法:
1.轮询法:将请求按照顺序轮流地分配到各个服务器上。
2.加权轮询法:在轮询算法的基础上添加了权重的条件
3.随机法
4.加权随机法
5.最小连接法:哪个服务器的连接数少,就分配给哪个服务器新的请求
6.哈希法:计算哈希值,映射到服务器上
tryToFindTopicPublishInfo
/**
* 根据topic获取路由信息
*
* @param topic
* @return
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 1 先从本地 topicPublishInfoTable 中获取路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 2 路由信息或 messageQueueList 为空
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
// 2.1 添加空路由对象
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 2.2 更新路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
// 2.3 从更新后的路由表中获取路由信息
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 2.4 获取到了就返回
if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null && topicPublishInfo.ok())) {
return topicPublishInfo;
} else {
// 3 没有获取到路由信息则从注册中心获取
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
从上面的源码可以看出获取路由信息的步骤如下:
1.先从本地topicPublishInfoTable中获取路由信息
2.如果路由信息或messageQueueList为空,则尝试本地更新一下路由信息
3.本地更新PublishInfo路由信息,并尝试获取
4.如果此时能获取到路由信息了,则返回TopicPublishInfo对象
5.本地无法获取到路由信息,则从注册中心尝试获取并更新本地缓存
Topic 路由信息表
上述过程的第一步就是获取路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
其中路由信息存储在TopicPublishInfo
对象里:
![](https://img.haomeiwen.com/i10995913/8adda161c45e9c5b.png)
各个字段含义如下:
• orderTopic
:Topic
是否支持排序
• haveTopicRouterInfo
:是否存在路由信息
• messageQueueList
:消息队列List
• sendWhichQueue
:生产者发送消息到哪个队列的索引
• topicRouteData
:路由数据,包括队列、Broker地址、Broker数据
此外,TopicPublishInfo
类还提供了选择某个队列发送消息的默认负载均衡策略:
/**
* 默认【轮询】策略选择一个MessageQueue
*
* @param lastBrokerName
* @return
*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
/**
* 选择一个消息队列
*
* @return
*/
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
从上面代码可以看出,默认的选择策略是采用轮询的方法:
lastBrokerName == null
时,说明在此之前还没有进行过选择,直接返回第一个可用的消息队列
lastBrokerName != null
时,且当前轮询到的消息队列不是上一次使用的,则返回当前队列,否则轮询至下一个
更新路由信息
两个子方法
根据tryToFindTopicPublishInfo
的源码,接下来会进行更新路由信息的步骤,访问的主要是MQClientInstance
类下的updateTopicRouteInfoFromNameServer
方法,该方法又调用了两个关键的方法,分别是topicRouteData2TopicPublishInfo
和topicRouteData2TopicSubscribeInfo
-
topicRouteData2TopicPublishInfo
方法的作用是将TopicRouteData
类转换成TopicPublishInfo
,并过滤掉Master挂了的Slave的MessageQueue
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
// 如果指定了Topic的Queue的发送顺序
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
// 解析配置文件,创建消息队列
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
// 设置Topic是有序的(消息的发送顺序按配置来)
info.setOrderTopic(true);
} else {
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
// 找到每个QueueData的BrokerData
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
// 如果BrokerData中没有Master节点id,可能Master挂了,此时不处理消息
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
// 创建消息队列
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
// 设置Topic消息发送是无序的
info.setOrderTopic(false);
}
return info;
}
topicRouteData2TopicSubscribeInfo
方法作用是提取TopicRouteData
内的QueueData
字段,生成消息队列,也就是订阅了该Topic的队列
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
// QueueData是否可读,只有是可读的才能被订阅
if (PermName.isReadable(qd.getPerm())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}
return mqList;
}
介绍完了updateTopicRouteInfoFromNameServer
方法里调用的两个子方法之后,下面就来看一下updateTopicRouteInfoFromNameServer
的代码。
updateTopicRouteInfoFromNameServer
更新路由信息是消息投递过程中非常重要的一环,为了防止并发修改注册信息导致数据不一致,这里使用了ReentrantLock
可重入锁。
对于路由消息,就需要注意它可能不存在这种情况
1. 路由消息不存在
第一次访问时,生产者还没有在Broker中创建Topic和消息队列时会发生,此时的解决方案是:如果满足isDefault && defaultMQProducer != null
,则使用默认Topic
来获取路由消息TopicRouteData
![](https://img.haomeiwen.com/i10995913/ccad83bcd689b4ad.png)
![](https://img.haomeiwen.com/i10995913/88be02d275e6dc93.png)
由上面两张图可以清楚看到,默认Topic名称为TBW102
但如果默认主题获取到的TopicRouteData
实例为空呢?此时就要根据Topic名称从注册中心查询了,如果还查询不出来的话就会返回false
2. 路由消息不存在,但是从注册中心获取到了
此时就需要判断本地的路由表和注册中心获取到的路由信息是否有差异,如果差异存在话就把本地路由信息更新为最新版本
上面所有文字部分对应的源码如下:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 使用默认的TopicKey尝试获取TopicRouteData
// 当Broker开启自动创建Topic时,会自动进行创建
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
// 判断本地路由表存放的信息和远端注册中心存放的信息
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) {
// 克隆的原因是topicRouteData要被设置到下面的publishInfo和subscribeInfo里
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// 更新Broker相关信息,当某个Broker心跳超时后,会被从BrokerData的BrokerAddrs中移除
// brokerAddrTable也存有Slave的BrokerAddr
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Map.Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Map.Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LockTimeoutMillis);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
网友评论