一、RabbitMQ简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
图1-1 RabbitMQ主要组件.png 图1-2 RabbitMQ主要组件.png
相关组件介绍请参考:
Exchange分发消息时,共四种类型:direct、fanout、topic、headers,相关区别请参考:
二、Docker安装RabbitMQ
本文使用阿里云的主机进行演示,使用docker安装RabbitMQ。(安全组开放5672和15672端口)
图2-1 安全组端口配置.png下载RabbitMQ镜像
docker pull rabbitmq:3-management
启动容器,设置用户名、密码(yourusername /yourpassword),不设置默认为guest
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=yourusername -e RABBITMQ_DEFAULT_PASS=yourpassword -p 15672:15672 -p 5672:5672 rabbitmq:3-management
进入容器
docker exec -it rabbit bash
开启插件
rabbitmq-plugins enable rabbitmq_management
至此就能在本地通过IP+端口号访问RabbitMQ。
三、Spring Boot整合RabbitMQ
3.1 注入Queue、Exchange、Binding
手动注入队列、交换器和绑定组件。
@Bean
public Queue queue1() {
return new Queue("queue1");
}
@Bean
public Queue queue2() {
return new Queue("queue2");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
//queue1和queue2绑定fanoutExchange
@Bean
public Binding bindingExchangeAndQueue1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
public Binding bindingExchangeAndQueue2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
//queue1和queue2绑定topicExchange
@Bean
public Binding bindingExchangeAndQueue3(@Qualifier("queue1") Queue queue1, TopicExchange topicExchange) {
return BindingBuilder.bind(queue1).to(topicExchange).with("topic.match");
}
//#匹配0个或多个 *匹配一个
@Bean
public Binding bindingExchangeAndQueue4(@Qualifier("queue2") Queue queue2, TopicExchange topicExchange) {
return BindingBuilder.bind(queue2).to(topicExchange).with("topic.#");
}
自定义序列化:
//自定义序列化
@Bean
public MessageConverter getMessageConverter() {
return new Jackson2JsonMessageConverter();
}
发送与接收消息:发送消息调用RabbitTemplate的消息发送方法,结合@RabbitListener注解接收消息。
3.2 AmqpAdmin使用
使用AmqpAdmin的declare相关方法可以创建队列、交换器和绑定组件,其他操作请查询对应的API。
@Resource
private AmqpAdmin amqpAdmin;
amqpAdmin.declareQueue(new Queue(queueName));
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(new Binding(queueName,Binding.DestinationType.QUEUE, fanoutExchangeName, null, null));
四、消息确认机制(ACK)
yaml添加失败返回、发送确认等配置:
spring:
rabbitmq:
publisher-returns: true #失败返回
publisher-confirms: true #发送确认
listener:
simple:
acknowledge-mode: manual #手动应答
retry:
enabled: true #重试
max-attempts: 3 #重发次数--无效
direct:
acknowledge-mode: manual
注入RabbitTemplate,配置ReturnCallback和ConfirmCallback方法。
@Bean
public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactor) {
Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactor);
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
// 消息返回, yml需要配置 publisher-returns: true
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationId();
log.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
});
// 消息确认, yml需要配置 publisher-confirms: true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息发送到exchange成功");
} else {
log.info("消息发送到exchange失败,原因: {}", cause);
}
});
return rabbitTemplate;
}
配置消息接受者,调用basicAck方法手动确认,如果发生异常根据情况判断重发还是丢弃消息。
@RabbitListener(queues = "queue1")
public void getMsg(String msg,Message message, Channel channel) throws IOException {
try {
int a = 1 / 0;
//false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//ack返回false,并重新回到队列--注意次数限制 -- 最后一个参数为false则摒弃
logger.info("消息接收失败,重发!");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝消息
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
springboot整合rabbitmq实现消息确认机制
SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
rabbitmq监控之消息确认ack
五、延迟队列 & 死信队列
使用延迟队列和死信队列完成消息延迟发送和消息重复发送需求。
5.1 消息延迟发送
消息的TTL:消息的存活时间(Time To Live),当消息在队列中存在的时间超过这个设定值之后,系统会认为这个消息死了,故称为"死信"。
死信Exchange:死信最后被转发到的路由地址。(和普通Exchange相同)
配置延迟交换器、延迟队列、延时绑定。核心参数如下:
x-dead-letter-exchange //死信交换器
x-dead-letter-routing-key //死信路由键值
x-message-ttl //过期时间
//延迟队列
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(IRabbitMQConst.DELAY_QUEUE)
// DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器
.withArgument(IRabbitMQConst.DEAD_LETTER_EXCHANGE_ARGUMENT, IRabbitMQConst.DEAD_LETTER_EXCHANGE)
// dead letter携带的routing key,配置处理队列的路由key
.withArgument(IRabbitMQConst.DEAD_LETTER_ROUTEKEY_ARGUMENT,IRabbitMQConst.DEAD_LETTER_ROUTEKEY)
// 设置过期时间
.withArgument(IRabbitMQConst.MESSAGE_TTL_ARGUMENT, IRabbitMQConst.EXPIRE_TIME)
.build();
}
//延迟交换器-主题
@Bean
public TopicExchange delayExchange() {
return new TopicExchange(IRabbitMQConst.DELAY_EXCHANGE);
}
//延迟绑定
@Bean
public Binding delayBinding(Queue delayQueue, TopicExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(IRabbitMQConst.DELAY_ROUTEKEY);
}
配置死信交换器、死信队列、死信绑定。
//死信队列
@Bean
public Queue deadLetterQueue(){
return QueueBuilder.durable(IRabbitMQConst.DEAD_LETTER_QUEUE).build();
}
//死信交换器-主题
@Bean
public TopicExchange deadLetterExchange(){
return new TopicExchange(IRabbitMQConst.DEAD_LETTER_EXCHANGE);
}
//死信绑定
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(IRabbitMQConst.DEAD_LETTER_ROUTEKEY);
}
延迟发送消息测试,消息和队列都可以设置过期时间,取两者最小值:
@GetMapping(value = "delay")
public void delayTest() {
System.out.println("消息已发出:" + LocalDateTime.now().toString());
rabbitTemplate.convertAndSend(IRabbitMQConst.DELAY_EXCHANGE, IRabbitMQConst.DELAY_ROUTEKEY, "hello world!", message -> {
// 设置延迟毫秒值 -- 已消息和队列设置的最小值为准
message.getMessageProperties().setExpiration(String.valueOf(20000L));
return message;
});
}
由于队列的先进先出特性,只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。所以当我们设置了一个较短的消息超时时间,但是因为它之前有队列尚未完结,故此消息不会进入死信。
5.2 消息重发
消息重发流程如下所示,包括工作队列、重发队列和失败队列。在重发队列中设置延迟发送时间,并将其死信队列和私信交换器绑定为工作队列和工作交换器,当重试次数超过设置的次数,将消息发送到失败队列等待特定消费者处理或者人工处理。
图5-2 消息重发流程图.png按照图5-2的流程图分别设置工作队列、重发队列和失败队列以及其关联的交换器和绑定组件。
工作队列、工作交换器以及工作绑定组件如下所示:
//工作队列
@Bean
public Queue workQueue() {
return new Queue("workQueue");
}
//工作交换器
@Bean
public TopicExchange workExchange() {
return new TopicExchange("workExchange");
}
//工作绑定
@Bean
public Binding topicQueueBinding(Queue workQueue, TopicExchange workExchange) {
return BindingBuilder.bind(workQueue).to(workExchange).with("work.topic.*");
}
重发队列、重发交换器以及重发绑定组件如下所示:
//重发队列
@Bean
public Queue retryQueue() {
return QueueBuilder.durable("retryQueue")
// DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器
.withArgument(IRabbitMQConst.DEAD_LETTER_EXCHANGE_ARGUMENT, "workExchange")
// dead letter携带的routing key,配置处理队列的路由key
.withArgument(IRabbitMQConst.DEAD_LETTER_ROUTEKEY_ARGUMENT,"work.topic.retry")
// 设置过期时间
.withArgument(IRabbitMQConst.MESSAGE_TTL_ARGUMENT, 3000L)
.build();
}
//重发交换器
@Bean
public TopicExchange retryExchange() {
return ExchangeBuilder.topicExchange("retryExchange").durable(true).build();
}
//重发绑定
@Bean
public Binding retryDirectBinding(@Qualifier("retryQueue") Queue retryQueue,
@Qualifier("retryExchange") TopicExchange retryExchange) {
return BindingBuilder.bind(retryQueue).to(retryExchange).with("work.retry.*");
}
失败队列、失败交换器以及失败绑定组件如下所示:
//失败队列
@Bean
public Queue failQueue() {
return QueueBuilder.durable("failQueue").build();
}
//失败交换器
@Bean
public TopicExchange failExchange() {
return ExchangeBuilder.topicExchange("failExchange").durable(true).build();
}
//失败绑定
@Bean
public Binding failDirectBinding(@Qualifier("failQueue") Queue failQueue,@Qualifier("failExchange") TopicExchange failExchange) {
return BindingBuilder.bind(failQueue).to(failExchange).with("work.fail.*");
}
消息接收端接收消息,发送异常时将消息推送到重发队列,重发超过3次将消息推送到失败队列。
@RabbitListener(queues = "workQueue")
public void getMsg3(Message message, Channel channel) throws IOException {
System.out.println("消息已收到:"+LocalDateTime.now().toString());
logger.info("收到消息");
try {
//模拟异常
int a = 1/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
long retryCount = getRetryCount(message.getMessageProperties());
if(retryCount >= 3){
System.out.println("重发超过三次,发送到失败队列!");
rabbitTemplate.convertAndSend("failExchange","work.fail.queue",message);
}else{
System.out.println("第" + (retryCount+1) + "次重发!");
rabbitTemplate.convertAndSend("retryExchange","work.retry.queue",message);
}
}finally {
//避免死循环
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
消息重发测试:
@GetMapping(value = "retry")
public void retryTest() {
rabbitTemplate.convertAndSend("workExchange", "work.topic.queue", "hello world!");
}
Spring Boot 整合——RabbitMQ配置延时队列和消息重试
RabbitMQ实现消息的消费确认,死信队列、延迟重试队列
六、消息幂等性
幂等性:任意多次执行所产生的影响均与一次执行的影响相同。
消息幂等性实现思路如下所示,客户端实现思路类似,不再赘述,具体实现细节请查看源码。
图6-1 消息幂等性实现思路.png
配置MQ时如果配置了json解析器,则程序会走自动确认消费,配置文件的配置不生效。手动添加RabbitListenerContainerFactory的配置,设置手动确认。
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
RabbitMQ消息幂等性问题
RabbitMQ消息可靠性投递与消费,消费幂等
springBoot-rabbit MQ-设置手动确认ACK-Channel shutdown异常
七、综合测试
使用PostMan进行测试:
图7-1 请求测试.png测试结果如下所示:
图7-2 测试结果.png
网友评论