美文网首页
【RabbitMQ的那点事】消息的可靠消费

【RabbitMQ的那点事】消息的可靠消费

作者: 伊丽莎白2015 | 来源:发表于2022-05-08 15:49 被阅读0次
  • Producer发送消息到Broker,Broker向Producer确认消息,这个步骤叫Publisher Confirms,详细在前面文章中有介绍——【RabbitMQ的那点事】发送方确认机制(Publisher Confirms):https://www.jianshu.com/p/15f0c1a105fb
  • 消息从Broker再发送至Consumer,Consumer向Broker确认消息,这个步骤叫acknowledgements,也就是消费端的消息可靠性,即本文内容。

1. 关于消费端确认模式(acknowledgements modes),官网文章:

2. 当注册一个Consumer程序时,可以选择多种投递方式:

  • 无ack模式(AcknowledgeMode.NONE),默认模式,意思是发送方不需要确认,即fire and forget(发后即忘,在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完)
  • 发送方需要消费端确认消息模式,(消费端需要告知消息已经收到)。这其中分两种:
    • 一种是自动确认:AcknowledgeMode.AUTO模式下,由Container自动应答,正确处理发出ack信息,处理失败发出nack信息,rabbitmq发出消息后将会等待consumer端的应答,只有收到ack确认信息才会把消息清除掉,收到nack信息的处理办法由setDefaultRequeueRejected()方法设置,所以在这种模式下,发生错误的消息是可以恢复的。
    • 另一种是手动确认,AcknowledgeMode.MANUAL。基本同AUTO模式,区别是需要人为调用方法给应答。

3. 想要消息的可靠消息,需要采用手动确认模式。

以下是manual mode的代码示例,参考了文章:https://javamana.com/2021/09/20210912132719181S.html

3.1 配置:
spring:
  rabbitmq:
    port: 5672
    host: localhost
    virtual-host: spring-boot-test
    listener:
      simple:
        acknowledge-mode: manual

如果我们使用了manual(需要手动在消费端确认消息模式),然后使用了之前的RabbitListener去接收消息:

    @RabbitListener(queues = "direct.queue")
    public void listen(String in) {
        System.out.println("Direct Message Listener: " + in);
    }
可以看到确实也能接收到消息,但看控制台,消息依旧还是处于Ready的状态,也就是消息虽然被投递过,但Broker并没有收到确认: UI Console:Queue
3.2 那么怎样进行手动确认?

还是使用@RabbitListener,但需要在参数中加上:

  • message,用来查看是否是再次投递过(getRedelivered)
  • channel用来回复ack
  • deliveryTag:消息投递序号,每个channel对应一个(long类型),从1开始到9223372036854775807范围,在手动消息确认时可以对指定delivery_tag的消息进行ack、nack、reject等操作。例如上面Ready中有4个消息,那么deliveryTag分别是:1, 2, 3, 4。
示例做了什么事情:

a. 接收消息
b. 处理(1除以0,业务处理报错),即没来得及做act
c. 在异常处理中尝试重新投递,如果发现已经重新投递过一次(通过判断messga的redelivered flag是否为true),如果是,那么拒绝消息。
d. 在异常中尝试再次返回队列处理

以上的处理涉及到的方法:
  • 告知Broker消息已收到,使用void basicAck(long deliveryTag, boolean multiple),其中deliveryTag在上述有介绍,是消息投递的序号,每个channel的唯一值。multiple如果设为true,则表示会确实该channel中的所有delivery tags。例如这个channel中有5, 6, 7, 8需要确定,如果调用basicAct(8, true),则会把5-8的delivery tag的消息都确认掉。如果multiple为false,那么例子中只会确认delivery tag = 8的消息。
  • 告知Broker消息消费失败:basicNack(long deliveryTag, boolean multiple, boolean requeue),此方法有三个参数,前两个和basicAct相似,最后一个是requeue,如果为true,则会将消息放回原队列头部。如果为false,在配置了dead letter exchange(死信通道)那么则会放这里,否则会丢掉。
  • 另一个方法basicReject(long deliveryTag, boolean requeue),和basicNack类似,也是告知Broker消息消费失败,只不过不能multiple确认。
@Slf4j
@Component
public class RabbitMQListener {
    @RabbitListener(queues = {"direct.queue"})
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        MessageProperties messageProperties = message.getMessageProperties();
        try {
            log.info("Get message: {}", message.toString());
            log.info("DeliveryTag: {}", tag);

            int a = 1/0; // 主动抛出错误
            channel.basicAck(tag, false);

        } catch (Exception e) {
            log.error("met exception......{}", e.getMessage());
            // 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息
            log.info("messageProperties.getRedelivered(): {}", messageProperties.getRedelivered());

            if (messageProperties.getRedelivered()) {
                log.info("消息已重复处理失败,拒绝再次接收...");
                // 拒绝消息
                channel.basicReject(tag, false);

            } else {
                log.info("消息即将再次返回队列处理...");
                channel.basicNack(tag, false, true);
            }
        }
    }
}

测试结果:

2022-05-08 15:17:07.232 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : Get message: (Body:'hello, i am direct message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=direct-routing-key, deliveryTag=1, consumerTag=amq.ctag-i3qgOdtD8H4bm7_5OUQXTQ, consumerQueue=direct.queue])
2022-05-08 15:17:07.235 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : DeliveryTag: 1
2022-05-08 15:17:07.236 ERROR 32107 --- [ntContainer#4-1] RabbitMQListener : met exception....../ by zero
2022-05-08 15:17:07.236 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : messageProperties.getRedelivered(): false
2022-05-08 15:17:07.236 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : 消息即将再次返回队列处理...
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : Get message: (Body:'hello, i am direct message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=direct.exchange, receivedRoutingKey=direct-routing-key, deliveryTag=2, consumerTag=amq.ctag-i3qgOdtD8H4bm7_5OUQXTQ, consumerQueue=direct.queue])
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : DeliveryTag: 2
2022-05-08 15:17:07.238 ERROR 32107 --- [ntContainer#4-1] RabbitMQListener : met exception....../ by zero
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : messageProperties.getRedelivered(): true
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : 消息已重复处理失败,拒绝再次接收...

关于配置:

有些文章用的是listener.direct.acknowledge-mode: manual
参考Spring Boot 2.x的文档:https://docs.spring.io/spring-boot/docs/2.0.0.RC1/reference/html/common-application-properties.html
原因是Spring AMQP现在支持两种container type了。
关于两种type的区别,可以看网友的文章——RabbitMQ笔记(七)-SimpleMessageListenerContainer和DirectMessageListenerContainer:https://blog.csdn.net/yingziisme/article/details/86418580

listener type

4. 总结

消费方的ACK机制可以有效的解决消息从Broker到Consumer丢失的问题。但也要注意一点:消息的无限消费。


5. 额外的一些测试

5.1 上述关于deliveryTag,范围是每个channel,例如:

启动两个Consumer,都去订阅同一个queue
这时候Producer往Broker中发10个消息,那么每个Consumer都能收到5个消息,DeliveryTag都是1,2,3,4,5

首先是Consumer端:
启动两个Instance:

@Slf4j
@Component
public class MultipleRabbitMQListener {
    @RabbitListener(queues = {"direct.queue"})
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Get message: {}", new String(message.getBody()));
        log.info("DeliveryTag: {}", tag);

        ThreadUtils.sleep(30000); // 模拟业务处理时间
        channel.basicAck(tag, false);
    }
}

其次是Producer端,在两个Consumer instance启动后再启动:

    @Test
    public void sendMessageToDirectExchange1() {
        for (int i = 0; i < 10; i ++) {
            rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "message - " + i);
        }
    }

测试结果:可以看到每个Channel的DeliveryTag都是自增的。
另外,就算业务需要处理30秒,Broker在没有收到Consumer的ack之前,不会再把消息给另一个消费者。即:保证了同一个Queue消息只会被消费一次。

Consumer-1的log:

Consumer-1的log

Consumer-2的log:

Consumer-2的log

参考:
spring-rabbit消费过程解析及AcknowledgeMode选择https://blog.csdn.net/weixin_38380858/article/details/84963944

相关文章

网友评论

      本文标题:【RabbitMQ的那点事】消息的可靠消费

      本文链接:https://www.haomeiwen.com/subject/zgrqurtx.html