美文网首页java
RabbitMQ消息确认(二)——消费者接收消息手动ACK

RabbitMQ消息确认(二)——消费者接收消息手动ACK

作者: 砒霜拌辣椒 | 来源:发表于2020-09-17 23:05 被阅读0次

    消息接收的确认机制主要有三种模式:

    1. 自动确认AcknowledgeMode.NONE
      RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
      所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
      一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

    2. 根据情况确认AcknowledgeMode.AUTO
      这也是SpringBoot集成RabbitMQ默认的消息确认情况,如果消费消息时有异常抛出,则会拒绝消息,反之如果没有捕获到异常则确认本次消费成功。

    3. 手动确认AcknowledgeMode.MANUAL
      这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
      消费者收到消息后,手动调用basicAck/basicNack/basicReject后,RabbitMQ收到这些消息后,才认为本次投递成功。

    1、创建手动确认消息的队列

    @Configuration
    public class DirectRabbitConfig {
        //Direct交换机 起名:directExchange
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("directExchange", true, false);
        }
    
        //需要手动确认消息的队列
        @Bean
        public Queue manualAckQueue() {
            return new Queue("manualAckQueue", true, false, false);
        }
    
        //手动确认消息的队列和直连交换机绑定
        @Bean
        public Binding bindingDirectForManualAck() {
            return BindingBuilder.bind(manualAckQueue()).to(directExchange()).with("manualAck");
        }
    }
    

    2、手动确认消息的监听实现

    2.1、通过配置实现

    spring:
      rabbitmq:
        host: 148.70.153.63
        port: 5672
        username: libai
        password: password
        listener:
          simple:
            # 手动确认
            acknowledge-mode: manual
            # 拒绝消息是否重回队列
            default-requeue-rejected: true
    

    2.2、配置类实现(更加灵活)

    @Configuration
    @Slf4j
    public class MessageManualAckListenerConfig {
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            // RabbitMQ默认是自动确认,这里改为手动确认消息
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // 设置需要手动确认消息的队列,可以同时设置多个,前提是队列需要提前创建好
            container.setQueueNames("manualAckQueue");
            // 设置监听消息的方法,匿名内部类方式
            container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                // 开始消费消息
                log.info("body:\n{}", JSONUtil.toJsonPrettyStr(new String(message.getBody())));
                log.info("prop:\n{}", JSONUtil.toJsonPrettyStr(message.getMessageProperties()));
    
                // 手动确认
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicAck(deliveryTag, false); // 肯定确认
            });
            return container;
        }
    }
    

    body:接收的消息内容。
    messageProperties:消息的相关属性。

    3、发送消息测试手动确认

    3.1、调用接口

    @RestController
    public class MessageManualAckController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostMapping("/manualAck")
        public String manualAck() {
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", String.valueOf(UUID.randomUUID()));
            map.put("messageData", "manualAck");
            map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            rabbitTemplate.convertAndSend("directExchange", "manualAck", JSONUtil.toJsonStr(map));
            return "ok";
        }
    }
    

    3.2、查看控制台打印输出。

    2020-09-17 22:35:28,635 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:32] [] body:
    {
        "createTime": "2020-09-17 22:35:28",
        "messageId": "25398d1a-474e-48cd-a3df-460b780e9d97",
        "messageData": "manualAck"
    }
    2020-09-17 22:35:28,636 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:33] [] prop:
    {
        "headers": {
            "spring_listener_return_correlation": "a6622c5e-22f0-4f39-bea5-4360ef8de66b"
        },
        "finalRetryForMessageWithNoId": false,
        "contentLengthSet": false,
        "deliveryTag": 3,
        "receivedExchange": "directExchange",
        "priority": 0,
        "receivedRoutingKey": "manualAck",
        "redelivered": false,
        "consumerTag": "amq.ctag-cqCpyMhe9Ak2vuv_RifFlQ",
        "receivedDeliveryMode": "PERSISTENT",
        "publishSequenceNumber": 0,
        "contentEncoding": "UTF-8",
        "contentLength": 0,
        "contentType": "text/plain",
        "consumerQueue": "manualAckQueue",
        "deliveryTagSet": true
    }
    

    3.3、消费消息时的状态变化

    通过打断点方式查看当消息未被确认时在RabbitMQ server中的状态。

    unacked

    4、确认/拒绝消息

    4.1、basicAck

    确认消息。
    第2个参数如果设为true,则表示批量确认当前通道中所有deliveryTag小于当前消息的所有消息。

    4.2、basicNack

    拒绝消息。
    第2个参数如果设为true,则表示批量拒绝当前通道中所有deliveryTag小于当前消息的所有消息。
    第3个参数如果设为true,则表示当前消息再次回到队列中等待被再次消费。

    4.3、basicReject

    拒绝消息。与basicNack作用类似,只不过一次只能拒绝单条消息。

    对于拒绝消息并且重回队列使用时需要谨慎,避免使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。

    RabbitMQ消息确认(一)——生产者推送消息

    参考链接

    代码地址

    相关文章

      网友评论

        本文标题:RabbitMQ消息确认(二)——消费者接收消息手动ACK

        本文链接:https://www.haomeiwen.com/subject/ubotyktx.html