
书接上文,今天我们来说说RocketMQ生产者发送消息
生产者发送消息

生产者发送消息的主要流程图如上图所示。具体的代码由于比较多,我就不在这边贴出来的。
主要讲一下我认为比较重要的点
消息队列负载均衡
Producer会每隔30s从Namesrv获取最新的Topic路由信息,并缓存到本地
路由信息就是用来发送时选择具体的Broker和队列的
关于异步消息
异步消息需要我们单独加一个回调方法,添加在发送消息成功/失败的一些处理。
因为异步消息没有对Broker回来的结果进行额外的处理,那么自然我们就不能像同步消息一样,对Broker返回回来的结果单独针对SendResult进行单独的重试操作。所以需要我们在失败的回调方法上进行额外的处理
具体的原因,我们可以看下消息发送的主干逻辑
消息发送的主干逻辑
另外我在这里贴一下消息发送的主干代码(具体的代码和它们的注释可以看下我的github 我是一个超链接)
// ... 省略部分代码
// 查找主题路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
// ... 省略部分代码
for (; times < timesTotal; times++) {
// 上次发送的broker名称
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 选择一条messagequeue(轮询+失败规避)
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
// ... 省略部分代码
try {
// ... 省略部分代码
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// 失败规避
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 发送失败重试
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (/** 一系列异常处理 **/) {
}
}
}
}
失败规避
我在上面反复说失败规避,到底什么是失败规避呢?
在我们一次消息发送过程中,消息有可能发送失败。在消息发送失败,重试时选择发送消息的队列时,就会规避上次MessageQueue所在的Broker。这样能够减少很多不必要的请求(因为Broker宕机后,很大情况下这个Broker短时间内依旧是无法使用的)
那么,为什么会有宕机的Broker在我们的内存中存在?
因为NameSrv是根据心跳检测来确定Broker是否可用的(有间隔 10s),且消息生产者更新路由信息也是有间隔的(30s)。且为了Namesrv设计的简单,Namesrv不会主动将Broker宕机的信息推给消息生产者,而是需要消息生产者定时更新的时候,才会感知到Broker宕机。
在这期间存在误差,所以我们是要一个机智(失败规避策略)来减少一些不必要的性能消耗
网友评论