总览
消息消费分为两种形式并发消费、顺序消费;这次主要讲并发消费。
消息从Broker拉取到客户端之后,等待客户端进行消息消费。拉取消息的方法会将拉取到的消息提交到消息消费的线程池中,供ConsumeMessageService消息消费服务并发消费消息。
- DefaultMQPushConsumerImpl#pullMessage拉取到消息,提交消息消费请求到消息消费线程池中去,根据拉取到消息条数,进行消息不同分批处理,然后构造ConsumeRequest消息消费请求对象。
DefaultMQPushConsumerImpl#pullMessage
// 提交消息消费请求到消息消费线程池中去
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
/**
* 提交消息消费到消息消费线程池中去,供消费者消息消费
* @param msgs 消息列表,默认一次从服务器最多拉取32条消息
* @param processQueue 消息处理队列
* @param messageQueue 消息所属队列
* @param dispatchToConsume 是否转发到消费线程池,并发消费时忽略该参数
*/
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
// 一次消息消费任务ConsumeRequest中包含的消息条数,默认为1,msgs.size()默认最多为32条,
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 1条消息,直接放入到ConsumeRequest中,然后将ConsumeRequest提交到消息消费者线程池中,
// 如果提交过程中出现拒绝提交异常则延迟5s再提交,这里其实是给出了一种标准的拒绝提交实现方式,
// 实际过程中由于消费者线程池使用的任务队列为LinkedBlockingQueue无界队列,故不会出现拒绝提交异常。
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// 如果拉取的消息条数大于consumeBatchSize,则对拉取消息进行分页,每页consumeBatchSize条消息,
// 创建多个ConsumeRequest任务并提交到消费线程池。
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
ConsumeRequest
- ConsumeRequest实现了Runnable接口,可以被提交到消息消费的线程池中,被并发消费。run()方法实现了消息消费的过程。获取消息消费的listener对象,还原真实的topic(将在延迟消息详细介绍),执行钩子函数,进行listener的消息消费,执行后置钩子函数,处理消息消费的返回结果。
@Override
public void run() {
// 首先会检测dropped属性,如果为true,则停止该队列消费,在进行消息重新负载均衡时,
// 如果该消费队列被分配给消费者组内,其他消费者后,需要设置dropped为true,阻止消费者继续消费不属于自己的消费队列。
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
// 每次消费都是新的ConsumeConcurrentlyContext,所以delayLevelWhenNextConsume=0
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
// 恢复重试消息主题名。
// RocketMQ将消息存入commitlog文件时,如果发现消息的延迟级别delayTimeLevel大于0,会首先将重试主题存入在消息的属性中,
// 然后设置主题名称为SCHEDULE_TOPIC,以便消息到后重新参与消息消费。
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// 执行钩子函数
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
// 执行具体的消息消费,调用应用程序注册的消息监听器MessageListenerConcurrently的consumeMessage()方法,进入到具体的消息消费业务逻辑,返回该批消息的返回结果。
// 最终返回CONSUME_SUCCESS,或RECONSUME_LATER需要重新消费
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 执行消息消费后置钩子
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// 执行业务消息后,在处理结果前再次验证一下ProcessQueue的isDropped状态值,如果设置为true,将不对结果进行处理,
// 也就是说如果在消费者消费过程中,如果由于新的消费者加入或原先的消费者出现宕机导致原先分配给消费者的队列,在负载之后分配给别的消费者,
// 那么在应用程序的角度来看的话,消息会被重复消费。
if (!processQueue.isDropped()) {
// 如果processQueue没有被删除,处理消息消费返回结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
- 处理消息消费的返回结果。广播消费模式消费失败,消息并不会被重新消费,只是打印了日志。集群消费模式消费失败会发送sendMessageBack()消费失败的ACK应答给Broker端,发送ACK应答失败,延迟5秒再次唤醒再次提交消费请求到消费线程池中,重新进行消息消费。从ProcessQueue移除这批消息,返回偏移量是移除该批消息后ProcessQueue未被消费消息的开始的偏移量,然后用该偏移量更新消息消费进度,以便在消费者重启后能从上一次的消费进度开始消费,避免消息重复消费。
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
// 消费成功,设置ackIndex=getMsgs().size() - 1;
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
// 消费失败,设置ackIndex = -1,进行消费重新消费
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 广播消费模式:
// 消息并不会被重新消费,只是打印了日志
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
// 集群消费模式:
// i= getMsgs().size() -1 + 1,故等于getMsgs().size(),并不会执行sendMessageCallBack,
// 只有业务方返回RECONSUME_LATER时,该批消息都需要发送ACK消息,如果消费发送ACK失败,
// 则直接将本批ACK消息发送失败的消息再次封装为ConsumeRequest,然后延迟5s后重新消费。
// 如果ACK消息发送成功,则该消息会延迟消费。
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 消费者消费消息失败,然后延迟消费,发送延迟消费消息请求给Broker,作为ACK应答消息
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// 发送CallBack失败,延迟5秒再次唤醒再次提交消费请求到消费线程池中,重新进行消息消费。
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// 从ProcessQueue移除这批消息,返回偏移量是移除该批消息后ProcessQueue未被消费消息的开始的偏移量,
// 然后用该偏移量更新消息消费进度,以便在消费者重启后能从上一次的消费进度开始消费,避免消息重复消费。
// 值得重点注意的是当消息监听器返回RECONSUME_LATER,消息消费进度也会向前推进,用ProcessQueue未被消费消息的开始的偏移量调用消息消费
// 进度存储器OffsetStore更新消费进度,这是因为当返回RECONSUME_LATER, RocketMQ会创建一条与原先消息属性相同的消息,拥有一个唯一的新msgId,
// 并存储原消息id,该消息会存入到commitlog文件中,与原先的消息没有任何关联,那该消息当然也会进入到ConsuemeQueue队列中,将拥有一个全新的队列偏移量。
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// 消息消费成功,更新消费进度
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
- 消费端消费消息失败,发送ACK应答给Broker端:
1.1 发送CallBack成功;
1.2 发送CallBack过程中异常;
1.3 步骤1.1、1.2执行完毕后,都会执行1.3。
延迟消息、消息重试后面将会详细介绍。
1.1 发送CallBack成功;broker接受请求,新建%RETRY%+groupNametopic,判断delayLevel>16或者delayLevel<0,进入死信队列,如果delayLevel=0,变为delayLevel=消费次数+3;否则delayLevel=delayLevel;新建Message,将Message存入commitlog
1.2 发送CallBack异常:新建一个Topic为%RETRY%+groupName的消息,延迟级别在消费次数上+3,client端的producer发送新消息到broker;broker处理进入commitlog,进行消息topic转换为SCHEDULE_TOPIC_XXXX,queueId=delayLevel - 1;
原topic%RETRY%+groupName放入新message的属性中,还有queueId,也放入消息属性中。存入commitlog,然后延迟队列定时任务进行延迟消息处理。
1.3 步骤1.1,1.2之后,然后统一处理:ScheduleMessageService的TimerTask执行定时任务,从延迟队列取出这个消息,根据topic,queueId获取consumequeue,从consumequeue中获取commitlog的message,再将message的REAL_TOPIC、REAL_QID属性进行topic、queueId还原,再次存入到commitlog文件中,等待消费者消费。
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
// 发送CallBack成功,延时消息在Broker端进行处理
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
// 发送CallBack失败,新建一个Topic为%RETRY%+groupName的消息,延迟级别在消费次数上+3,client端的producer发送新消息到broker
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
// 将原Topic放入RETRY_TOPIC属性中
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
// 清空消息事务属性
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 延迟级别+3
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
// producer发送新的消息到broker端
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
- Broker端处理client端发送的ACK请求,会将这条消息重新存入commitlog中,通过延迟消息进行处理。延迟消息、消息重试将在下节详细讲解。
5.1 创建重试主题,名称:%RETRY%+消费者组名称,并从重试队列中随机选择一个队列,并构建TopicConfig主题配置信息。
5.2 根据消息物理偏移量从commitlog文件中获取消息,同时将消息的主题存入属性中。
5.3 设置消息重试次数,如果消息已重试次数超过maxReconsumeTimes,再次改变newTopic主题为DLQ(%DLQ%)死信队列,该主题权限为只写,说明消息一旦进入到DLQ队列中,RocketMQ将不负责再次调度进行消费了,需要人工干预。
5.4 根据原先的消息创建一个新的消息对象,重试消息会拥有自己的唯一消息id并存入到commitlog中,并不会去更新原先的消息,而是将原先的主题、消息Id存入消息的属性中,主题名称为重试主题,其他与原先消息保持相同。
5.5 将消息存入到commitlog文件中,前面已经介绍,这里重点突出一个机制,消息重试机制依托于定时任务实现。
// Client发送延迟消费消息请求给Broker,Broker做出返回消息回应。
// 延迟消费消息(ACK消息)存入commitlog文件后,将依托于RocketMQ定时消息机制在延迟时间到期后在次将消息拉取,如果在发送过程中失败,将记录所有发送ACK消息失败的消息,
// 然后再次封装成ConsumeRequest,延迟5s执行。
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
// 消费者消费消息失败,然后延迟消费,发送延迟消费消息请求
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request);
this.executeConsumeMessageHookAfter(context);
}
// 消费者组订阅配置信息
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return CompletableFuture.completedFuture(response);
}
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return CompletableFuture.completedFuture(response);
}
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return CompletableFuture.completedFuture(response);
}
// 创建重试主题,名称:%RETRY%+消费者组名称,并从重试队列中随机选择一个队列,并构建TopicConfig主题配置信息。
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// 创建新的topic
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return CompletableFuture.completedFuture(response);
}
// 根据消息物理偏移量从commitlog文件中获取消息,同时将消息的主题存入属性中。
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
int delayLevel = requestHeader.getDelayLevel();
// 设置消息重试次数,如果消息已重试次数超过maxReconsumeTimes,再次改变newTopic主题为DLQ(%DLQ%)死信队列,该主题权限为只写,
// 说明消息一旦进入到DLQ队列中,RocketMQ将不负责再次调度进行消费了,需要人工干预。
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
//
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
// broker处理client端发送callBack处理重试消息,client发送的重试消息的deLayLevel永远为0;不管这是第几次重试;延迟队列的level升级是根据消费次数的增加来增加延时队列的级数。
// delayLevel的来源:ConsumeMessageConcurrentlyService#sendMessageCallBack#int delayLevel = context.getDelayLevelWhenNextConsume()
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
// 根据原先的消息创建一个新的消息对象,重试消息会拥有自己的唯一消息id并存入到commitlog中,并不会去更新原先的消息,
// 而是将原先的主题、消息Id存入消息的属性中,主题名称为重试主题,其他与原先消息保持相同。
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
// 将消息存入到commitlog文件中,前面已经介绍,这里重点突出一个机制,消息重试机制依托于定时任务实现。
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
// 统计信息正确topic的信息
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});
}
网友评论