今天我们就开始学习下默认消息发送流程,学习他的实现思路,也帮助我们工作中,遇到了问题不会手足无措。
思考问题
- 消息发送者是如何做负载均衡的?
- 消息发送者是如何保证高可用的?
- 消息发送批量消息如何保证一致性的?
默认发送流程-工作原理
源码入口:
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
启动Demo:
DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("xxx:9876");
producer.start();
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
流程:
1. 校验主题,设置主题
msg.setTopic(withNamespace(msg.getTopic()));
public String withNamespace(String resource) {
return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}
2. 默认发送方式为同步发送,默认超时时间为3s
private int sendMsgTimeout = 3000;
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
3. 确认 producer service 运行状态是否为运行中
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#makeSureStateOK
//检查状态,如果不是RUNNING状态则抛出异常
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The producer service state not OK, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
}
4. 校验信息
-
topic长达是否大于TOPIC_MAX_LENGTH,topic是否为空
-
是否通过正则校验,body是否为空,body大小是否超过4M
public static void checkTopic(String topic) throws MQClientException {
if (UtilAll.isBlank(topic)) {
throw new MQClientException("The specified topic is blank", null);
}
if (topic.length() > TOPIC_MAX_LENGTH) {
throw new MQClientException(
String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
}
if (isTopicOrGroupIllegal(topic)) {
throw new MQClientException(String.format(
"The specified topic[%s] contains illegal characters, allowing only %s", topic,
"^[%|a-zA-Z0-9_-]+$"), null);
}
}
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
5. 找到主题发布的信息,未找到则抛出异常
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo
消息生产者更新和维护路由信息缓存
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 消息生产者更新和维护路由信息缓存
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
6. 通过TopicPublishInfo 找到对应的MessageQueue下的,BrokerName信息
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue
获取到BrokerName对应的MessageQueue信息
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
如果lastBrokerName为null,通过对 sendWhichQueue 方法获取一个队列
取余,然后从messageQueueList中获取一个MessageQueue
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
7. 最后消息发送
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
1. 根绝BrokerName获取到broker地址
在启动阶段,对BrokerAddrTable信息进行了维护
public String findBrokerAddressInPublish(final String brokerName) {
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
return map.get(MixAll.MASTER_ID);
}
return null;
}
如果未找到,则通过主题查找主题信息,通过更新路由信息后,在尝试获取,如果还未找到则抛出异常
if (null == brokerAddr) {
// 1.1 如果未找到,则通过主题查找主题信息,通过更新路由信息后,在尝试获取,如果还未找到则抛出异常
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
2. 为消息分配全局唯一ID
// 为消息分配全局唯一ID
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
在RocketMQ消息发送-请求与响应文章中,我们已经学习了请求参数中,创建了全局唯一的MsgId,可以回头看一看
3. 注册钩子消息发送钩子函数
这里主要做了三件事情,确认MsgType类型、是否为延迟消息、调用钩子函数内的方法
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 3.1 通过isTrans来确定MsgType类型
if ("true".equals(isTrans)) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
// 3.2 如果msg里面 __STARTDELIVERTIME 或者 DELAY 不为空,则设置为延迟消息
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
// 3.3 调用钩子函数里的方法
this.executeSendMessageHookBefore(context);
}
4. 设置发送信息请求头SendMessageRequestHeader
最后根据默认发送方式,进行消息的发送
主要利用NettyRemotingClient进行发送,这里就先不展开来说了 入口: MQClientAPIImpl.sendMessage()
问题答复
- 消息发送者是如何做负载均衡的?
默认采用轮询,每一个消息发送者全局会维护一个 Topic 上一次选择的队列,然后基于这个序号进行递增轮询
- AllocateMessageQueueAveragely
平均分配,按照总数除以消费者个数进行,对每个消费者进行分配
- AllocateMessageQueueAveragelyByCircle 轮流平均分配,按照消费者个数,进行轮询分配
- 消息发送者是如何保证高可用的?
在上面的步骤中通过TopicPublishInfo 找到对应的MessageQueue下的,BrokerName信息,利用参数sendLatencyFaultEnable来开启关闭故障规避机制
sendLatencyFaultEnable 设置为 true:开启延迟规避机制,一旦消息发送失败会将 broker-a “悲观”地认为在接下来的一段时间内该 Broker 不可用,在为未来某一段时间内所有的客户端不会向该 Broker 发送消息。
使用本次消息发送延迟时间来计算Broker故障规避时长,不参与消息发送队列负载
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
}
但是这样子做可能带来的后果是Broker没有可用的情况,或者是某个Broker数据激增,增加消费者的压力,所以默认不开启规避机制,遇到消息发送失败,规避 broker-a,但是在下一次消息发送时,即再次调用broker-a。
- 消息发送批量消息如何保证一致性的?
将一个Topic下的消息,通过batch方法包一起发送
客户端ID与使用陷阱
摘自丁威老师的文章
总结
这段时间主要学习了RocketMQ的消息发送,主要是以源码为主,深入了解了消息发送的启动和消息发送的流程,以及认识到客户端ID与使用陷阱 一图总结
作者:叫我小郭_
链接:https://juejin.cn/post/7105315713157431332
来源:稀土掘金
网友评论