这篇文章是我根据 https://www.jianshu.com/p/dca01aad6bc8 参考学习的。里面的图片也是
image本次使用的模式是路由模式,根据指定的key来进行交换器 (X) 与 指定的 队列 (Q)进行绑定。从而进行消息消费。
MQ配置的代码如下:
1:生产者
发送方.png2: 消费者
消费者.png异常情况验证
- 1:取消手动ACK,消费者接受正常,但是消息还是处于unacked状态
// 注释掉这行,手动ack模式下, 消费端必须进行手动确认(ack)。会发现untrack存在一条数据
// channel.basicAck(tag, false);// 手动消费确认
手动ack.png
此时数据库关于这条消息的消费状态如下:
001.png
status为3 表示这个消息已经被消费了。
- 2:验证消费端幂等性
上一步, 去掉注释, 重启服务器, 由于有一条未被ack的消息, 所以重启后监听到消息, 进行消费, 但是由于消费前会判断该消息的状态是否未被消费, 发现status=3, 即已消费, 所以, 直接return, 这样就保证了消费端的幂等性。
002.png总结:消费的时候,通过数据库来保存一个消费状态,每次消费的时候,如果存在相同的id,直接return。
- 3:验证消费端发生异常消息也不会丢失
消费端代码可能发生异常, 如果不做处理, 业务没正确执行, 消息却不见了, 给我们感觉就是消息丢失了, 由于我们消费端代码做了异常捕获, 业务异常时, 会触发: channel.basicNack(tag, false, true);, 这样会告诉rabbitmq该消息消费失败, 需要重新入队, 可以重新投递到其他正常的消费端进行消费, 从而保证消息不被丢失
channel.basicNack(tag, false, true) 这个方法是 Reject one or several received messages.。
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
模拟消费端失败。消费端接受到生产者的通知,准备去发邮件的时候,此时,发送功能出现了异常,但是消息已经在队列中。此时,由于设置了消费端失败后,重新入队列的方法,所以会重新发送,但是一直这样接受也不好,可以设置个次数限制。
模拟代码如下:
006.png更改代码逻辑如下: 当重新投递达到一定次数后,直接丢弃掉。
003.pngconsole显示:
004.png数据库显示:
005.png
网友评论