rocketMq具有消息重试的机制,重试也分为两种重试:producer重试和consumer重试。
producer重试
如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。
可以通过以下方式进行配置:
1)使用starter的同学可以直接在yml文件中进行配置,主要属性有超时时间,重试次数,另外提供一个在其他broker节点重试的机制,如下所示:
rocketmq:
name-server: http://101.200.36.168:9876
producer:
#指定消息发送者的组,在控制台查询时会用到
group: test
#发送失败超时时间
send-message-timeout: 3000
#重试次数
retry-times-when-send-failed: 3
#在其他broker服务端进行重试默认false,开启设置为on
retry-next-server: false
2)在java代码中可以对producer进行手动设置:
代码设置3)自己初始化Producer的同学可以入第二步一样在初始化时设置好。
consumer重试(两种:监听、自定义消费者)
两种方式其实监听是对自定义的一个封装,只不过自定义可能更灵活一些,使用监听的形式我还没找到在哪里设置重试次数。
下面先看看监听的形式:
在消费者监听器,自定义抛出异常,会发生重试:
/**
* RocketMqProducer
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_reconsume", selectorExpression = "*", consumerGroup = "test_reconsume")
public class RetryConsumerListener implements RocketMQListener<MessageExt> {
@SneakyThrows
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("receive sync message:{}", msg);
throw new Exception("开始重试");
}
}
重试时的消息属性如下图:
重试消息属性上图中有一个delay属性,其实是延时等级,还记得我们前面学习的延时消息吗?在borker.conf中配置了了DelayDevel等级(延时等级),重试机制也是按照这个等级来的,默认情况下总共会重试16次,这个等级逐渐加1。下一次的重试属性如下图:
重试属性下面举例一个自定义消费者代码:
package com.cloud.bssp.message.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 重试consumer
*
* @date: 2020/11/30
* @author weirx
* @version 3.0
*/
@Component
@Slf4j
public class RetryConsumer {
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
//设置NameServer地址,替换成自己的ip地址
consumer.setNamesrvAddr("ip:9876");
//设置实例名称
consumer.setInstanceName("consumer");
//订阅topic
consumer.subscribe("test_reconsume_1", "");
//监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//获取消息
for (MessageExt messageExt : list) {
log.info("消息重试:{} ", messageExt.getMsgId() + "---" + new String(messageExt.getBody()));
}
try {
//模拟错误
int i = 5 / 0;
} catch (Exception e) {
e.printStackTrace();
//需要重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//不需要重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.println("Consumer Started!");
}
}
两种方式默认情况下都是重试16次,使用延时等级配置的时间。
自定义消费者可以使用如下的方式进行配置最大重试次数:
//设置重试次数为2
consumer.setMaxReconsumeTimes(2);
网友评论