美文网首页
【Spring Cloud Stream】Rabbit Bind

【Spring Cloud Stream】Rabbit Bind

作者: 伊丽莎白2015 | 来源:发表于2023-02-04 21:21 被阅读0次

【前置文章】

【官方文档】

【本文内容】
主要是基于官方文档进行的整合。

  • 默认生成的exchange为topic类型。
  • 解释了group的配置,不配即为publish/subscribe模式,配了即为Work Queues模式。
  • 使用已经存在的exchange和queue。
  • RabbitMQ Binder的重试机制,以及如何配置死信队列。
  • 消费端设置并发数。
  • 消费端手动应答模式。

另外,没有介绍到Error Channels,可以参考官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#rabbit-error-channels


1. Rabbit Binder

image.png

默认情况下RabbitMQ Binder创建的是TopicExchange。每个consumer group中,会有一个queue绑定到这个TopicExchange上。

image.png 对于匿名的consumers(即没有group属性的),会创建一个随机的queue,程序stop后这个queue会自动删除。如下图,AD的意思是auto-delete=true: image.png

默认情况下routingKey=#,即匹配所有:

image.png 比如发送消息的时候,我们按routingKey=abc发送,程序的consumer端照样能收到消息: image.png 收到的消息: image.png

2. 有无配置group导致的两种模式

2.1 Publish/Subscribe模式(fanout)

官方的Publish/Subscribe模式:https://www.rabbitmq.com/tutorials/tutorial-three-java.html

Publish/Subscribe模式,如需要更新所有节点上的基于内存的缓存: image.png

如果用Spring Boot实现,可以参考上述第4个前置文章(即:【RabbitMQ的那点事】订阅/发布模式下,动态新建queue,实现所有节点都能收到消息)。

用Spring Cloud Stream实现的话,只要不声明group即可(也就是第1章截图的匿名的consumers)。

2.2 Work Queues模式

官方的Work Queues模式:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

用图表示,可以看到多个Consumer监听同一个Queue,依次拿到消息: image.png

用Spring Boot写的话,可以新建DirectExchange,再新建Queue,这时候的程序可以布署两个instance节点,那么在实际环境中两个节点就会依次收到消息。

用Spring Cloud Stream写的话,只需要加上group属性即可,比如我们有一个Consumer:

    @Bean
    public Consumer<String> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

那么在application.properties中定义:

spring.cloud.stream.bindings.log-in-0.destination=log-in-0
spring.cloud.stream.bindings.log-in-0.group=log

此时自动创建的exchange=log-in-0,queue=log-in-0.log,这时候的queue则为durable的queue,即程序down后queue还是存在,并不会被删除。

定义的方式也可按binding name进行配置,即先给log-in-0的绑定起个描述性的name,再根据该name进行定义group,这样自动创建的exchange则会=mylog,queue=mylog.log

spring.cloud.stream.function.bindings.log-in-0=mylog
spring.cloud.stream.bindings.mylog.group=log

3. 使用已经存在的exchange和queue

我们先在RabbitMQ Broker中新建以下关系:

  • exchange=order.in,queue= order,routingKey=order.in.routing,截图如下:

    image.png
  • exchange=order.out,queue=order.out.test,routingKey=order.out.routing,截图如下:

    image.png

我们在代码是创建了一个Function,用来接收来自queue=order,处理完成后发送给exchange=order.out

    @Bean
    public Function<String, String> handleOrder() {
        return order -> {
            System.out.println("Received: " + order);
            return "finished for " + order;
        };
    }

那么,在application.yaml中配置如下:

spring.cloud.stream.function.bindings.handleOrder-in-0=orderIn
spring.cloud.stream.bindings.orderIn.destination=order.in
spring.cloud.stream.bindings.orderIn.group=order
spring.cloud.stream.rabbit.bindings.orderIn.consumer.bindQueue=false
spring.cloud.stream.rabbit.bindings.orderIn.consumer.declareExchange=false
spring.cloud.stream.rabbit.bindings.orderIn.consumer.queueNameGroupOnly=true
spring.cloud.stream.rabbit.bindings.orderIn.consumer.bindingRoutingKey=order.in.routing

spring.cloud.stream.function.bindings.handleOrder-out-0=orderOut
spring.cloud.stream.bindings.orderOut.destination=order.out
spring.cloud.stream.rabbit.bindings.orderOut.producer.bindQueue=false
spring.cloud.stream.rabbit.bindings.orderOut.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.orderOut.producer.routingKeyExpression='order.out.routing'

【解释】
首先是通用配置,以spring.cloud.stream.bindings开头:

  • spring.cloud.stream.function.bindings.handleOrder-in-0=orderIn:这个在第1篇前置文章#4.3有讲到,为了配置方便,我们特定加了bindings的命名。
  • <通用配置>.<bindingsName>.destination表示exchnge的名字。是Spring Cloud Stream的通用配置。
  • <通用配置>.<bindingsName>.group,在#2.2有解释。

其次是RabbitMQ的专有配置,以spring.cloud.stream.rabbit.bindings开头,consumer端所有的配置,可以参考官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties

  • <专有配置>.<bindingsName>.consumer.bindQueue:表示是否要创建queue并绑定到相应的exchange上。默认为true,即会创建并绑定。false表示不会新建。
  • <专有配置>.<bindingsName>.consumer.declareExchange:表示是否按destination的值创建exchange(默认为true,并与destination同名)。false表示不会新建。
  • <专有配置>.<bindingsName>.consumer.queueNameGroupOnly:默认为false。当值为true表示在consumer端的queue的名字等于我们定义的group的名字,即当我们想要使用已经存在的queue时,我们可以设置这个值为true,并把group的名字定义为queue的名字。
  • <专有配置>.<bindingsName>.consumer.bindingRoutingKey:默认为#(即all match,匹配所有的字符)。自定义的时候可以设置多个值,想要多个值时可配合bindingRoutingKeyDelimiter使用。

在Producer端的配置,可以参考官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#rabbit-prod-props

  • <专有配置>.<bindingsName>.producer.routingKeyExpression:值得注意的是这个配置,它是SpEL表达式,对于已经确定的routingKey,需要用单引号标记,以表示是字符串,如我们的例子中的'order.out.routing'

4. RabbitMQ Binder的重试机制

当开启了重试机制,如果有消息被退回,那么将阻塞消费端(会一直重复消费该消息),通常情况下,可以使用Dead Letter Queue(死信队列)来解决这个问题。

#disable binder retries
spring.cloud.stream.bindings.orderIn.consumer.max-attempts=5
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.orderIn.consumer.dlq-ttl=50000
spring.cloud.stream.rabbit.bindings.orderIn.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.orderIn.consumer.dead-letter-queue-name=myDlq

【解释】

  • max-attempts表示重试次数,默认为3。如果要关闭重试,可以将value置为1。如果值>1,在遇到错误时,会先尝试重试,然后才是将信息放到Dead Letter Queue中。以下是max-attempts=5时的日志: image.png
    关于max-attempts,每重试一次,会在message的header中deliveryAttempt的值+1,比如第一次: image.png
    第二次重试时: image.png
  • dlq-ttl表示queue中的message的time-to-live的时间,即过期时间,这里的意思是生成的死信队列myDlx中的消息过期时间为5秒。
  • auto-bind-dlq=true表示会创建一个Dead Letter Queue并进行绑定,默认情况下生成的死信队列名为{destination}.dlq,我们上述的例子,默认的DLQ为order.dlq。也可通过dead-letter-queue-name进行重新命名,即我们的myDlq。
  • 如果没有特别定义,那么默认生成的Dead Letter Exchange名字则为DLX。也可通过dead-letter-exchange进行定义。另外默认生成的DLX类型为direct,可通过dead-letter-exchange-type配置成其它类型如fanout或topic。
  • 也可定义dead-letter-routing-key,即DLX和DLQ进行绑定时的routingKey,默认为{destination},我们的case中为order。截图如下: image.png

ps. 上述的DLQ和DLX默认的命名可以再通过配置prefix来加上前缀,如我们定义了prefix=apple,那么如果我们没有自己定义命名,默认的Queue则为apple.{destination}.dlq,默认的Exchange则为apple.DLX。

生成的DLX和DLQ如下:

order是原本的queue,myDlq是本章节配置生成的DLQ。 image.png DLX如下: image.png

【测试】
想要测试消息到达死信队列,那么可以在正常消费端的代码中抛出异常:AmqpRejectAndDontRequeueException。这个异常的名字已经很直白了,即否定应答(basic.reject)以及requeue=false

还有一种方式即先设置requeue-rejected值等于false(默认即为false),然后抛出任意错误即可。

spring.cloud.stream.rabbit.bindings.orderIn.consumer.requeue-rejected=false

比如我们定义Function如下,即收到消息等于字符串error,则报异常。

    @Bean
    public Function<String, String> handleOrder() {
        return order -> {
            log.info("handleOrder Received: " + order);

            if (order.equals("error")) {
                throw new AmqpRejectAndDontRequeueException("error");
            }
            return "finished for " + order;
        };
    }

同时我们给DLQ也设置了监听:

    @RabbitListener(queues = {"myDlq"})
    public void deadLetterListener(String message) {
        log.info("Received Dead Letter message: {}", message);
    }
运行程序,在RabbitMQ Console中发送消息=error: image.png

查看日志,首先是5次retry尝试,然后由于消费端抛出异常AmqpRejectAndDontRequeueException,消息转而通过DLX发送到了我们的死信队列中,即myDlq:

image.png

5. 消费端设置并发数

在消费端设置并发数,这个并不是RabbitMQ的参数,而是Spring Cloud Stream的通用参数,详见:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_consumer_properties

比如我们设置了并发数为5:

spring.cloud.stream.bindings.orderIn.consumer.concurrency=5
那么消费端则会开5个线程来消费消息: image.png 默认并发数为1,即: image.png

6. 消费端手动应答模式

可通过设置acknowledge-mode,默认为AUTO

设置手动应答模式:

spring.cloud.stream.rabbit.bindings.orderIn.consumer.acknowledge-mode=MANUAL

消费端代码,代码还是通过channel和delivertyTag进行应答,和Spring Boot的代码差不多。

@Bean
    public Function<Message<String>, Message<String>> handleOrder() {
        return message -> {
            log.info("handleOrder Received: " + message.getPayload());

            Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
            Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);

            try {
                if (message.getPayload().equals("error")) {
                    channel.basicNack(deliveryTag, false, false);
                } else {
                    channel.basicAck(deliveryTag, false);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            return new GenericMessage<>("finished for " + message, message.getHeaders());
        };
    }

相关文章

网友评论

      本文标题:【Spring Cloud Stream】Rabbit Bind

      本文链接:https://www.haomeiwen.com/subject/oxlshdtx.html