【前置文章】
- 【Spring Cloud Stream】入门,并简单的配置Consumer进行消息接收
- 【Spring Cloud Stream】发送和接收消息
- 【RabbitMQ的那点事】Exchange类型(超详细)
- 【RabbitMQ的那点事】订阅/发布模式下,动态新建queue,实现所有节点都能收到消息
- 【RabbitMQ的那点事】死信列队(Dead Letter Queue)
【官方文档】
【本文内容】
主要是基于官方文档进行的整合。
- 默认生成的exchange为
topic
类型。 - 解释了
group
的配置,不配即为publish/subscribe模式,配了即为Work Queues模式。 - 使用已经存在的exchange和queue。
- RabbitMQ Binder的重试机制,以及如何配置死信队列。
- 消费端设置并发数。
- 消费端手动应答模式。
另外,没有介绍到Error Channels,可以参考官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#rabbit-error-channels
1. Rabbit Binder

默认情况下RabbitMQ Binder创建的是TopicExchange
。每个consumer group中,会有一个queue绑定到这个TopicExchange上。


默认情况下routingKey=#
,即匹配所有:



2. 有无配置group导致的两种模式
2.1 Publish/Subscribe模式(fanout)
官方的Publish/Subscribe模式:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
Publish/Subscribe模式,如需要更新所有节点上的基于内存的缓存:
如果用Spring Boot实现,可以参考上述第4个前置文章(即:【RabbitMQ的那点事】订阅/发布模式下,动态新建queue,实现所有节点都能收到消息)。
用Spring Cloud Stream实现的话,只要不声明group即可(也就是第1章截图的匿名的consumers)。
2.2 Work Queues模式
官方的Work Queues模式:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
用图表示,可以看到多个Consumer监听同一个Queue,依次拿到消息:
用Spring Boot写的话,可以新建DirectExchange,再新建Queue,这时候的程序可以布署两个instance节点,那么在实际环境中两个节点就会依次收到消息。
用Spring Cloud Stream写的话,只需要加上group属性即可,比如我们有一个Consumer:
@Bean
public Consumer<String> log() {
return person -> {
System.out.println("Received: " + person);
};
}
那么在application.properties
中定义:
spring.cloud.stream.bindings.log-in-0.destination=log-in-0
spring.cloud.stream.bindings.log-in-0.group=log
此时自动创建的exchange=log-in-0
,queue=log-in-0.log
,这时候的queue则为durable的queue,即程序down后queue还是存在,并不会被删除。
定义的方式也可按binding name进行配置,即先给log-in-0
的绑定起个描述性的name,再根据该name进行定义group,这样自动创建的exchange则会=mylog
,queue=mylog.log
:
spring.cloud.stream.function.bindings.log-in-0=mylog
spring.cloud.stream.bindings.mylog.group=log
3. 使用已经存在的exchange和queue
我们先在RabbitMQ Broker中新建以下关系:
-
exchange=
order.in
,queue=order
,routingKey=order.in.routing
,截图如下:image.png
-
exchange=
order.out
,queue=order.out.test
,routingKey=order.out.routing
,截图如下:image.png
我们在代码是创建了一个Function,用来接收来自queue=order
,处理完成后发送给exchange=order.out
:
@Bean
public Function<String, String> handleOrder() {
return order -> {
System.out.println("Received: " + order);
return "finished for " + order;
};
}
那么,在application.yaml
中配置如下:
spring.cloud.stream.function.bindings.handleOrder-in-0=orderIn
spring.cloud.stream.bindings.orderIn.destination=order.in
spring.cloud.stream.bindings.orderIn.group=order
spring.cloud.stream.rabbit.bindings.orderIn.consumer.bindQueue=false
spring.cloud.stream.rabbit.bindings.orderIn.consumer.declareExchange=false
spring.cloud.stream.rabbit.bindings.orderIn.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.orderIn.consumer.bindingRoutingKey=order.in.routing
spring.cloud.stream.function.bindings.handleOrder-out-0=orderOut
spring.cloud.stream.bindings.orderOut.destination=order.out
spring.cloud.stream.rabbit.bindings.orderOut.producer.bindQueue=false
spring.cloud.stream.rabbit.bindings.orderOut.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.orderOut.producer.routingKeyExpression='order.out.routing'
【解释】
首先是通用配置,以spring.cloud.stream.bindings
开头:
-
spring.cloud.stream.function.bindings.handleOrder-in-0=orderIn
:这个在第1篇前置文章#4.3有讲到,为了配置方便,我们特定加了bindings的命名。 -
<通用配置>.<bindingsName>.destination
表示exchnge的名字。是Spring Cloud Stream的通用配置。 -
<通用配置>.<bindingsName>.group
,在#2.2有解释。
其次是RabbitMQ的专有配置,以spring.cloud.stream.rabbit.bindings
开头,consumer端所有的配置,可以参考官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties:
-
<专有配置>.<bindingsName>.consumer.bindQueue
:表示是否要创建queue并绑定到相应的exchange上。默认为true,即会创建并绑定。false表示不会新建。 -
<专有配置>.<bindingsName>.consumer.declareExchange
:表示是否按destination的值创建exchange(默认为true,并与destination同名)。false表示不会新建。 -
<专有配置>.<bindingsName>.consumer.queueNameGroupOnly
:默认为false。当值为true表示在consumer端的queue的名字等于我们定义的group的名字,即当我们想要使用已经存在的queue时,我们可以设置这个值为true,并把group的名字定义为queue的名字。 -
<专有配置>.<bindingsName>.consumer.bindingRoutingKey
:默认为#(即all match,匹配所有的字符)。自定义的时候可以设置多个值,想要多个值时可配合bindingRoutingKeyDelimiter
使用。
在Producer端的配置,可以参考官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#rabbit-prod-props
-
<专有配置>.<bindingsName>.producer.routingKeyExpression
:值得注意的是这个配置,它是SpEL表达式,对于已经确定的routingKey,需要用单引号标记,以表示是字符串,如我们的例子中的'order.out.routing'
。
4. RabbitMQ Binder的重试机制
当开启了重试机制,如果有消息被退回,那么将阻塞消费端(会一直重复消费该消息),通常情况下,可以使用Dead Letter Queue(死信队列)来解决这个问题。
#disable binder retries
spring.cloud.stream.bindings.orderIn.consumer.max-attempts=5
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.orderIn.consumer.dlq-ttl=50000
spring.cloud.stream.rabbit.bindings.orderIn.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.orderIn.consumer.dead-letter-queue-name=myDlq
【解释】
-
max-attempts
表示重试次数,默认为3。如果要关闭重试,可以将value置为1。如果值>1,在遇到错误时,会先尝试重试,然后才是将信息放到Dead Letter Queue中。以下是max-attempts=5
时的日志:image.png
关于max-attempts
,每重试一次,会在message的header中deliveryAttempt
的值+1,比如第一次:image.png
第二次重试时:image.png
-
dlq-ttl
表示queue中的message的time-to-live的时间,即过期时间,这里的意思是生成的死信队列myDlx中的消息过期时间为5秒。 -
auto-bind-dlq=true
表示会创建一个Dead Letter Queue并进行绑定,默认情况下生成的死信队列名为{destination}.dlq
,我们上述的例子,默认的DLQ为order.dlq。也可通过dead-letter-queue-name
进行重新命名,即我们的myDlq。 - 如果没有特别定义,那么默认生成的Dead Letter Exchange名字则为DLX。也可通过
dead-letter-exchange
进行定义。另外默认生成的DLX类型为direct,可通过dead-letter-exchange-type
配置成其它类型如fanout或topic。 - 也可定义
dead-letter-routing-key
,即DLX和DLQ进行绑定时的routingKey,默认为{destination},我们的case中为order。截图如下:image.png
ps. 上述的DLQ和DLX默认的命名可以再通过配置prefix
来加上前缀,如我们定义了prefix=apple,那么如果我们没有自己定义命名,默认的Queue则为apple.{destination}.dlq,默认的Exchange则为apple.DLX。
生成的DLX和DLQ如下:


【测试】
想要测试消息到达死信队列,那么可以在正常消费端的代码中抛出异常:AmqpRejectAndDontRequeueException
。这个异常的名字已经很直白了,即否定应答(basic.reject
)以及requeue=false
。
还有一种方式即先设置requeue-rejected
值等于false(默认即为false),然后抛出任意错误即可。
spring.cloud.stream.rabbit.bindings.orderIn.consumer.requeue-rejected=false
比如我们定义Function如下,即收到消息等于字符串error,则报异常。
@Bean
public Function<String, String> handleOrder() {
return order -> {
log.info("handleOrder Received: " + order);
if (order.equals("error")) {
throw new AmqpRejectAndDontRequeueException("error");
}
return "finished for " + order;
};
}
同时我们给DLQ也设置了监听:
@RabbitListener(queues = {"myDlq"})
public void deadLetterListener(String message) {
log.info("Received Dead Letter message: {}", message);
}
运行程序,在RabbitMQ Console中发送消息=error:

查看日志,首先是5次retry尝试,然后由于消费端抛出异常AmqpRejectAndDontRequeueException
,消息转而通过DLX发送到了我们的死信队列中,即myDlq:

5. 消费端设置并发数
在消费端设置并发数,这个并不是RabbitMQ的参数,而是Spring Cloud Stream的通用参数,详见:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_consumer_properties
比如我们设置了并发数为5:
spring.cloud.stream.bindings.orderIn.consumer.concurrency=5
那么消费端则会开5个线程来消费消息:


6. 消费端手动应答模式
可通过设置acknowledge-mode
,默认为AUTO
。
设置手动应答模式:
spring.cloud.stream.rabbit.bindings.orderIn.consumer.acknowledge-mode=MANUAL
消费端代码,代码还是通过channel和delivertyTag进行应答,和Spring Boot的代码差不多。
@Bean
public Function<Message<String>, Message<String>> handleOrder() {
return message -> {
log.info("handleOrder Received: " + message.getPayload());
Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
try {
if (message.getPayload().equals("error")) {
channel.basicNack(deliveryTag, false, false);
} else {
channel.basicAck(deliveryTag, false);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return new GenericMessage<>("finished for " + message, message.getHeaders());
};
}
网友评论