前言
一般来说RabbitMQ
有个方法channel.basicNack()
能够让消息回到队列中,这样可以实现重试。但是这样没有明确重试次数,如果当前的消息一直重试的话,则后面的消息就会堆积起来,导致后面的消息无法消费。这是一个致命的缺点。因此这就需要设置重试次数来解决这种问题。下面提供几种解决方案。
- 使用
redis
或者mongo
等第三方存储当前重试次数。 - 在header中添加重试次数,并且使用
channel.basicPublish()
方法重新将消息发送出去后将重试次数加1。 - 使用
spring-rabbit
中自带的retry
功能。
重试
这一节介绍使用spring-rabbit
自带的retry
功能。
关键类介绍
BackOffPolicy
:重试的回退策略,指以何种方式进行下一次重试(第一次重试后什么时候进行第二次重试),比如过了15秒后重试,随机时间重试。
RetryPolicy
:重试策略或条件,可以指定超时重试,一直重试,简单重试等。
MessageRecoverer
:消息回收类,当所有的重试次数都失败后,就会调用该类的recover
方法
RetryTemplate
:组合了BackOffPolicy
,RetryPolicy
,RetryListener
,执行重试步骤的具体类。
RetryOperationsInterceptor
:方法执行失败的拦截器类,拦截失败后交给RertyTemplate
去执行重试。
SimpleMessageListenerContainer
:用于管理消费者。
RetryListener
:重试过程的监听器,第一次重试调用该类的open
,每次重试不成功调用onError
,最后一次重试调用close
。
第一步
设置消息确认模式为auto
,SpringBoot
环境添加以下配置。
spring.rabbitmq.listener.simple.acknowledge-mode=auto
第二步配置重试
private RetryOperationsInterceptor createRetry() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
// 第一次重试调用
return false;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
// 最后一次重试会调用
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
// 每次重试失败后都会调用
}
});
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5));
retryTemplate.setBackOffPolicy(new NoBackOffPolicy());
return RetryInterceptorBuilder.stateless()
.retryOperations(retryTemplate).recoverer(new DefaultMessageRecoverer()).build();
}
第三步
配置SimpleMessageListenerContainer
,并抛出异常重试。使用simpleRabbitListenerContainerFactory
创建SimpleMessageListenerContainer
有个好处就是在application.properties
中对SimpleMessageListenerContainer
的配置可以生效。比如配置spring.rabbitmq.listener.simple.acknowledge-mode=auto
就可以直接生效。
@Bean
public SimpleMessageListenerContainer config(SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory) {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setMessageListener(new MessageListenerAdapter(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
// 处理业务逻辑
}catch (Throwable r) {
// 失败抛出异常重试
throw r;
}
}
}));
endpoint.setId(String.valueOf(UUID.randomUUID()));
SimpleMessageListenerContainer container = simpleRabbitListenerContainerFactory.createListenerContainer(endpoint);
// 配置队列信息
container.setQueueNames("队列1","队列2");
// 配置重试
container.setAdviceChain(createRetry());
return container;
}
网友评论