一、摘要
- 默认消息发送超时时间为3s
- 默认消息发送是同步的发送模式,同步发送会发送1+重试次数,默认重试2,一共3次
- 消息内容不能为0,也不能超过4M
- 同步消息发送才会有重试机制,异步发送和oneway发送模式都只有一次发送机会。同步发送 1+重试次数(默认2)
- pull模式、push模式启动的时候都不会检查nameserv,pull模式在fetchqueue时没有nameserv时会报错,push模式没有nameserv不会报错
二、简单实例(官方实例)
- 发送实例
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
- pull接收实例
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ProducerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();
// 从nameserv上拉取的topic的队列信息
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TOPIC_TEST");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
// 远程拉取消息,拉取32个
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND://找到消息
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println("msgId:"+m.getMsgId());
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
- push接收实例
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PUSH_CONSUME_GROUP");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TOPIC_TEST", "*");// topic tag
/**
* 全新的消费组 才适用这些策略
* CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
* CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
* CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
*/
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// consumer.setConsumeTimestamp("20170422221800"); //时间格式 yyyyMMddHHmmss
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for(MessageExt msg:msgs){
System.out.println("msgId:"+msg.getMsgId() + " body:" + new String(msg.getBody()));
}
/**
* CONSUME_SUCCESS 消费成功
* RECONSUME_LATER 重试消费,重试次数可以设置,默认是16次
*/
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
三、RocketMQ发送普通消息的全流程解读
① DefaultMQProducer的启动流程
- 入口:
DefaultMQProducerImpl.start()
-
初始化得到 MQClientInstance 实例对象
- MQClientInstance 的理解:
- MQClientInstance 封装了 RocketMQ 网络处理的API,是Producer、Consumer与NameServer、Broker打交道的网络通道
- 同一个JVM中不同 consumer 和 producer 获取的 MQClientInstance 实例是同一个
- MQClientInstance 创建过程:
- 先设置 clientId,clientId 组成有 ip@pid@unitname,unitname 可选,例如:10.204.246.26@27066
- 根据 clientId 从 factoryTable 中获取,也就是缓存 map 中获取。没有则new一个 MQClientInstance
- MQClientInstance 的理解:
-
注册 Producer
- 将当前 Producer 缓存到 MQClientInstance 实例的 producerTable 成员变量中
- key 为 Producer 的名称,value 为当前 Producer 的实例
- 将默认Topic(“TBW102”)保存至本地缓存变量 topicPublishInfoTable 中
-
MQClientInstance 实例对象调用自己的
start()
方法,启动一些客户端本地的服务线程,如拉取消息服务、客户端网络通信服务、重新负载均衡服务以及其他若干个定时任务- Producer 感知 topic 的路由信息变化是通过定时线程,每30s去 nameserver 拉取,然后对本地缓存的路由信息更新
- 每30s向 Broker 发送一次心跳
- 每30s更新本地缓存的存活 Broker
private void startScheduledTask() { //如果没有namesrvAddr,则启动定时器获取namesrvAddr地址(2分钟执 行1次) if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } //每30秒更新一次所有的topic的路由信息,延迟10毫秒执行 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); // 每30秒对下线的broker进行移除 // 每30秒发送一次心跳 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); // 持久化消费端offSet,每5s执行一次 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); //定时调整消费者端线程池大小 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } } }, 1, 1, TimeUnit.MINUTES); }
-
最后向所有的Broker代理服务器节点发送心跳包
image.png
② send发送方法的核心流程
-
消息校验
消息的校验主要就是校验topic以及消息体的长度,发送消息的最大长度不能超过4M -
尝试获取TopicPublishInfo的路由信息
先从缓存里面取,缓存没有则从nameserver取,取到之后会放到缓存
我们一步步debug进去后会发现在sendDefaultImpl()方法中先对待发送的消息进行前置的验证。
调用tryToFindTopicPublishInfo()
方法,根据待发送消息的中包含的Topic尝试从Client端的本地缓存变量topicPublishInfoTable中查找,如果没有则会从NameServer上更新Topic的路由信息(其中,调用了MQClientInstance实例的updateTopicRouteInfoFromNameServer
方法,最终执行的是MQClientAPIImpl实例的getTopicRouteInfoFromNameServer
方法)
这里分别会存在以下两种场景:- 生产者第一次发送消息(此时,Topic在NameServer中并不存在):
因为第一次获取时候并不能从远端的NameServer上拉取下来并更新本地缓存变量topicPublishInfoTable成功。因此,第二次需要通过默认Topic—TBW102的TopicRouteData变量来构造TopicPublishInfo对象,并更新DefaultMQProducerImpl实例的本地缓存变量topicPublishInfoTable。
另外,在该种类型的场景下,当消息发送至Broker代理服务器时,在SendMessageProcessor业务处理器的sendBatchMessage/sendMessage
方法里面的super.msgCheck(ctx, requestHeader, response)
消息前置校验中,会调用TopicConfigManager的createTopicInSendMessageMethod
方法,在Broker端完成新Topic的创建并持久化至配置文件中(配置文件路径:{rocketmq.home.dir}/store/config/topics.json) - 生产者发送Topic已存在的消息:由于在NameServer中已经存在了该Topic,因此在第一次获取时候就能够取到并且更新至本地缓存变量中topicPublishInfoTable,随后
tryToFindTopicPublishInfo
方法直接可以return。
- 生产者第一次发送消息(此时,Topic在NameServer中并不存在):
public class TopicPublishInfo {
//是否是顺序消息
private boolean orderTopic = false;
//是否存在路由信息
private boolean haveTopicRouterInfo = false;
//改topic对应的逻辑队列,每一个逻辑队列就对应一个MessageQueue
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//用于选择消息队列的值,每选择一次消息队列,该值会自增1。
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
// topic路由元数据,如borker的元信息和队列的元信息
private TopicRouteData topicRouteData;
//....
//....
}
//数据结构如下:
TopicPublishInfo [
orderTopic=false,
messageQueueList=[MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=0], MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=1], MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=2], MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=3]],
sendWhichQueue=ThreadLocalIndex{threadLocalIndex=null},
haveTopicRouterInfo=true]
/**
* 根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo
* 如果没有则更新路由信息,从nameserver端拉取最新路由信息
*
* topicPublishInfo
*
* @param topic
* @return
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//step1.先从本地缓存变量topicPublishInfoTable中先get一次
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//step1.2 然后从nameServer上更新topic路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
//step2 然后再从本地缓存变量topicPublishInfoTable中再get一次
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
/**
* 第一次的时候isDefault为false,第二次的时候default为true,即为用默认的topic的参数进行更新
*/
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
/**
* 本地缓存中不存在时从远端的NameServer注册中心中拉取Topic路由信息
*
* @param topic
* @param timeoutMillis
* @param allowTopicNotExist
* @return
* @throws MQClientException
* @throws InterruptedException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
* @throws RemotingConnectException
*/
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
//设置请求头中的Topic参数后,发送获取Topic路由信息的request请求给NameServer
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
//这里由于是同步方式发送,所以直接return response的响应
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
//如果NameServer中不存在待发送消息的Topic
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
break;
}
//如果获取Topic存在,则成功返回,利用TopicRouteData进行解码,且直接返回TopicRouteData
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}

-
选择消息发送的队列
在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下,selectOneMessageQueuef()
方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义private boolean sendLatencyFaultEnable = false
通过一个sendLatencyFaultEnable开关来进行选择采用下面哪种方式:-
sendLatencyFaultEnable开关打开
在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L -
sendLatencyFaultEnable开关关闭(默认关闭)
采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
-
sendLatencyFaultEnable开关打开
/**
* 根据sendLatencyFaultEnable开关是否打开来分两种情况选择队列发送消息
* @param tpInfo
* @param lastBrokerName
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
//1.在随机递增取模的基础上,再过滤掉not available的Broker代理;对之前失败的,按一定的时间做退避
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
//2.采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
-
发送封装后的RemotingCommand数据包
在选择完发送消息的队列后,RocketMQ就会调用sendKernelImpl()
方法发送消息(该方法为,通过RocketMQ的Remoting通信模块真正发送消息的核心)。在该方法内总共完成以下几个步流程:- 根据前面获取到的MessageQueue中的brokerName,调用MQClientInstance实例的
findBrokerAddressInPublish()
方法,得到待发送消息中存放的Broker代理服务器地址,如果没有找到则跟新路由信息 - 如果没有禁用,则发送消息前后会有钩子函数的执行
executeSendMessageHookBefore()
、executeSendMessageHookAfter()
方法 - 将与该消息相关信息封装成RemotingCommand数据包,其中请求码
RequestCode
为以下几种之一:- A.SEND_MESSAGE(普通发送消息)
- B.SEND_MESSAGE_V2(优化网络数据包发送)
- C.SEND_BATCH_MESSAGE(消息批量发送)
- 根据获取到的Broke代理服务器地址,将封装好的RemotingCommand数据包发送对应的Broker上,默认发送超时间为3s
- 这里,真正调用RocketMQ的Remoting通信模块完成消息发送是在MQClientAPIImpl实例
sendMessageSync()
方法中 - processSendResponse方法对发送正常和异常情况分别进行不同的处理并返回sendResult对象
- 发送返回后,调用updateFaultItem更新Broker代理服务器的可用时间
- 对于异常情况,且标志位
retryAnotherBrokerWhenNotStoreOK
,设置为true时,在发送失败的时候,会选择换一个Broker
- 根据前面获取到的MessageQueue中的brokerName,调用MQClientInstance实例的
③ Broker代理服务器的消息处理简析
Broker代理服务器中存在很多Processor业务处理器,用于处理不同类型的请求,其中一个或者多个Processor会共用一个业务处理器线程池。对于接收到消息,Broker会使用SendMessageProcessor这个业务处理器来处理。SendMessageProcessor会依次做以下处理:
- 消息前置校验,包括broker是否可写、校验queueId是否超过指定大小、消息中的Topic路由信息是否存在,如果不存在就新建一个
- 构建MessageExtBrokerInner
- 调用
brokerController.getMessageStore().putMessage
将MessageExtBrokerInner做落盘持久化处理 - 根据消息落盘结果(正常/异常情况),BrokerStatsManager做一些统计数据的更新,最后设置Response并返回
四、总结
消息发送的核心逻辑在sendKernelImpl方法,这里简单归纳下,主要做了以下几件事:
- 根据对应的messageQuene获取broker网络地址。
- 为消息分配全局的唯一id
- 压缩消息,如果消息体大小超过compressMsgBodyOverHowmuch配置的值(默认4K),则进行压缩
- 是否存在发送消息钩子sendMessageHookList,存在则执行钩子
- 构建消息发送的请求头SendMessageRequestHeader
- 根据不同发送方式进行消息的发送。如果失败进入循环重试
- 同步发送(SYNC):同步阻塞等待broker处理完消息后返回结果
-
异步发送(ASYNC):
不阻塞等待broker处理消息的结果,通过提供回调方法,响应消息发送结果。
这种方式的发送,RocketMQ做了并发控制,通过clientAsyncSemaphoreValue参数控制,默认值是65535。
异步发送消息的消息重试次数是通过retryTimesWhenSendAsyncFailed控制的,但如果网络出现异常是无法发生重试的 - 单向发送(ONEWAY):不关心消息发送是否成功,只管发送
- 继续判断发送消息钩子,有则执行钩子的after逻辑
网友评论