
(12) How consumers retry-consume messages

Author: guessguess | Published 2021-07-28 19:17

As mentioned earlier, there are ultimately several kinds of topics:
1. Custom topics
2. Retry topics
3. Delayed-scheduling topics
4. Dead-letter topics
The generation paths of these topics are shown in the figure below, which depicts how the broker handles sent-back (sendback) messages and changes their topic.

[Figure: the broker's handling of sendback messages and the resulting topic change]

So how does a consumer actually consume these kinds of messages?
Custom topics need no further explanation. The key is how the retry topic and the dead-letter topic are consumed. Delayed-scheduling messages are consumed internally by the broker itself, while retry and dead-letter topics are consumed by consumers (on the client side).
So we only need to focus on how the consumer side consumes the dead-letter queue and the retry queue.

Consuming the retry queue

How the consumer subscribes to the retry topic

When we use a consumer, the first thing to do is usually to subscribe to a custom topic.
A demo follows:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONSUMER_GROUP");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("topic2", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

The implementation of consumer.subscribe("topic2", "*") is shown below; in plain terms, it just adds an entry to the rebalance service's subscription table.

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
}

Finally the consumer is started. The code is as follows:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                // check the configuration
                this.checkConfig();
                // when the consumer starts it calls this method, which handles subscribing to the retry topic
                this.copySubscription();
                // ... remaining code omitted
    }
}

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    // broadcast mode: do nothing
                    break;
                case CLUSTERING:
                    // cluster mode: subscribe to the retry topic
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
}

Looking at the source code:
In cluster mode the retry topic is subscribed to by default (its name is derived from the consumer group, as sketched after this list).
In broadcast mode there is no retry topic.
Concurrent consumption & broadcast: a consumption failure only logs a warning.
Concurrent consumption & cluster: a failure first sends the message back to the broker (sendback); if sendback fails, the client retries it internally.
Orderly consumption: a failure is retried internally first; once the maximum retry count is reached, the message is sent back; if sendback fails, consumption is retried again a little later.
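
As a quick illustration of where that retry topic name comes from, here is a minimal sketch (the class name RetryTopicName is made up for this example; it only assumes the rocketmq-common dependency) of MixAll.getRetryTopic(), which copySubscription() calls above:

import org.apache.rocketmq.common.MixAll;

public class RetryTopicName {
    public static void main(String[] args) {
        // getRetryTopic() simply prefixes the consumer group with "%RETRY%",
        // so the retry topic is per consumer group, not per custom topic.
        // This matches the REAL_TOPIC=%RETRY%my-consume-group seen in the logs further down.
        System.out.println(MixAll.getRetryTopic("my-consume-group")); // %RETRY%my-consume-group
    }
}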

Simulating retry-queue consumption

First, verify broadcast mode with concurrent consumption.

The producer code is as follows:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producor_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("topic1",
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

The consumer code is as follows:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONSUMER_GROUP");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("topic1", "*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // since we are simulating a retry here, simply return a consumption failure
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

Start the namesrv and the broker. The results are as follows:

[Screenshots: namesrv running, broker running]

Then start the producer and the consumer.

[Screenshots: producer output, consumer output]
The output shows that in broadcast mode the concurrent consumer does not retry at all.
This matches the source: with concurrent consumption & broadcast mode, failed messages are simply dropped. The code is in org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.processConsumeResult(ConsumeConcurrentlyStatus, ConsumeConcurrentlyContext, ConsumeRequest):
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
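
For comparison, the CLUSTERING branch of the same method looks roughly like this (a sketch quoted from memory of a 4.x source tree, so check your own version): every message after ackIndex is sent back to the broker, and anything that fails to be sent back is resubmitted for local retry.

            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // try to send the failed message back to the broker (it will land in the retry topic)
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    // sendback failed: retry locally after a delay
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;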

Verifying cluster mode with concurrent consumption

The producer code is as follows:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("my-pro-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("my-topic-1",
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

The consumer code is as follows, switched to cluster mode:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consume-group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("my-topic1", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        重试次数改1次
        consumer.setMaxReconsumeTimes(1);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

The producer sends the message; its output is as follows:

[Screenshot: producer send result]

The consumer then consumes it:

15:32:13.221 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1627544011030, bornHost=/172.17.6.76:40906, storeTimestamp=1627544011033, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D046D, commitLogOffset=8193133, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='my-topic-1', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1627544011044, UNIQ_KEY=7F000001C44073D16E939388B9150000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=344, queueOffset=21, sysFlag=0, bornTimestamp=1627544011030, bornHost=/172.17.6.76:40906, storeTimestamp=1627544021272, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D07DE, commitLogOffset=8194014, bodyCRC=613185359, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='my-topic-1', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=%RETRY%my-consume-group, ORIGIN_MESSAGE_ID=AC11064C00002A9F00000000007D046D, RETRY_TOPIC=my-topic-1, MAX_OFFSET=22, CONSUME_START_TIME=1627544021274, UNIQ_KEY=7F000001C44073D16E939388B9150000, CLUSTER=DefaultCluster, WAIT=false, DELAY=3, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 

The default maximum retry count is 16, and each additional retry pushes the next delayed delivery further out, so the count is changed to 1 here. The result matches expectations: the first consumption plus one retry makes two deliveries in total.
The message we produced has now entered the dead-letter queue, because it reached the maximum retry count. So how can dead-letter messages be consumed?
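
As a side note on why each retry waits longer: the DELAY property in the log above (3 here, climbing to 5, 6, 7 in the later test below) is a delay level that grows with the retry count, and the broker resolves it against its messageDelayLevel table. A minimal sketch, assuming the broker's default table (the class and method names are made up for illustration):

public class RetryDelaySketch {
    // the broker's default messageDelayLevel configuration (an assumption; brokers can override it)
    private static final String[] DEFAULT_DELAY_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    // delayLevel is the DELAY property carried by the retry message (1-based)
    static String waitBeforeRedelivery(int delayLevel) {
        int idx = Math.min(delayLevel, DEFAULT_DELAY_LEVELS.length) - 1;
        return DEFAULT_DELAY_LEVELS[idx];
    }

    public static void main(String[] args) {
        // DELAY=3 -> 10s, which matches the roughly 10-second gap between bornTimestamp
        // and the retry message's storeTimestamp in the consumer log above
        System.out.println(waitBeforeRedelivery(3));
    }
}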

Conclusions from the verification

如果并发消费&广播 消费失败只会打印日志
如果并发&集群 消费失败会优先sendback,sendback失败则会内部重试

So how do we consume messages in the dead-letter queue?

In the steps above the maximum retry count was set to 1, and the message still failed after two consumption attempts, so it finally entered the dead-letter queue.
How can the dead-letter queue be consumed?
First change the consumer's topic to the dead-letter topic and debug:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consume-group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("%DLQ%my-consume-group", "*");
        //consumer.subscribe("my-topic-1", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setMaxReconsumeTimes(1);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

The consumer will inevitably pull route data for the topic from the name server. The debug screenshot is below; the topic is %DLQ%my-consume-group.


[Debug screenshot: route data for %DLQ%my-consume-group]

The screenshot shows that the dead-letter queue only has write permission, not read permission.
After the route data is fetched, it is converted into subscribe (message queue) info:


[Debug screenshot: route data converted into subscribe info]
The method is org.apache.rocketmq.client.impl.factory.MQClientInstance.topicRouteData2TopicSubscribeInfo(String, TopicRouteData). Queue permissions are defined in PermName:
public class PermName {
    public static final int PERM_PRIORITY = 0x1 << 3;
    // 4 = readable
    public static final int PERM_READ = 0x1 << 2;
    // 2 = writable
    public static final int PERM_WRITE = 0x1 << 1;
    public static final int PERM_INHERIT = 0x1 << 0;

    public static String perm2String(final int perm) {
        final StringBuffer sb = new StringBuffer("---");
        if (isReadable(perm)) {
            sb.replace(0, 1, "R");
        }

        if (isWriteable(perm)) {
            sb.replace(1, 2, "W");
        }

        if (isInherited(perm)) {
            sb.replace(2, 3, "X");
        }

        return sb.toString();
    }

    public static boolean isReadable(final int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    public static boolean isWriteable(final int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    public static boolean isInherited(final int perm) {
        return (perm & PERM_INHERIT) == PERM_INHERIT;
    }
}


public class MQClientInstance {
    public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
        List<QueueData> qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            // the DLQ perm is write-only (2); the read bit is 4, and 2 & 4 == 0, so this check fails
            if (PermName.isReadable(qd.getPerm())) {
                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    mqList.add(mq);
                }
            }
        }

        return mqList;
    }
}
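
To make the permission check concrete, here is a minimal, self-contained sketch (the class name DlqPermDemo is made up; it assumes the rocketmq-client and rocketmq-common modules are on the classpath) that builds route data with a write-only queue, the way the name server returns it for a dead-letter topic, and shows that no message queue survives the filter:

import java.util.Collections;
import java.util.Set;

import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;

public class DlqPermDemo {
    public static void main(String[] args) {
        // build route data the way it comes back for a dead-letter topic:
        // the queue only has the write bit set (perm = 2)
        QueueData qd = new QueueData();
        qd.setBrokerName("broker-a");
        qd.setReadQueueNums(1);
        qd.setWriteQueueNums(1);
        qd.setPerm(PermName.PERM_WRITE);

        TopicRouteData route = new TopicRouteData();
        route.setQueueDatas(Collections.singletonList(qd));

        System.out.println(PermName.perm2String(qd.getPerm())); // -W-  (write only)
        System.out.println(PermName.isReadable(qd.getPerm()));  // false

        // because isReadable() fails, no MessageQueue is produced,
        // so the consumer has nothing to rebalance and nothing to pull
        Set<MessageQueue> mqs =
            MQClientInstance.topicRouteData2TopicSubscribeInfo("%DLQ%my-consume-group", route);
        System.out.println(mqs.isEmpty()); // true
    }
}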

Because the dead-letter queue only has write permission while the consumer needs read permission, the consumer can fetch the DLQ's route data but has no right to read from it, so the subscribe info for that topic ends up empty.
When the rebalance service then tries to allocate queues it gets none, so nothing can be consumed. The code is as follows:

public abstract class RebalanceImpl {
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                // the queue set fetched for this topic is empty, so no queues can be allocated and naturally nothing can be consumed
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

}

In the end, if you really do need to consume the dead-letter queue, the only option is to change the queue's permission to make it readable. RocketMQ provides tooling for this (for example the mqadmin updateTopic command), so it requires manual intervention.
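
For completeness, here is a minimal sketch of doing the same thing programmatically with the tools module's DefaultMQAdminExt instead of the command line. The broker address, queue numbers and class name are assumptions for this example; adjust them to your setup:

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class OpenDlqForRead {
    public static void main(String[] args) throws Exception {
        DefaultMQAdminExt admin = new DefaultMQAdminExt();
        admin.setNamesrvAddr("127.0.0.1:9876");
        admin.start();

        // re-register the dead-letter topic with read+write permission (6 = RW),
        // so that its queues are no longer filtered out by the isReadable() check above
        TopicConfig config = new TopicConfig("%DLQ%my-consume-group");
        config.setReadQueueNums(1);
        config.setWriteQueueNums(1);
        config.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);

        // "127.0.0.1:10911" is the broker address seen in the earlier logs (an assumption here)
        admin.createAndUpdateTopicConfig("127.0.0.1:10911", config);
        admin.shutdown();
    }
}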

Things to watch out for with concurrent consumption in cluster mode

Avoid consumers in the same group subscribing to different topics

Otherwise a consumer may end up consuming messages from a topic it never subscribed to!
Let's build a demo first.
The producer code is as follows:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("my-pro-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("my-topicA",
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

The consumer code is as follows:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consume-group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("my-topicA", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setMaxReconsumeTimes(16);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

Start the producer and then the consumer; the results are shown below.
Because the consumer is in cluster mode with concurrent consumption, the message is retried, but it keeps failing. The maximum retry count here is 16, so before the message can reach the dead-letter queue we stop the consumer.


[Screenshots: producer output, consumer output]

At this point the message has not yet entered the dead-letter queue. Stop the consumer and start another one.
The code is below: the group is the same, but the topic is different. Nothing has ever been sent to my-topicB, so in theory this consumer should not receive any messages. The actual result, however, is surprising.

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consume-group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("my-topicB", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setMaxReconsumeTimes(16);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

The output is as follows. Since each retry waits longer than the previous one, only part of it is shown here.

18:55:19.780 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=343, queueOffset=30, sysFlag=0, bornTimestamp=1627555843019, bornHost=/172.17.6.76:45135, storeTimestamp=1627555969395, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D4486, commitLogOffset=8209542, bodyCRC=613185359, reconsumeTimes=3, preparedTransactionOffset=0, toString()=Message{topic='my-topicA', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=%RETRY%my-consume-group, ORIGIN_MESSAGE_ID=AC11064C00002A9F00000000007D3D16, RETRY_TOPIC=my-topicA, MAX_OFFSET=31, CONSUME_START_TIME=1627556120586, UNIQ_KEY=7F0000015EA073D16E93943D43CB0000, CLUSTER=DefaultCluster, WAIT=false, DELAY=5, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=343, queueOffset=31, sysFlag=0, bornTimestamp=1627555843019, bornHost=/172.17.6.76:45135, storeTimestamp=1627556240805, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D4730, commitLogOffset=8210224, bodyCRC=613185359, reconsumeTimes=4, preparedTransactionOffset=0, toString()=Message{topic='my-topicA', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=%RETRY%my-consume-group, ORIGIN_MESSAGE_ID=AC11064C00002A9F00000000007D3D16, RETRY_TOPIC=my-topicA, MAX_OFFSET=32, CONSUME_START_TIME=1627556240807, UNIQ_KEY=7F0000015EA073D16E93943D43CB0000, CLUSTER=DefaultCluster, WAIT=false, DELAY=6, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=343, queueOffset=32, sysFlag=0, bornTimestamp=1627555843019, bornHost=/172.17.6.76:45135, storeTimestamp=1627556420808, storeHost=/172.17.6.76:10911, msgId=AC11064C00002A9F00000000007D49DA, commitLogOffset=8210906, bodyCRC=613185359, reconsumeTimes=5, preparedTransactionOffset=0, toString()=Message{topic='my-topicA', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=%RETRY%my-consume-group, ORIGIN_MESSAGE_ID=AC11064C00002A9F00000000007D3D16, RETRY_TOPIC=my-topicA, MAX_OFFSET=33, CONSUME_START_TIME=1627556420814, UNIQ_KEY=7F0000015EA073D16E93943D43CB0000, CLUSTER=DefaultCluster, WAIT=false, DELAY=7, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 

Here is the curious part: the first consumer, A, subscribed to my-topicA and failed to consume the message; after the delayed-scheduling step, the message ended up in a queue of the retry topic. Before the message reached the dead-letter queue, consumer A was stopped and a second consumer was started in the same group but subscribed to my-topicB, and it ended up pulling the retry message that originated from my-topicA.
So in this situation, a consumer in the group can consume messages from topics it did not subscribe to itself.
The underlying reason is simply that all consumers in the same consumer group share the same retry topic, and therefore the same retry queues, so retry messages from different topics inevitably end up in the same queues.
To avoid this problem, all consumers in a group must be configured identically, including the topics they subscribe to; otherwise they may consume messages belonging to other topics.
