在前面说到,topic的种类最后有几种。
1.自定义的topic
2.重试的topic
3.延时调度的topic
4.死信的topic
对于这几种topic的生成路径如下:
该图为broker对于sendback消息的处理

那么消费者对于这几种消息是如何进行消费的。
自定义的topic就不说了。关键是对于重试主题,以及死信主题的消费。延时调度的消费为broker自身内部,而重试与死信主题都是被消费者(客户端消费)。
所以只需要关注消费者端对于死信队列或者说重试队列的消费。
重试队列的消费
消费者对于重试主题的订阅
一般我们使用消费者的时候,首先都需要进行自定义主题的订阅
demo如下
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("topic2", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
代码中 consumer.subscribe("topic2", "*")的实现如下,说白了就是往负载服务的订阅表去加数据
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
}
最后消费者进行启动,代码如下
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
//检查配置项
this.checkConfig();
消费者启动的时候,会调用这个方法,这个方法涉及到重试主题的订阅
this.copySubscription();
省略部分代码
}
}
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
广播模式下,啥也不做
break;
case CLUSTERING:
集群模式下,订阅重试主题
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
}
从源码来看
在集群模式下,对于重试主题的订阅是默认的。
而广播模式下,是没有重试主题的。
如果并发消费&广播 消费失败只会打印日志
如果并发&集群 消费失败会优先sendback,sendback失败则会内部重试
如果顺序消费 消费失败会优先内部重试,超过或者到达最大重试次数,进行sendback,若sendback失败,过一会再进行重试消费
模拟重试队列的消费
首先验证并发消费下的广播模式
生产者的代码如下
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("producor_group");
producer.setInstanceName("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("topic1",
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
消费者的代码如下
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONSUMER_GROUP");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("topic1", "*");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
由于此处是模拟重试,所以直接返回消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
运行namesrv以及broker
运行结果如下


启动生产者以及消费者
生产者运行结果

消费者运行结果

从运行结果来看,广播模式下,并发消费者并没有进行重试。
结合一下源码,并发&广播 对于消息是不会重试的,代码位于org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.processConsumeResult(ConsumeConcurrentlyStatus, ConsumeConcurrentlyContext, ConsumeRequest)
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
验证并发消费下的集群模式
生产者代码如下
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("my-pro-group");
producer.setInstanceName("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("my-topic-1",
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
消费者代码如下,改为集群模式
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consume-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("my-topic1", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
重试次数改1次
consumer.setMaxReconsumeTimes(1);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
生产者发送消息的结果如下

消费者进行消费
15:32:13.221 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1627544011030, bornHost=/172.17.6.76:40906, storeTimestamp=1627544011033, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D046D, commitLogOffset=8193133, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='my-topic-1', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1627544011044, UNIQ_KEY=7F000001C44073D16E939388B9150000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=344, queueOffset=21, sysFlag=0, bornTimestamp=1627544011030, bornHost=/172.17.6.76:40906, storeTimestamp=1627544021272, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D07DE, commitLogOffset=8194014, bodyCRC=613185359, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='my-topic-1', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=%RETRY%my-consume-group, ORIGIN_MESSAGE_ID=AC11064C00002A9F00000000007D046D, RETRY_TOPIC=my-topic-1, MAX_OFFSET=22, CONSUME_START_TIME=1627544021274, UNIQ_KEY=7F000001C44073D16E939388B9150000, CLUSTER=DefaultCluster, WAIT=false, DELAY=3, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
由于默认的重试次数为16,另外重试次数会随着重试次数越多,后面延时调度就会越久去转发消息,所以直接改为1次。从结果上来看是符合预期的。第一次+重复消费的1次=2次
我们此前生成的那条消息,已经进入死信队列了。因为达到最大的重试次数,那么如何消费死信队列的消息?
验证的结论
如果并发消费&广播 消费失败只会打印日志
如果并发&集群 消费失败会优先sendback,sendback失败则会内部重试
那么如何消费死信队列的消息?
上面的步骤,由于设置了最大重试次数为1,然后消费了2次还是失败。消息最终进入死信队列。
那么死信队列该如何消费呢?
先将消费者的代码topic改为死信队列,然后进行debug
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consume-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("%DLQ%my-consume-group", "*");
//consumer.subscribe("my-topic1", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setMaxReconsumeTimes(1);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消费者必然会通过topic从注册中心中拉取路由数据。debug图如下。topic为%DLQ%my-consume-group

从上图是可以看出,对于死信队列其实是只有写入权限的。并没有读取权限。
读取到路由数据之后,将路由数据转化成订阅数据

方法位置如下org.apache.rocketmq.client.impl.factory.MQClientInstance.topicRouteData2TopicSubscribeInfo(String, TopicRouteData)
队列权限
public class PermName {
public static final int PERM_PRIORITY = 0x1 << 3;
4为可读
public static final int PERM_READ = 0x1 << 2;
2位可写
public static final int PERM_WRITE = 0x1 << 1;
public static final int PERM_INHERIT = 0x1 << 0;
public static String perm2String(final int perm) {
final StringBuffer sb = new StringBuffer("---");
if (isReadable(perm)) {
sb.replace(0, 1, "R");
}
if (isWriteable(perm)) {
sb.replace(1, 2, "W");
}
if (isInherited(perm)) {
sb.replace(2, 3, "X");
}
return sb.toString();
}
public static boolean isReadable(final int perm) {
return (perm & PERM_READ) == PERM_READ;
}
public static boolean isWriteable(final int perm) {
return (perm & PERM_WRITE) == PERM_WRITE;
}
public static boolean isInherited(final int perm) {
return (perm & PERM_INHERIT) == PERM_INHERIT;
}
}
public class MQClientInstance {
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
死信队列的权限为可写2 而读权限为4 2&4=0
if (PermName.isReadable(qd.getPerm())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}
return mqList;
}
}
由于死信队列的权限为可写,但是消费者订阅信息所需要的权限为可读,所以消费者其实是可以获取到死信队列的路由数据的,然而并没有权限,所以该topic对应的订阅数据最后为空。
而在负载服务进行队列分配的时候,获取不到人任何的队列,所以无法进行消费。代码如下
public abstract class RebalanceImpl {
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
根据主题获取到的队列为空,所以无法进行队列分配,自然无法进行消费了。
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
}
最后如果真的要对死信队列进行消费的话,那么只能对应去修改队列的权限,改为可读。rocketmq这块是提供命令去操作的,需要人为干预。
集群模式下并发消费需要注意的点
避免同个group的消费者订阅的topic不一样
否则会导致消费者可能会消费到非订阅主题的消息!
先来做一个demo
生产者代码如下
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("my-pro-group");
producer.setInstanceName("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("my-topicA",
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
消费者代码如下
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consume-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("my-topicA", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setMaxReconsumeTimes(16);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
先后启动生产者与消费者,运行结果如下
由于消费者在集群模式下,且为并发消费,所以会对消息进行重试消费。但是消费者还是消费失败,这里消费者设置最大消费次数为16才失败,所以在该消息没有进入死信队列的时候,将消费者暂停。


此时,消息还没有进入死信队列,将消费者暂停,另外起一个消费者。
代码如下,group是一致的,但是topic不一致。此前并没有往my-topicB中发送消息。所以按理说消费者应该是获取不到任何消息的。然而结果却出乎意料。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consume-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("my-topicB", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setMaxReconsumeTimes(16);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
运行结果如下,由于每重试消费一次,下次消费就更久,所以这里只截出一部分。
18:55:19.780 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=343, queueOffset=30, sysFlag=0, bornTimestamp=1627555843019, bornHost=/172.17.6.76:45135, storeTimestamp=1627555969395, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D4486, commitLogOffset=8209542, bodyCRC=613185359, reconsumeTimes=3, preparedTransactionOffset=0, toString()=Message{topic='my-topicA', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=%RETRY%my-consume-group, ORIGIN_MESSAGE_ID=AC11064C00002A9F00000000007D3D16, RETRY_TOPIC=my-topicA, MAX_OFFSET=31, CONSUME_START_TIME=1627556120586, UNIQ_KEY=7F0000015EA073D16E93943D43CB0000, CLUSTER=DefaultCluster, WAIT=false, DELAY=5, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=343, queueOffset=31, sysFlag=0, bornTimestamp=1627555843019, bornHost=/172.17.6.76:45135, storeTimestamp=1627556240805, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D4730, commitLogOffset=8210224, bodyCRC=613185359, reconsumeTimes=4, preparedTransactionOffset=0, toString()=Message{topic='my-topicA', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=%RETRY%my-consume-group, ORIGIN_MESSAGE_ID=AC11064C00002A9F00000000007D3D16, RETRY_TOPIC=my-topicA, MAX_OFFSET=32, CONSUME_START_TIME=1627556240807, UNIQ_KEY=7F0000015EA073D16E93943D43CB0000, CLUSTER=DefaultCluster, WAIT=false, DELAY=6, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=343, queueOffset=32, sysFlag=0, bornTimestamp=1627555843019, bornHost=/172.17.6.76:45135, storeTimestamp=1627556420808, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D49DA, commitLogOffset=8210906, bodyCRC=613185359, reconsumeTimes=5, preparedTransactionOffset=0, toString()=Message{topic='my-topicA', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=%RETRY%my-consume-group, ORIGIN_MESSAGE_ID=AC11064C00002A9F00000000007D3D16, RETRY_TOPIC=my-topicA, MAX_OFFSET=33, CONSUME_START_TIME=1627556420814, UNIQ_KEY=7F0000015EA073D16E93943D43CB0000, CLUSTER=DefaultCluster, WAIT=false, DELAY=7, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
这里有一个很神奇的地方,第一个消费者A订阅的主题为my-topicA,随后消费失败,消息经过延时调度机制最后放入重试主题的队列。然后在消息没有进入死信队列前,在消费者暂停。启动第二个消费者,也是一个group,但是订阅的主题为my-topicB, 结果居然拉取到my-topicA主题的重试消息。
这种情况下,同一个group的消费者是有可能消费到非自身订阅主题的消息的。
其实说白了就是,同一个消费组中的消费者,是公用同一个重试topic的,所以队列必然也是公用的,必然会使得不同的topic的重试消息会放在同一个队列中的情况。
所以为了避免整个问题,同一个group中的消费者必须完全一致,包括订阅的主题等,否则会可能出现消费其他topic的消息的情况。
网友评论