- Producer发送消息到Broker,Broker向Producer确认消息,这个步骤叫Publisher Confirms,详细在前面文章中有介绍——【RabbitMQ的那点事】发送方确认机制(Publisher Confirms):https://www.jianshu.com/p/15f0c1a105fb
- 消息从Broker再发送至Consumer,Consumer向Broker确认消息,这个步骤叫acknowledgements,也就是消费端的消息可靠性,即本文内容。
1. 关于消费端确认模式(acknowledgements modes),官网文章:
- https://www.rabbitmq.com/consumers.html#acknowledgement-modes
- https://www.rabbitmq.com/confirms.html#acknowledgement-modes
2. 当注册一个Consumer程序时,可以选择多种投递方式:
- 无ack模式(AcknowledgeMode.NONE),默认模式,意思是发送方不需要确认,即fire and forget(发后即忘,在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完)
- 发送方需要消费端确认消息模式,(消费端需要告知消息已经收到)。这其中分两种:
- 一种是自动确认:AcknowledgeMode.AUTO模式下,由Container自动应答,正确处理发出ack信息,处理失败发出nack信息,rabbitmq发出消息后将会等待consumer端的应答,只有收到ack确认信息才会把消息清除掉,收到nack信息的处理办法由setDefaultRequeueRejected()方法设置,所以在这种模式下,发生错误的消息是可以恢复的。
- 另一种是手动确认,AcknowledgeMode.MANUAL。基本同AUTO模式,区别是需要人为调用方法给应答。
3. 想要消息的可靠消息,需要采用手动确认模式。
以下是manual mode的代码示例,参考了文章:https://javamana.com/2021/09/20210912132719181S.html
3.1 配置:
spring:
rabbitmq:
port: 5672
host: localhost
virtual-host: spring-boot-test
listener:
simple:
acknowledge-mode: manual
如果我们使用了manual(需要手动在消费端确认消息模式),然后使用了之前的RabbitListener去接收消息:
@RabbitListener(queues = "direct.queue")
public void listen(String in) {
System.out.println("Direct Message Listener: " + in);
}
可以看到确实也能接收到消息,但看控制台,消息依旧还是处于Ready的状态,也就是消息虽然被投递过,但Broker并没有收到确认:

3.2 那么怎样进行手动确认?
还是使用@RabbitListener,但需要在参数中加上:
- message,用来查看是否是再次投递过(getRedelivered)
- channel用来回复ack
- deliveryTag:消息投递序号,每个channel对应一个(long类型),从1开始到9223372036854775807范围,在手动消息确认时可以对指定delivery_tag的消息进行ack、nack、reject等操作。例如上面Ready中有4个消息,那么deliveryTag分别是:1, 2, 3, 4。
示例做了什么事情:
a. 接收消息
b. 处理(1除以0,业务处理报错),即没来得及做act
c. 在异常处理中尝试重新投递,如果发现已经重新投递过一次(通过判断messga的redelivered flag是否为true),如果是,那么拒绝消息。
d. 在异常中尝试再次返回队列处理
以上的处理涉及到的方法:
- 告知Broker消息已收到,使用void basicAck(long deliveryTag, boolean multiple),其中deliveryTag在上述有介绍,是消息投递的序号,每个channel的唯一值。multiple如果设为true,则表示会确实该channel中的所有delivery tags。例如这个channel中有5, 6, 7, 8需要确定,如果调用basicAct(8, true),则会把5-8的delivery tag的消息都确认掉。如果multiple为false,那么例子中只会确认delivery tag = 8的消息。
- 告知Broker消息消费失败:basicNack(long deliveryTag, boolean multiple, boolean requeue),此方法有三个参数,前两个和basicAct相似,最后一个是requeue,如果为true,则会将消息放回原队列头部。如果为false,在配置了dead letter exchange(死信通道)那么则会放这里,否则会丢掉。
- 另一个方法basicReject(long deliveryTag, boolean requeue),和basicNack类似,也是告知Broker消息消费失败,只不过不能multiple确认。
@Slf4j
@Component
public class RabbitMQListener {
@RabbitListener(queues = {"direct.queue"})
public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
MessageProperties messageProperties = message.getMessageProperties();
try {
log.info("Get message: {}", message.toString());
log.info("DeliveryTag: {}", tag);
int a = 1/0; // 主动抛出错误
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("met exception......{}", e.getMessage());
// 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息
log.info("messageProperties.getRedelivered(): {}", messageProperties.getRedelivered());
if (messageProperties.getRedelivered()) {
log.info("消息已重复处理失败,拒绝再次接收...");
// 拒绝消息
channel.basicReject(tag, false);
} else {
log.info("消息即将再次返回队列处理...");
channel.basicNack(tag, false, true);
}
}
}
}
测试结果:
2022-05-08 15:17:07.232 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : Get message: (Body:'hello, i am direct message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=direct-routing-key, deliveryTag=1, consumerTag=amq.ctag-i3qgOdtD8H4bm7_5OUQXTQ, consumerQueue=direct.queue])
2022-05-08 15:17:07.235 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : DeliveryTag: 1
2022-05-08 15:17:07.236 ERROR 32107 --- [ntContainer#4-1] RabbitMQListener : met exception....../ by zero
2022-05-08 15:17:07.236 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : messageProperties.getRedelivered(): false
2022-05-08 15:17:07.236 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : 消息即将再次返回队列处理...
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : Get message: (Body:'hello, i am direct message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=direct.exchange, receivedRoutingKey=direct-routing-key, deliveryTag=2, consumerTag=amq.ctag-i3qgOdtD8H4bm7_5OUQXTQ, consumerQueue=direct.queue])
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : DeliveryTag: 2
2022-05-08 15:17:07.238 ERROR 32107 --- [ntContainer#4-1] RabbitMQListener : met exception....../ by zero
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : messageProperties.getRedelivered(): true
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : 消息已重复处理失败,拒绝再次接收...
关于配置:
有些文章用的是listener.direct.acknowledge-mode: manual
参考Spring Boot 2.x的文档:https://docs.spring.io/spring-boot/docs/2.0.0.RC1/reference/html/common-application-properties.html
原因是Spring AMQP现在支持两种container type了。
关于两种type的区别,可以看网友的文章——RabbitMQ笔记(七)-SimpleMessageListenerContainer和DirectMessageListenerContainer:https://blog.csdn.net/yingziisme/article/details/86418580

4. 总结
消费方的ACK机制可以有效的解决消息从Broker到Consumer丢失的问题。但也要注意一点:消息的无限消费。
5. 额外的一些测试
5.1 上述关于deliveryTag,范围是每个channel,例如:
启动两个Consumer,都去订阅同一个queue
这时候Producer往Broker中发10个消息,那么每个Consumer都能收到5个消息,DeliveryTag都是1,2,3,4,5
首先是Consumer端:
启动两个Instance:
@Slf4j
@Component
public class MultipleRabbitMQListener {
@RabbitListener(queues = {"direct.queue"})
public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
log.info("Get message: {}", new String(message.getBody()));
log.info("DeliveryTag: {}", tag);
ThreadUtils.sleep(30000); // 模拟业务处理时间
channel.basicAck(tag, false);
}
}
其次是Producer端,在两个Consumer instance启动后再启动:
@Test
public void sendMessageToDirectExchange1() {
for (int i = 0; i < 10; i ++) {
rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "message - " + i);
}
}
测试结果:可以看到每个Channel的DeliveryTag都是自增的。
另外,就算业务需要处理30秒,Broker在没有收到Consumer的ack之前,不会再把消息给另一个消费者。即:保证了同一个Queue消息只会被消费一次。
Consumer-1的log:

Consumer-2的log:

参考:
spring-rabbit消费过程解析及AcknowledgeMode选择https://blog.csdn.net/weixin_38380858/article/details/84963944
网友评论