美文网首页Java 杂谈
Spring AMQP的使用

Spring AMQP的使用

作者: 无知者云 | 来源:发表于2019-07-28 10:07 被阅读2次

本文主要介绍如何在Spring Boot项目中使用RabbitMQ,源代码请参考:

https://github.com/davenkin/spring-amqp-learning

本文不是RabbitMQ的入门文章,需要事先对RabbitMQ的概念有所了解,初学者建议先行阅读RabbitMQ官网文档或者笔者的RabbitMQ学习笔记

Hello World

Spring AMQP在默认情况下已经为我们配置好了一个基本可用的RabbitMQ基础实施,这里我们先采用简单配置的方式来完成一个Hello World。

  • 第1步,本地启动RabbitMQ:创建一个docker-compose.yml文件:
version: "2"
services:
  rabbitmq:
    restart: always
    container_name: spring-amqp-learning-rabbitmq
    image: rabbitmq:3-management
    networks:
      - ecommerce-order-net
    environment:
      - "RABBITMQ_DEFAULT_USER=rabbitmq-user"
      - "RABBITMQ_DEFAULT_PASS=rabbitmq-password"
    volumes:
      - ecommerce-order-rabbitmq-data:/var/lib/rabbitmq
    ports:
      - "15673:15672"
      - "5673:5672"

networks:
  ecommerce-order-net:
    driver: bridge

volumes:
  ecommerce-order-rabbitmq-data:
    driver: local

此时,切换到在与docker-compose.yml文件相同目录,命令行运行docker-compose up后,通过http://localhost:15673/可以访问RabbitMQ的管理页面:

  • 第2步,加入对spring AMQP的依赖:在build.gradle文件中:
    implementation("org.springframework.boot:spring-boot-starter-amqp")
  • 第3步,Spring连接RabbitMQ,在application.yml文件中配置:
spring:
  rabbitmq:
    host: localhost
    port: 5673
    virtual-host: /
    username: rabbitmq-user
    password: rabbitmq-password
  • 第4步,配置Queue:创建RabbitHelloWorldConfiguration.java
@Configuration
public class RabbitHelloWorldConfiguration {
    @Bean
    public Queue helloWorldQueue() {
        return new Queue("HelloWorldQueue", false, false, false);
    }
}
  • 第5步,发送消息:创建RabbitHelloWorldController.java
@RestController
@RequestMapping(value = "/rabbit/helloworld")
public class RabbitHelloWorldController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping
    public void helloWorld() {
        rabbitTemplate.convertAndSend("HelloWorldQueue", "HelloWorld!" + LocalDateTime.now().toString());
    }
}
  • 第6步,接收消息:创建RabbitHelloWorldListener.java
@Component
@RabbitListener(queues = "HelloWorldQueue")
public class RabbitHelloWorldListener {
    private Logger logger = AutoNamingLoggerFactory.getLogger();

    @RabbitHandler
    public void receiveHelloWorld(String queueMessage) {
        logger.info("Received message:{}", queueMessage);
    }
}
  • 第7步,见证奇迹:启动程序./run.sh,发送消息curl http://localhost:8080/rabbit/helloworld,此时可以在命令行中看到日志输出:
2019-07-25 09:17:59.871 -- INFO  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.e.o.r.h.RabbitHelloWorldListener : Received message:HelloWorld!2019-07-25T09:17:59.869

表明消息已经成功发送并接收。

在对上文的“Hello World”做出解释之前,先介绍一下Spring AMQP中的主要对象类以及Spring Boot对RabbitMQ的默认配置。

Spring AMQP的主要对象

Spring AMQP主要对象类如下:

作用
Queue 对应RabbitMQ中Queue
AmqpTemplate 接口,用于向RabbitMQ发送和接收Message
RabbitTemplate AmqpTemplate的实现类
@RabbitListener 指定消息接收方,可以配置在类和方法上
@RabbitHandler 指定消息接收方,只能配置在方法上,可以与@RabbitListener一起使用
Message 对RabbitMQ消息的封装
Exchange 对RabbitMQ的Exchange的封装,子类有TopicExchange、FanoutExchange和DirectExchange等
Binding 将一个Queue绑定到某个Exchange,本身只是一个声明,并不做实际绑定操作
AmqpAdmin 接口,用于Exchange和Queue的管理,比如创建/删除/绑定等,自动检查Binding类并完成绑定操作
RabbitAdmin AmqpAdmin的实现类
ConnectionFactory 创建Connection的工厂类,RabbitMQ也有一个名为ConnectionFactory的类但二者没有继承关系,Spring ConnectionFactory可以认为是对RabbitMQ ConnectionFactory的封装
CachingConnectionFactory Spring ConnectionFactory的实现类,可以用于缓存Channel和Connection
Connection Spring中用于创建Channel的连接类,RabbitMQ也有一个名为Connection的类,但二者没有继承关系,Spring Connection是对RabbitMQ Connection的封装
SimpleConnection Spring Connection的实现类,将实际工作代理给RabbitMQ的Connection类
MessageListenerContainer 接口,消费端负责与RabbitMQ服务器保持连接并将Message传递给实际的@RabbitListener/@RabbitHandler处理
RabbitListenerContainerFactory 接口,用于创建MessageListenerContainer
SimpleMessageListenerContainer MessageListenerContainer的实现类
SimpleRabbitListenerContainerFactory RabbitListenerContainerFactory的实现类
RabbitProperties 用于配置Spring AMQP的Property类

对于发送方而言,需要做以下配置:

  • 配置CachingConnectionFactory
  • 配置Exchange/Queue/Binding
  • 配置RabbitAdmin创建上一步的Exchange/Queue/Binding
  • 配置RabbitTemplate用于发送消息,RabbitTemplate通过CachingConnectionFactory获取到Connection,然后想指定Exchange发送

对于消费方而言,需要做以下配置:

  • 配置CachingConnectionFactory
  • 配置Exchange/Queue/Binding
  • 配置RabbitAdmin创建上一步的Exchange/Queue/Binding
  • 配置RabbitListenerContainerFactory
  • 配置@RabbitListener/@RabbitHandler用于接收消息

可以看到,在Spring中使用RabbitMQ的配置还不少,但是Spring Boot已经帮我们做了很多默认配置,才使得我们刚才的“Hello World”可以如此简单的完成。

Spring Boot的默认RabbitMQ设施

在默认情况下,Spring Boot为我们配置了一下RabbitMQ的对象:

  • 一个CachingConnectionFactory
  • 一个名为“rabbitListenerContainerFactory”的SimpleRabbitListenerContainerFactory,默认自动查找IoC容器中的MessageConverter
  • 一个RabbitTemplate
  • 一个AmqpAdmin

因此,通常情况下,我们只需要配置自己的Exchange/Queue/Binding即可。

对Hello World的解释

有了以上知识,再让我们来看看刚才的“Hello World”背后发生了什么:

  • 首先,我们配置了一个名为“HelloWorldQueue”的Queue,但是并没有配置Exchange,也没有配置“HelloWorldQueue”的Binding,这其实使用了RabbitMQ的默认行为,即所有Queue都以其自身的名称为routingKey绑定到了一个默认的Exchange上,该默认Exchange的名称为“”
  • 由于Spring Boot自动配置了AmqpAdmin,该AmqpAdmin将自动向RabbitMQ创建名为“HelloWorldQueue”的Queue。
  • 在发送消息的时候,我们直接使用了RabbitTemplate,这个RabbitTemplate是Spring Boot自动为我们配置好的,RabbitTemplate所依赖的CachingConnectionFactory也由Spring Boot自动配置。
  • 发送消息时指定了routingKey为“HelloWorldQueue”,但是没有指定Exchange,此时消息将会发送到RabbitMQ默认的Exchange,又由于名为“HelloWorldQueue”的Queue向默认Exchange绑定的routingKey正是“HelloWorldQueue”,因此消息将由默认Exchange转发到名为“HelloWorldQueue” 的Queue。至此发送方任务结束。
  • 在消息接收方,由于Spring Boot默认为我们配置了SimpleRabbitListenerContainerFactory,因此只需要配置@RabbitListener和@RabbitHandler接收消息即可。

另外,Spring Boot采用了很多默认配置,通过统一的RabbitProperties同时完成消费方和生产方的配置,在默认情况下主要的配置如下:

配置项 默认值 作用
host localhost RabbitMQ服务器地址
port 5672 RabbitMQ服务器端口
username 账户名 guest
password 密码 guest
virtualHost RabbitMQ虚拟主机名 /
publisherConfirms false 设置是否启用生产方确认
publisherReturns false 设置是否启用生产方消息返回
ssl 对象 配置SSL,默认停用
template 对象 设置RabbitTemplate
template.retry 默认停用 设置RabbitTemplate发送消息时的重试,主要用于RabbitTemplate与RabbitMQ之间的网络连接
template.mandatory false 设置发送消息失败时(无接收queue)是否return 消息,与return callback一并使用
template.exchange "" 默认发送的exchange
template.routingKey "" 默认发送消息时的routing key
template.defaultReceiveQueue null 默认接收消息的queue
listener.simple 对象 设置SimpleRabbitListenerContainerFactory
listener.direct 对象 设置DirectRabbitListenerContainerFactory
listener.simple.concurrency null 并发消费方数量
listener.simple.acknowledgeMode AUTO 设置消费方确认模式,这里的AUTO与RabbitMQ的自动确认不是一回事
listener.simple.prefetch 250 设置消费方一次性接收消息的条数
listener.simple.defaultRequeueRejected true 当Listener发生异常时是否requeue
listener.simple.retry 对象 设置Listener的重试机制,默认停用,当启用时,Listener对于消息处理过程中的异常将进行requeue重试,超过重试次数再抛弃,此时AmqpRejectAndDontRequeueException异常也会被重试

所有默认配置如下:

spring:
  rabbitmq:
    host: localhost
    port: 5673
    virtual-host: /
    username: rabbitmq-user
    password: rabbitmq-password
    ssl:
      enabled: false
      keyStore:
      keyStoreType: PKCS12
      keyStorePassword:
      trustStore:
      trustStoreType: JKS
      trustStorePassword:
      algorithm:
      validateServerCertificate: true
      verifyHostname: true
    addresses:
    requestedHeartbeat:
    publisherConfirms: false
    publisherReturns: false
    connectionTimeout:
    cache:
      channel:
        size:
        checkoutTimeout:
      connection:
        mode: CHANNEL
        size:
    listener:
      type: SIMPLE
      simple:
        autoStartup: true
        acknowledgeMode:
        prefetch:
        defaultRequeueRejected:
        idleEventInterval:
        retry:
          enabled: false
          maxAttempts: 3
          initialInterval: PT1S
          multiplier: 1
          maxInterval: PT10S
          stateless: true
        concurrency:
        maxConcurrency:
        transactionSize:
        missingQueuesFatal: true
      direct:
        autoStartup: true
        acknowledgeMode:
        prefetch:
        defaultRequeueRejected:
        idleEventInterval:
        retry:
          enabled: false
          maxAttempts: 3
          initialInterval: PT1S
          multiplier: 1
          maxInterval: PT10S
          stateless: true
        consumersPerQueue:
        missingQueuesFatal: false
    template:
      retry:
        enabled: false
        maxAttempts: 3
        initialInterval: PT1S
        multiplier: 1
        maxInterval: PT10S
      mandatory:
      receiveTimeout:
      replyTimeout:
      exchange: ''
      routingKey: ''
      defaultReceiveQueue:
    parsedAddresses:

Exchange和Queue的定义

在使用RabbitMQ时,Exchange和Queue的定义是需要精心设计的,在笔者的RabbitMQ最佳实践文章中,我建议每个微服务对应一个Exchange等方式。另外,除了定义正常情况下的Exchange和Queue之外,我们还需要考虑到异常情况,因此我们还需要配置死信交换(Dead Letter Exchange, DLX)和死信队列(Dead Letter Queue, DLQ)用于对异常情况的处理。

接下来,我们围绕以下业务场景来定义相应的Exchange和Queue:

在电商系统中,当订单(Order)创建之后,需要给用于发送短信通知。

我们将采用RabbitMQ来完成这个过程,即下单之后发送消息,短信服务监听该消息然后完成短信发送。

在消息发送方,首先需要定义一个Exchange作为接收消息的第一站,在OrderRabbitMqConfig.java中配置:

    //"发送方Exchange"
    @Bean
    public TopicExchange orderPublishExchange() {
        return new TopicExchange(ORDER_PUBLISH_EXCHANGE, true, false, ImmutableMap.of("alternate-exchange", ORDER_PUBLISH_DLX));
    }

对于发送方而言,我们只需要关系消息能够成功发送到Exchange即可,而不用关心该消息实际是否到达了某个Queue以及哪个Queue。因此,发送方并不具备定义Queue的职责,定义一个Exchange即可。

然而,有时发送到Exchange的消息没有任何Queue可以接收,要么因为根本就没有绑定Queue,要么消息的RoutingKey设置导致无法路由到任何Queue。严格意义上讲,这些问题也不是发送发需要关心的事情,然而在很多时候,消息的发送方和接收方事实上都是关系密切的系统,为了更好的支持接收方,笔者建议在发送方将那些无法路由到任何Queue的消息先保存下来,以便后续处理。在RabbitMQ中,可以通过定义Exchange时的alternate-exchange参数指定当消息无法路由时应该发到的Exchange。

在上面定义的ORDER_PUBLISH_EXCHANGE中,我们其实已经指定了该Exchange的alternate-exchangeORDER_PUBLISH_DLXORDER_PUBLISH_DLX的定义如下:

    //"发送方DLX",消息发送失败时传到该DLX
    @Bean
    public TopicExchange orderPublishDlx() {
        return new TopicExchange(ORDER_PUBLISH_DLX, true, false, null);
    }

另外,定义一个Queue(ORDER_PUBLISH_DLQ)用于存储ORDER_PUBLISH_DLX中的消息:

    //"发送方DLQ",所有发到"发送DLX"的消息都将路由到该DLQ
    @Bean
    public Queue orderPublishDlq() {
        return new Queue(ORDER_PUBLISH_DLQ, true, false, false, ImmutableMap.of("x-queue-mode", "lazy"));
    }

    //"发送方DLQ"绑定到"发送方DLX"
    @Bean
    public Binding orderPublishDlqBinding() {
        return BindingBuilder.bind(orderPublishDlq()).to(orderPublishDlx()).with("#");
    }

此时,当发送方的消息到达了ORDER_PUBLISH_EXCHANGE而无法路由到任何Queue时,该消息会被RabbitMQ发送到ORDER_PUBLISH_DLX,进而路由到ORDER_PUBLISH_DLQ

至此,发送发的RabbitMQ配置完成。

在消息接收方,首先定义用于接收消息的Queue(ORDER_RECEIVE_QUEUE):

    //接收方的所有消息都发送到该"接收方Queue",即"接收方queue"可以绑定多个"发送方Exchange"
    @Bean
    public Queue orderReceiveQueue() {
        ImmutableMap<String, Object> args = ImmutableMap.of(
                "x-dead-letter-exchange",
                ORDER_RECEIVE_DLX,
                "x-overflow",
                "drop-head",
                "x-max-length",
                300000,
                "x-message-ttl",
                24 * 60 * 60 * 1000);
        return new Queue(ORDER_RECEIVE_QUEUE, true, false, false, args);
    }

    //"接收方queue"绑定到"发送方exchange"
    @Bean
    public Binding orderReceiveBinding() {
        return BindingBuilder.bind(orderReceiveQueue()).to(orderPublishExchange()).with("order.#");
    }

在消息处理的过程中,可能发生异常,此时需要将异常消息记录下来,同样采用DLX和DLQ机制。在定义ORDER_RECEIVE_QUEUE时,事实上我们已经指定了DLX为ORDER_RECEIVE_DLX,定义该DLX和对应的DLQ(ORDER_RECEIVE_DLQ)为:

    //"接收方DLX",消息处理失败时传到该DLX
    @Bean
    public TopicExchange orderReceiveDlx() {
        return new TopicExchange(ORDER_RECEIVE_DLX, true, false, null);
    }

    //"接收方DLQ",所有发到"接收DLX"的消息都将路由到该DLQ
    @Bean
    public Queue orderReceiveDlq() {
        return new Queue(ORDER_RECEIVE_DLQ, true, false, false, ImmutableMap.of("x-queue-mode", "lazy"));
    }

    //"接收方DLQ"绑定到"接收方DLX"
    @Bean
    public Binding orderReceiveDlqBinding() {
        return BindingBuilder.bind(orderReceiveDlq()).to(orderReceiveDlx()).with("#");
    }

此外,对于发送到ORDER_RECEIVE_DLQ的死信消息,我们需要对其进行手动处理,一种处理方式便是在后台定期运行一个Job将死信消息重新处理。为此,我们需要定义一个用于手动回复的Exchange(ORDER_RECEIVE_RECOVER_EXCHANGE),并将接收方的所有的消费Queue绑定到该Exchange:

    //"接收方恢复Exchange",用于手动将"接收方DLQ"中的消息发到该DLX进行重试
    @Bean
    public TopicExchange orderReceiveRecoverExchange() {
        return new TopicExchange(ORDER_RECEIVE_RECOVER_EXCHANGE, true, false, null);
    }

    //"接收方Queue"绑定到"接收方恢复Exchange"
    @Bean
    public Binding orderReceiveRecoverBinding() {
        return BindingBuilder.bind(orderReceiveQueue()).to(orderReceiveRecoverExchange()).with("#");
    }

以上关于Exchange/Queue和DLX/DLQ的配置可以成为一种模式,使得我们一方面能够尽量少地配置RabbitMQ的基础设施,另一方面又能对异常情况进行处理。在该模式下存在以下概念:

概念 类型 解释 命名 示例
发送方Exchange Exchange 用于接收一个系统(比如微服务)中所有消息的Exchange,一个系统只有一个发送方Exchange xxx-publish-x order-publish-x
发送方DLX Exchange 用于接收发送方无法路由的消息 xxx-publish-dlx order-publish-dlx
发送方DLQ Queue 用于存放发送方DLX的消息 xxx-publish-dlq order-publish-dlq
接收方Queue Queue 用于接收发送方Exchange的消息,一个系统只有一个接收方Queue xxx-receive-q product-receive-q
接收方DLX Exchange 用于接收消费失败的消息 xxx-receive-dlx product-receive-dlx
接收方DLQ Queue 用于存放接收方DLX的消息 xxx-receive-dlq product-receive-dlq
接收方恢复Exchange Exchange 用于接收从接收方DLQ中手动恢复的消息,接收方Queue应该绑定到接收方恢复Exchange xxx-receive-recover-x product-receive-recover-x

该模式的RabbitMQ配置架构图如下:


一种RabbitMQ配置架构

消息序列化

RabbitMQ中所有消息都是二进制的,在Spring中我们需要在Java对象与RabbitMQ消息体之间进行相互转换。在默认情况下,Spring Boot使用了SimpleMessageConverter用于转换String,Java的Serializable对象以及字节数组。通常情况下,我们希望使用Json格式,此时可以采用以下配置:

    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(objectMapper);
        messageConverter.setClassMapper(classMapper());
        return messageConverter;
    }

    @Bean
    public DefaultClassMapper classMapper() {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        classMapper.setTrustedPackages("*");
        return classMapper;
    }

Spring的RabbitTemplateRabbitListenerContainerFactory会自动找到以上配置的MessageConverter并使用之。此时,在RabbitTemplate发送消息时,它会在消息的头部加入Java对象的类型信息,然后在RabbitListener接收到消息时,通过该头部的类型信息指导应该发序列化为哪个类的对象。

在消费消息时,配置@RabbitListener

@Component
@RabbitListener(queues = {ORDER_RECEIVE_QUEUE})
public class OrderEventNotificationListener {
    private Logger logger = AutoNamingLoggerFactory.getLogger();

    @RabbitHandler
    public void onOrderCreated(OrderCreatedEvent event) {
        // send notification
        //sendSmsNotification(event);

        logger.info("Notification sent for OrderCreatedEvent:{}", event.getOrderId());
        throw new RuntimeException("");
    }

}

其中,@RabbitListener可以用于类和方法,而@RabbitHandler只能用于方法,通常的用法是:@RabbitListener用在类上,接收不同Queue的不同类型消息,然后在方法上使用@RabbitHandler来处理特定类型的消息。

需要指出的是,不同的处理方法之间必须没有歧义,即我们需要保证对于一种类型的消息,只有一个方法可以接收。比如:对于OrderCreatedEvent,如果它还有个父类叫OrderEvent,那么如果再添加一个方法用于处理OrderEvent,那么将会报错:

@Component
@RabbitListener(queues = {ORDER_RECEIVE_QUEUE})
public class OrderEventNotificationListener {
    private Logger logger = AutoNamingLoggerFactory.getLogger();

    @RabbitHandler
    public void onOrderCreated(OrderCreatedEvent event) {
        // send notification
        //sendSmsNotification(event);

        logger.info("Notification sent for OrderCreatedEvent:{}", event.getOrderId());
    }
    
    @RabbitHandler
    public void onOrderEvent(OrderEvent event) {
        logger.info("Received order event");
    }

}

此时报错为:

Caused by: org.springframework.amqp.AmqpException: Ambiguous methods for payload type: class com.ecommerce.order.order.model.OrderCreatedEvent: onOrderCreated and onOrderEvent
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.findHandlerForPayload(DelegatingInvocableHandler.java:206)
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:129)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:60)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
    ... 26 common frames omitted

发送方确认

要使用发送方确认需要:

  1. 设置publisherConfirms = true
  2. 在发送消息时,加入CorrelationData
        rabbitTemplate.convertAndSend(ORDER_PUBLISH_EXCHANGE,
                "order.created",
                event,
                new CorrelationData(event.getOrderId()));
  1. 在RabbitTemplate中注册确认CallbacksetConfirmCallback(),并通过CorrelationData确定是哪条消息被确认了:
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String eventId = correlationData.getId();
            if (ack) {
                logger.info("Publish confirmed event[{}].", eventId);
            } else {
                logger.warn("Domain event[{}] is nacked while publish:{}.", eventId, cause);
            }

        });

在正常情况下,运行OrderApiTest.should_create_order(),输出的日志为:

2019-07-27 14:09:13.517 -- INFO  [AMQP Connection 127.0.0.1:5673] c.e.o.order.OrderApplicationService : Publish confirmed event[89cacd65a550493c95957729c4f24066].

表明消息被RabbitMQ确认已经收到。在RabbitMQ无法确保消息成功投递时,将作nack确认,比如发送方Exchange不存在,此时输出的日志为:

2019-07-27 14:14:12.731 -- WARN  [rabbitConnectionFactory1] c.e.o.order.OrderApplicationService : Domain event[d27e1afdd4ea449d827af272209e9125] is nacked while publish:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dfd' in vhost '/', class-id=60, method-id=40).

要使用发送方返回需要:

  1. 设置publisherReturns = true
  2. 设置template.mandatory = true
  3. 在RabbitTemplate中注册返回CallbacksetReturnCallback()

通常来讲,我们没有必要对返回消息进行处理,因此这里不展开讲解。

消费方确认

Spring AMQP在默认情况下是启用了消费方确认的,它会根据消费方是否抛出异常来决定是ack还是nack。AcknowledgeMode用于配置Spring AMQP的消费方确认行为,其值有以下几种:

  • NONE:无消费方确认,即表示消费方自动确认,等同于RabbitMQ的autoAck=true
  • MANNUAL:消费方手动确认,用户需要自行调用ack/nack操作
  • AUTO:默认值,Spring根据Listener中是否抛出异常来决定是ack还是nack,需要注意的是,这里的AUTO与RabbitMQ的autoAck是不同的东西。

除了AcknowledgeMode外,消费方确认的行为还受defaultRequeueRejected参数的影响,当defaultRequeueRejected=true(默认值)时,表示如果消费方拒绝了消息,那么消息将被重新放入Queue中以便下次重新消费,当为false时,被拒绝的消息将不会被重新放入Queue中,而是要么直接抛弃,要么放入DLX。

另外,通过抛出指定异常可以绕开defaultRequeueRejected的影响:

  • AmqpRejectAndDontRequeueException:表示直接将消息抛弃或者DLX,而不是Requeue,无论defaultRequeueRejected的值是什么。
  • ImmediateRequeueAmqpException:与AmqpRejectAndDontRequeueException正好相反,表示立即将消息进行Requeue处理,无论defaultRequeueRejected的值是什么。

当Listener中抛出异常时,Spring 默认采用ConditionalRejectingErrorHandler来判断哪些异常(通常是不能恢复的异常)可以对消息进行Requeue操作,哪些不能。

在消费消息时,还可以配置重试机制,Spring直接支持消费方的重试(spring-retry),只是在默认情况下是禁用的,可以通过以下方式启用消费方重试:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true

需要注意的是,这里的重试并没有RabbitMQ的参与,只是Spring内部的重试。也就是说,重试时并不是讲消息做Requeue操作,而是拿到同一条消息在Spring内部做重复处理。默认情况下,重试最大此时为3,当达到最大重试次数时,Spring默认将使用RejectAndDontRequeueRecoverer对消息进行恢复处理,该类将不会对消息做Requeue操作,而是直接抛弃/DLX掉。

在没有启用重试机制的情况下,并且defaultRequeueRejected=true,此时如果业务处理过程中一直抛出异常,那么将会导致消息被不断Requeue的情况,为了避免这种情况,要么启用重试机制;要么将defaultRequeueRejected设置为false;或者同时启动二者。笔者推荐同时启用二者。

综上,在使用RabbitMQ时,我们希望采用以下实践:

  • 默认开启生产方确认
  • 默认设置defaultRequeueRejected=false
  • 默认设置消费方重试

为此,除了直接在application.yml中配置之外,还可以将这些默认配置固化到代码中,创建一个ConnectionFactory并且默认开启生产方确认(setPublisherConfirms(true)):

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses(rabbitProperties.getAddresses());
        factory.setUsername(rabbitProperties.getUsername());
        factory.setPassword(rabbitProperties.getPassword());
        factory.setPort(rabbitProperties.getPort());
        factory.setVirtualHost(rabbitProperties.getVirtualHost());
        factory.setPublisherConfirms(true);
        return factory;
    }

默认配置消费方的defaultRequeueRejected=false以及重试:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                               MessageConverter messageConverter,
                                                                               TaskExecutor taskExecutor) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDefaultRequeueRejected(false);
        factory.setPrefetchCount(rabbitProperties.getListener().getSimple().getPrefetch());
        factory.setConcurrentConsumers(rabbitProperties.getListener().getSimple().getConcurrency());
        factory.setMaxConcurrentConsumers(rabbitProperties.getListener().getSimple().getMaxConcurrency());
        factory.setMessageConverter(messageConverter);

        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000L);
        RetryOperationsInterceptor build = RetryInterceptorBuilder.stateless()
                .maxAttempts(5)
                .backOffPolicy(fixedBackOffPolicy)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
        factory.setAdviceChain(build);

        factory.setTaskExecutor(taskExecutor);
        return factory;
    }

相关文章

网友评论

    本文标题:Spring AMQP的使用

    本文链接:https://www.haomeiwen.com/subject/mjeezqtx.html