在前面看了消费者并发消费的源码。得知消费者在消费消息失败之后,会尝试将消息sendback, 当sendback失败之后再进行重试消费。但是对于sendback这个机制还是不太清楚。所以看了一下sendback的源码。
首先还是定位一下入口。
public class MQClientAPIImpl {
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
}
从请求头上来看,设置了消费者对应的消费组,以及原有的topic,以及消息的偏移量,以及延迟级别,以及原有的消息id,还有设置最大重试次数。
RequestCode.CONSUMER_SEND_MSG_BACK = 36.
那么Broker是如何处理sendback的消息
还是先来看看源码,首先找到入口。在rocketmq里面,NettyRequestProcessor为专门用于处理请求的一个channelHandler.有不同的请求处理器针对不同的请求。
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
省略无关代码。
}
}
}
broker是如何处理返回的消息
从上面代码看,无疑是asyncConsumerSendMsgBack方法。
因此下面来看看实现的细节
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
省略部分代码。
根据消费组的group,找到对应的SubscriptionGroupConfig。
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
如果为Null,直接返回响应
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return CompletableFuture.completedFuture(response);
}
如果没有写权限,直接返回响应
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return CompletableFuture.completedFuture(response);
}
如果可重试的队列<=0,直接返回,说明不可以进行重试。
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return CompletableFuture.completedFuture(response);
}
生成新的topic,%RETRY% + 消费组的group
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
随机找一个重试队列
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
创建topic,并且将新的topic配置同步到注册中心,用于生成新的路由数据,这样子消费者才找到的消息。
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
如果top创建失败,或者不存在,则返回响应
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
如果没有写的权限,则返回响应
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return CompletableFuture.completedFuture(response);
}
根据偏移量找回以前的消息
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
如果消息不存在的话,则直接返回响应
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}
如果该消息没有被重试过,不会有对应的重试主题属性,需要设置
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
延迟级别
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
如果消息的重试次数以及大于最大的重试次数,或者delayLevel<0.
则将消息放入死信队列
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
生成新的主题,%DLQ%+消费组的group
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
随机生成队列id
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
在broker中创建新的topic,并且同步到注册中心
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
主题不存在,或者创建失败,则直接返回响应
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
if (0 == delayLevel) {
设置延迟级别,在保存之前会调整topic改为SCHEDULE_TOPIC_XXXX。暂时先不管这块是如何实现的
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
-----------------------------------------------------------------------------------------------------------------------------------------------
生成新消息,绝大多数属性都是原有消息的。
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
//设置新主题
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
--------------------------------------------------------------------------------------------------------------------------------------------
对新生成的消息进行存储。存储后返回响应
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});
}
}
从上面的代码来看,消息如果消费失败,那么在mq中,是不会将一个消息进行重复消费的(除非broker挂了,消费者无法Sendback,则只能对一个消息多消费几次),其实是采取不断生成新消息的方式来(大多数内容采取复制,新消息有自己的msgId,以及不同的topic,但是会将一些原有的信息保留,比如原来的topic),来进行重复消费。
topic最后分为几类。
1.自定义topic
这里没太特殊,其实就是我们自定义的topic
2.重试主题
%RETRT%前缀 + GROUP 比如一个消费者的GROUP为group1。那么重试主题就为%RETRY%group1
如果消息消费失败,且允许重试消费的话,就会复制一个新消息,保存到%RETRY%GROUP中。
消费者一开始会默认创建重试的主题的订阅信息,用来保证对重试消息的消费。
源码如下
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
集群模式下,需要设置重试主题
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
}
3.调度主题
在重复次数没超的情况下,会根据重复次数设置延时级别,如果延时级别越高的话,那么延时的时间则越长,放入SCHEDULE_TOPIC_XXXX队列。在保存之前,根据消息的延时级别,判断是否要修改主题。
public class CommitLog {
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
msg.setStoreTimestamp(System.currentTimeMillis());
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
修改主题为"SCHEDULE_TOPIC_XXXX"
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
备份原有的topic以及队列id,用于后续转发
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
更新主题
msg.setTopic(topic);
msg.setQueueId(queueId);
return putMessageResult;
});
}
}
SCHEDULE_TOPIC_XXXX是broker内部的一个主题,对于消费者生产者而言都是不可见的,仅仅用于消息的延时生成,会有定时器去将延时消息消费,将延时消息转发到原来对应的队列。
举个例子,某消息被消费失败了,随后进行重试消费,topic改为%RETRY%group1,随后发现需要延时,在保存消息时候,则将原来的主题(%RETRY%group1)以及队列id等相关信息保存,将消息的主题改为SCHEDULE_TOPIC_XXXX。最后进行保存。
而这个内部的主题SCHEDULE_TOPIC_XXXX是如何被消费的,其实就是有定时器去把主题里面的消息读出来,然后根据那些消息原有的消息以及队列,将消息放到对应的队列中去。
说白了这个叫SCHEDULE_TOPIC_XXXX的主题,其实就是用于将重试的消息延迟放入到对应的队列中。而它的作用相当于,先保存,后转发。
4.死信主题
如果重复次数超过配置的最大重试次数,则进入死信队列。
%DLQ%+消费组的group,死信队列的主题为%DLQ%+消费者的group。
这个是需要人工去干预的。
网友评论