1、什么是消息确认ACK
如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。
2、ACK的消息确认机制
ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
-
如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。
-
如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。
-
消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
-
消息的ACK确认机制默认是打开的。
3、ACK机制的开发注意事项
如果忘记了ACK,那么后果很严重。当Consumer退出时候,Message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。
4、结合项目实例进行,理解一下ACK机制。之前写过RabbitMQ的交换器Exchange之direct(发布与订阅 完全匹配),这里借助这个进行消息持久化测试。生产者的代码不发生改变。控制层的触发生产者生产消息,这里只生产一条消息,方便观察现象。
消息端产生一个npe问题,会导致ack状态处于Unacked
Listener method 'public void (org.springframework.amqp.core.Message)' threw exception\n\tat org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140)\n\tat org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1381)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1324)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1294)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1550)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException: null
image.png
如何解决问题呢,如果消息发送的时候,程序出现异常,后果很严重的,会导致内存泄漏的。
- 在程序处理中可以进行异常捕获,保证消费者的程序正常执行,这里不进行介绍了。
- 可以使用RabbitMQ的ack确认机制。开启重试,然后重试次数,默认为3次。这里设置为5次。
# 开启重试 spring.rabbitmq.listener.simple.retry.enabled=true # 重试次数,默认为3次 spring.rabbitmq.listener.simple.retry.max-attempts=5
网友评论