美文网首页编程语言爱好者java学习快到碗里来Java 进阶
RocketMQ(九)高级特性-消息重试机制

RocketMQ(九)高级特性-消息重试机制

作者: 我犟不过你 | 来源:发表于2020-11-30 16:00 被阅读0次

    rocketMq具有消息重试的机制,重试也分为两种重试:producer重试consumer重试

    producer重试

    如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

    可以通过以下方式进行配置:
    1)使用starter的同学可以直接在yml文件中进行配置,主要属性有超时时间重试次数,另外提供一个在其他broker节点重试的机制,如下所示:

    rocketmq:
      name-server: http://101.200.36.168:9876
      producer:
        #指定消息发送者的组,在控制台查询时会用到
        group: test
        #发送失败超时时间
        send-message-timeout: 3000
        #重试次数
        retry-times-when-send-failed: 3
        #在其他broker服务端进行重试默认false,开启设置为on
        retry-next-server: false
    

    2)在java代码中可以对producer进行手动设置:

    代码设置

    3)自己初始化Producer的同学可以入第二步一样在初始化时设置好。

    consumer重试(两种:监听、自定义消费者)

    两种方式其实监听是对自定义的一个封装,只不过自定义可能更灵活一些,使用监听的形式我还没找到在哪里设置重试次数。

    下面先看看监听的形式:
    在消费者监听器,自定义抛出异常,会发生重试:

    /**
     * RocketMqProducer
     * @date: 2020/11/26
     * @author weirx
     * @version 3.0
     */
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "test_reconsume", selectorExpression = "*", consumerGroup = "test_reconsume")
    public class RetryConsumerListener implements RocketMQListener<MessageExt> {
    
        @SneakyThrows
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("receive sync message:{}", msg);
            throw new Exception("开始重试");
        }
    }
    

    重试时的消息属性如下图:

    重试消息属性

    上图中有一个delay属性,其实是延时等级,还记得我们前面学习的延时消息吗?在borker.conf中配置了了DelayDevel等级(延时等级),重试机制也是按照这个等级来的,默认情况下总共会重试16次,这个等级逐渐加1。下一次的重试属性如下图:

    重试属性

    下面举例一个自定义消费者代码:

    package com.cloud.bssp.message.rocketmq.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    /**
     * 重试consumer
     *
     * @date: 2020/11/30
     * @author weirx
     * @version 3.0
     */
    @Component
    @Slf4j
    public class RetryConsumer {
        public static void main(String[] args) throws MQClientException {
            //创建消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
            //设置NameServer地址,替换成自己的ip地址
            consumer.setNamesrvAddr("ip:9876");
            //设置实例名称
            consumer.setInstanceName("consumer");
            //订阅topic
            consumer.subscribe("test_reconsume_1", "");
            //监听消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    //获取消息
                    for (MessageExt messageExt : list) {
                        log.info("消息重试:{} ", messageExt.getMsgId() + "---" + new String(messageExt.getBody()));
                    }
                    try {
                        //模拟错误
                        int i = 5 / 0;
                    } catch (Exception e) {
                        e.printStackTrace();
                        //需要重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    //不需要重试
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费者
            consumer.start();
            System.out.println("Consumer Started!");
        }
    }
    

    两种方式默认情况下都是重试16次,使用延时等级配置的时间。
    自定义消费者可以使用如下的方式进行配置最大重试次数:

     //设置重试次数为2
     consumer.setMaxReconsumeTimes(2);
    

    相关文章

      网友评论

        本文标题:RocketMQ(九)高级特性-消息重试机制

        本文链接:https://www.haomeiwen.com/subject/vjmrwktx.html