springBoot-rabbit MQ-设置手动确认ACK-Channel shutdown异常
简介
MQ全称(Message Queue)又名消息队列,是一种异步通讯的中间件。是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。常见的MQ有kafka、activemq、zeromq、rabbitmq 等等
环境信息
jdk1.8.0_45
Spring Boot 2.0.1.RELEASE
Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
连接配置
application.properties 文件增加以下配置
#==========================================================
# RabbitMQ 连接配置
#==========================================================
# 连接用户名
spring.rabbitmq.username=guest
# 连接密码
spring.rabbitmq.password=guest
# 服务地址
spring.rabbitmq.host=172.18.1.1
# 服务端口号
spring.rabbitmq.port=5672
# 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。
spring.rabbitmq.virtual-host=/
# 手动ACK 不开启自动ACK模式,目的是防止报错后死循环重复消费错误消息,默认为 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=2
# 是否开启消费者重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试间隔时间(毫秒)
spring.rabbitmq.listener.simple.retry.initial-interval=3000
# 重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
spring.rabbitmq.listener.simple.default-requeue-rejected=false
测试代码
配置队列
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置
*
* @author swordshake
* @since 1.0
*/
@Slf4j
@Configuration
public class RabbitDemoConfig {
public static final String DEMO_ROUTING_KEY = "dev.demo.register.manual.queue";
/**
* 配置RabbitMQ连接模板
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log
.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log
.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange,
routingKey, replyCode, replyText, message));
return rabbitTemplate;
}
/**
* 定义队列。
*
* @return
*/
@Bean
public Queue demoQueue() {
// 参数1:队列名称,参数2:是否持久化。
return new Queue(DEMO_ROUTING_KEY, true);
}
/**
* 一个消费者的情况下用于保证消息队列按顺序一条一条消费的容器配置。
*
* @param configurer
* @param connectionFactory
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory orderContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setPrefetchCount(1); // 用于保证一次只pop一条消息,只有当本条消息确认后才继续pop下一条。
configurer.configure(factory, connectionFactory);
return factory;
}
}
生产者
// RabbitDemoMsg .java
@Data
@AllArgsConstructor
public class RabbitDemoMsg implements Serializable {
private static final long serialVersionUID = -447646130662400154L;
private String id;
private String name;
}
// 用于生成消息的测试类
@Api(tags = "测试用的服务")
@RestController
@RequestMapping("/testservice")
public class RSTestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@ApiOperation("测试RabbitMQ")
@GetMapping(value = "sendMsg")
public void sendMsg(@RequestParam String p) {
RabbitDemoMsg msg = new RabbitDemoMsg("1", "测试RabbitMQ" + p);
rabbitTemplate.convertAndSend(RabbitDemoConfig .DEMO_ROUTING_KEY, msg);
}
}
消费者
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.hd123.saleservice.config.RabbitConfig;
import com.hd123.saleservice.config.RabbitDemoConfig;
import com.hd123.saleservice.rs.impl.rabbit.RabbitDemoMsg;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
/**
* 队列消费者。
*
* @author swordshake
* @since 1.0
*/
@Component
@Slf4j
public class RabbitDemoHandler {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = {
RabbitConfig.DEMO_ROUTING_KEY }, containerFactory = "orderContainerFactory") // 使用自定义的容器工厂,内已配置消息一次只能消费一条
public void handler(RabbitDemoMsg demo, Message message, Channel channel) throws IOException {
log.info("[Rabbit DEMO Handler 监听的消息]-[{}]", demo.toString());
try {
Thread.currentThread().sleep(10000);
int i = 1 / 0;
// 手动确认消费消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("", e);
// ========消费失败处理方式:1、重新入栈消费 (重复消费错误数据会死循环)=======
// channel.basicRecover(false); // 重新压入MQ,参数表示是否能被其它消费者消费,效果类似第三种处理方式开启重新入栈的场景,不同的是它会触发 ListenerContainerConsumerFailedEvent
// ========消费失败处理方式:2、转到其它队列,比如延迟队列进行特殊处理;然后继续消费下一条消息。(推荐做法)=======
rabbitTemplate.convertAndSend(RabbitDemoConfig.DEMO_DEAD_LETTER_ROUTING_KEY, demo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// ========消费失败处理方式:3、拒绝并重新入栈(重复消费错误数据会死循环)=======
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
// // 第二个参数表示是否重新入栈,为false会直接丢弃当前消息;为true时会重新放入原消息队列位置,重新消费。
// ========消费失败处理方式:4、抛异常,启用了最大重试次数后会被阻塞到unacked消息中=======
// throw new IOException(e); //根据application.properties
// 配置的最大重试次数进行重试,超过的话进入unacked状态。由于本消息未应答,因此下一条消息会被本消息阻塞,不会继续处理。会导致 Ready 消息堆积。
}
}
}
springBoot-rabbit MQ-设置手动确认ACK-Channel shutdown异常
发现MQ配置的时候,如果配置了json解析器。如下:
@Bean
public MessageConverter messageConverter() {
return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter());
}
则程序会走自动确认消费,配置文件的配置就不生效了(这个不知道为什么,也可能springboot中某些配置的先后顺序的问题)
这个问题解决办法就是重写一下这个东东,用代码设置手动确认。就OK了
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
网友评论