美文网首页
RabbtiMQ系列-5.SpringBoot集成RabbitM

RabbtiMQ系列-5.SpringBoot集成RabbitM

作者: Tian_Peng | 来源:发表于2020-12-14 16:27 被阅读0次

    一.概述

    本文介绍Springboot如何集成RabbitMQ
    本文内容较多,开始为最简单的自动确认类的消息发送和消费,后续有发送方确认和接收方确认机制示例
    关于RabbitMQ安装就不说了,网上有很多
    关于RabbitMQ的一些基础概念和知识请查看前几篇文章

    二.RabbitMQ管理后台

    安装好RabbitMQ并启用管理后台,访问localhost:15672,输入默认的用户密码guest/guest得到如下界面


    三.SpringBoot集成

    生产者集成

    1.创建项目

    我们首先创建一个Springboot项目demo-rabbitmq-producer


    项目的pom.xml文件中引入依赖:
    <!--springboot整合rabbitMQ只需引入amqp起步依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>      
    

    完整的pom.xml文件如下(注意demo-rabbitmq-common是上面项目截图的自己定义的模块项目,里面只有一个User实体类,为了后续测试发送实体类消息增加的):

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.1</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.tp</groupId>
        <artifactId>demo-rabbitmq-producer</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <name>demo-rabbit-mq-producer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <!--web-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!--springboot整合rabbitMQ只需引入amqp起步依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <!--lombok-->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <!--demo-rabbitmq-common-->
            <dependency>
                <groupId>com.tp</groupId>
                <artifactId>demo-rabbitmq-common</artifactId>
                <version>1.0.0-SNAPSHOT</version>
            </dependency>
            <!--test-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    

    application.yml文件中进行配置:

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtual-host: TP-HOST #虚拟主机,可以不设置使用server默认host
    

    注意:要保证guest用户对虚拟主机有读写操作权限,具体去RabbitMQ后台管理系统配置

    2.配置序列化策略

    RabbitMQ序列化的选择可以是jdk序列化,hessian,jackson,protobuf等
    而对于Java应用默认的序列化采用的是jdk序列化
    SimpleMessageConverter对于要发送的消息体body为字节数组时,不进行处理。
    消息本身假设是String,则将String转成字节数组,假设是Java对象,则使用jdk序列化将消息转成字节数组,转出来的结果较大,含class类名、类对应方法等信息,因此性能较差。
    hessian、protobuf等都是基于压缩反复字段的思想,降低数据传输量以提高性能。
    jackson是以json表示来数据传输,性能优于jdk序列化
    所以使用RabbitMq作为中间件时,数据量比较大,此时就要考虑使用类似Jackson2JsonMessageConverter、hessian等序列化形式,以此提高性能。

    在生产者端增加RabbitMQConfig.java:

    package com.tp.demo.rabbitmq.producer.config;
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    /**
     * FileName:   RabbitMQConfig
     * Author:     TP
     * Date:       12/13/20 4:31 PM
     * Description:开启消息发送消息确认
     */
    @Configuration
    public class RabbitMQConfig {
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            // 设置序列化策略
            rabbitTemplate.setMessageConverter(jsonMessageConverter());
            return rabbitTemplate;
        }
    
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    
    3.声明交换机、队列信息

    需要注意的是,虽然官方推荐在生产者和消费者都增加交换机、队列和绑定关系声明,但个人更推荐生产环境通过管理后台创建交换机、队列和声明绑定关系,一方面可以防止因为开发人员的代码错误引发不必要的问题,另一方面也可以防止每次启动创建上述这些东西
    而如果我们无论在生产者端还是在消费者端进行声明交换机、队列、将交换机和队列进行绑定,都会自动向MQ申请上述声明,MQ中如果有相同配置的声明则自动返回成功,如果没有则新建一个
    本例为了方便,暂时只在生产者端进行声明,当第一次发送消息的时候会去MQ服务器申请声明信息
    首先我们来一个直连交换机的例子:

    package com.tp.demo.rabbitmq.producer.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * FileName:   DirectRabbitConfig
     * Author:     TP
     * Date:       12/13/20 12:01 PM
     * Description: 直连交换机Config
     */
    @Configuration
    public class DirectRabbitConfig {
    
        /**
         * 定义队列,名字为testDirectQueue
         */
        @Bean
        public Queue testDirectQueue() {
            // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
            // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
            // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
            // return new Queue("testDirectQueue", true, true, false);
    
            //一般设置一下队列的持久化就好,其余两个就是默认false
            return new Queue("testDirectQueue", true);
        }
    
        /**
         * 定义Direct类型交换机,名字为testDirectExchange
         */
        @Bean
        DirectExchange testDirectExchange() {
            return new DirectExchange("testDirectExchange", true, false);
        }
    
        /**
         * 将队列和交换机绑定,并设置路由键:testDirectRouting
         */
        @Bean
        Binding bindingDirect() {
            return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("testDirectRouting");
        }
    }
    
    4.发送消息
    package com.tp.demo.rabbitmq.producer.controller.simple;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * FileName:   SendDirectMessageController
     * Author:     TP
     * Date:       12/13/20 12:11 PM
     * Description:直连交换机消息发送Controller
     */
    @RestController
    public class SendDirectMessageController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @GetMapping("/sendDirectMessage")
        public String sendDirectMessage() {
            String message = "你好,我是直连交换机过来的一条消息";
            //将消息携带绑定键值:testDirectRouting 发送到交换机testDirectExchange
            rabbitTemplate.convertAndSend("testDirectExchange", "testDirectRouting", message);
            return "ok";
        }
    }
    

    好了,发送方就是这么简单,我们可以往MQ发送一条消息了

    观察管理后台,发现队列中已经有消息了:

    消费者集成

    1.创建项目

    新建一个Springboot项目:demo-rabbitmq-consumer

    项目的pom.xml文件中引入依赖:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.1</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.tp</groupId>
        <artifactId>demo-rabbitmq-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>demo-rabbitmq-consumer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <!--web-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!--springboot整合rabbitMQ只需引入amqp起步依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <!--lombok-->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <!--demo-rabbitmq-common-->
            <dependency>
                <groupId>com.tp</groupId>
                <artifactId>demo-rabbitmq-common</artifactId>
                <version>1.0.0-SNAPSHOT</version>
            </dependency>
            <!--test-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    application.yml文件中做如下配置:

    server:
      port: 7268
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtual-host: TP-HOST #虚拟主机,可以不设置使用server默认host
    

    注意:生产者和消费者的虚拟主机要对应

    2.配置序列化策略

    因为生产者我们用了Jackson2JsonMessageConverter进行序列化,所以消费者端我们也使用Jackson2JsonMessageConverter进行反序列化

    新建一个配置类RabbitMQConfig如下:

    package com.tp.demo.rabbitmq.consumer.config;
    
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * FileName:   RabbitMQConfig
     * Author:     TP
     * Date:       12/14/20 9:03 AM
     * Description:消息消费者配置
     */
    @Configuration
    public class RabbitMQConfig {
    
        /**
         * 消息消费者配置JSON反序列化使用Jackson2JsonMessageConverter,与消息生产者保持一致
         */
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    }
    
    3.配置监听器

    消费者端是通过监听器监听消息的,我们配置一个监听器用于接收上面的队列消息:

    package com.tp.demo.rabbitmq.consumer.listener.simple;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * FileName:   DirectListener
     * Author:     TP
     * Date:       12/13/20 12:26 PM
     * Description:直连交换机消费者监听器
     * 直连交换机默认轮询所有消费者
     * 如果我们定义了多个消费者监听了同一个队列,会以轮询的方式消费,且不存在重复消费
     */
    @Component
    public class DirectListener {
    
        @RabbitListener(queues = "testDirectQueue")
        public void onMessage(String message) {
            System.out.println("DirectListener收到消息:" + message);
        }
    }
    

    扩展

    • Spring对amqp的支持很灵活,在消费者端我们可以使用org.springframework.amqp.core.Message对象统一接收消息,也可以使用你喜欢的任意类型进行接收,但要保证发送时候和接收时候的对象类型要保持一致。(例如StringMapJavaBean都可以)
    • 注解@RabbitListener可以定义在方法上,也可以定义在类上,用以声明一个消息监听器。
      -- 如果定义在类上,需要配合@RabbitHandler标注在对应的方法上,指明具体使用哪个方法做监听
      -- 如果定义在方法上,则可以省略@RabbitHandler
    • 如果我们定义了多个相同配置的消息监听器,消费者会轮询消费,且不会重复消费

    启动消费者项目,在控制台会得到如下输出:

    Connected to the target VM, address: '127.0.0.1:52798', transport: 'socket'
    
      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::                (v2.4.1)
    
    2020-12-14 15:29:50.092  INFO 98654 --- [           main] .t.d.r.c.DemoRabbitmqConsumerApplication : Starting DemoRabbitmqConsumerApplication using Java 1.8.0_191 on tianpengdeMacBook-Pro.local with PID 98654 (/Users/tianpeng/workspace/tp/my-boot-rabbitmq/demo-rabbitmq-consumer/target/classes started by tianpeng in /Users/tianpeng/workspace/tp/my-boot-rabbitmq)
    2020-12-14 15:29:50.094  INFO 98654 --- [           main] .t.d.r.c.DemoRabbitmqConsumerApplication : No active profile set, falling back to default profiles: default
    2020-12-14 15:29:50.805  INFO 98654 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 7268 (http)
    2020-12-14 15:29:50.812  INFO 98654 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
    2020-12-14 15:29:50.812  INFO 98654 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41]
    2020-12-14 15:29:50.861  INFO 98654 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
    2020-12-14 15:29:50.861  INFO 98654 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 735 ms
    2020-12-14 15:29:51.184  INFO 98654 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
    2020-12-14 15:29:51.396  INFO 98654 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 7268 (http) with context path ''
    2020-12-14 15:29:51.398  INFO 98654 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.0.1:5672]
    2020-12-14 15:29:51.425  INFO 98654 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#37c36608:0/SimpleConnection@7e62cfa3 [delegate=amqp://guest@127.0.0.1:5672/TP-HOST, localPort= 52811]
    DirectListener收到消息:你好,我是直连交换机过来的一条消息
    2020-12-14 15:29:51.555  INFO 98654 --- [           main] .t.d.r.c.DemoRabbitmqConsumerApplication : Started DemoRabbitmqConsumerApplication in 1.777 seconds (JVM running for 2.34)
    

    由此看到,消息被正确消费了,又由于我们采用的是RabbitMQ的默认消息确认机制:自动确认,所以此条消息会被RabbitMQ从队列中移除:

    上述示例是直连交换机的演示,关于扇形交换机和主题交换机,这里分别给出消息生产者和消费者的代码,就不进行演示和后台截图了,相信大家自己将下面代码copy到自己项目里,操作一下就能实现

    生产者端扇形交换机声明:

    package com.tp.demo.rabbitmq.producer.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * FileName:   FanoutRabbitConfig
     * Author:     TP
     * Date:       12/13/20 3:58 PM
     * Description:
     */
    @Configuration
    public class FanoutRabbitConfig {
    
        /**
         * 创建三个队列 :fanout.A   fanout.B  fanout.C
         * 将三个队列都绑定在交换机fanoutExchange上
         * 因为是扇型交换机,路由键无需配置,配置也不起作用
         */
        @Bean
        public Queue queueA() {
            return new Queue("fanout.A");
        }
    
        @Bean
        public Queue queueB() {
            return new Queue("fanout.B");
        }
    
        @Bean
        public Queue queueC() {
            return new Queue("fanout.C");
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        @Bean
        Binding bindingExchangeA() {
            return BindingBuilder.bind(queueA()).to(fanoutExchange());
        }
    
        @Bean
        Binding bindingExchangeB() {
            return BindingBuilder.bind(queueB()).to(fanoutExchange());
        }
    
        @Bean
        Binding bindingExchangeC() {
            return BindingBuilder.bind(queueC()).to(fanoutExchange());
        }
    }
    

    生产者端主题交换机声明:

    package com.tp.demo.rabbitmq.producer.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * FileName:   TopicRabbitConfig
     * Author:     TP
     * Date:       12/13/20 12:52 PM
     * Description:主题交换机Config
     */
    @Configuration
    public class TopicRabbitConfig {
        // 绑定键
        private final static String man = "topic.man";
        private final static String woman = "topic.woman";
    
        @Bean
        public Queue firstQueue() {
            return new Queue(TopicRabbitConfig.man);
        }
    
        @Bean
        public Queue secondQueue() {
            return new Queue(TopicRabbitConfig.woman);
        }
    
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("topicExchange");
        }
    
    
        // 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
        // 这样只要是消息携带的路由键是topic.man,才会分发到该队列
        @Bean
        Binding bindingExchangeMessage() {
            return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
        }
    
        // 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
        // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
        @Bean
        Binding bindingExchangeMessage2() {
            return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
        }
    }
    

    生产者端发送消息

    package com.tp.demo.rabbitmq.producer.controller.simple;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * FileName:   SendFanoutMessageController
     * Author:     TP
     * Date:       12/13/20 4:02 PM
     * Description:扇形交换机消息发送Controller
     */
    @RestController
    public class SendFanoutMessageController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/sendFanoutMessage")
        public String sendFanoutMessage() throws JsonProcessingException {
            String message = "message: testFanoutMessage...";
            for (int i = 0; i < 200; i++) {
                rabbitTemplate.convertAndSend("fanoutExchange", null, message);
            }
            return "ok";
        }
    }
    
    package com.tp.demo.rabbitmq.producer.controller.simple;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * FileName:   SendTopicMessageController
     * Author:     TP
     * Date:       12/13/20 12:56 PM
     * Description:主题交换机消息发送Controller
     */
    @RestController
    public class SendTopicMessageController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @GetMapping("/sendTopicMessage1")
        public String sendTopicMessage1() {
            String message = "message: MAN ";
            rabbitTemplate.convertAndSend("topicExchange", "topic.man", message);
            return "ok";
        }
    
        @GetMapping("/sendTopicMessage2")
        public String sendTopicMessage2() {
            String message = "message: woman will all in";
            rabbitTemplate.convertAndSend("topicExchange", "topic.woman", message);
            return "ok";
        }
    }
    

    消费者端消息监听器:

    package com.tp.demo.rabbitmq.consumer.listener.simple;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * FileName:   FanoutListenerA
     * Author:     TP
     * Date:       12/13/20 4:07 PM
     * Description:
     */
    @Component
    public class FanoutListenerA {
    
        @RabbitListener(queues = "fanout.A")
        public void onMessage(String message) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("FanoutListenerA收到消息:" + message);
        }
    }
    
    package com.tp.demo.rabbitmq.consumer.listener.simple;
    
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * FileName:   FanoutListenerC
     * Author:     TP
     * Date:       12/13/20 4:08 PM
     * Description:
     */
    @Component
    public class FanoutListenerB {
    
        @RabbitListener(queues = "fanout.B")
        public void onMessage(String message) {
            System.out.println("FanoutListenerB收到消息:" + message);
        }
    }
    
    package com.tp.demo.rabbitmq.consumer.listener.simple;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * FileName:   FanoutListenerC
     * Author:     TP
     * Date:       12/13/20 4:08 PM
     * Description:
     */
    @Component
    public class FanoutListenerC {
    
        @RabbitListener(queues = "fanout.C")
        public void onMessage(String message) {
            System.out.println("FanoutListenerC收到消息:" + message);
        }
    }
    
    package com.tp.demo.rabbitmq.consumer.listener.simple;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * FileName:   TopicListener1
     * Author:     TP
     * Date:       12/13/20 2:32 PM
     * Description:
     */
    @Component
    public class TopicListener1 {
    
        @RabbitListener(queues = "topic.man")
        public void onMessage(String message) {
            System.out.println("TopicListener1收到消息:" + message);
        }
    }
    
    package com.tp.demo.rabbitmq.consumer.listener.simple;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * FileName:   TopicListener
     * Author:     TP
     * Date:       12/13/20 2:32 PM
     * Description:
     */
    @Component
    public class TopicListener2 {
    
        @RabbitListener(queues = "topic.woman")
        public void process(String message) {
            System.out.println("TopicListener2收到消息:" + message);
        }
    }
    

    三.消息可靠投递:消息确认机制

    RabbitMQ中生产者端和消费者端都有消息确认机制,已尽量避免消息的丢失

    消息生产者投递确认

    ====================================================
    生产者发送确认机制共有2种回调:ConfirmCallback、ReturnCallback
    两种回调函数都是在什么情况会触发呢?
    总体来说,推送消息存在以下四种情况:
    ①消息推送到server,但是在server里找不到交换机
    ②消息推送到server,找到交换机了,但是没找到队列
    ③消息推送到sever,交换机和队列啥都没找到
    ④消息推送成功
    ====================================================
    ①这种情况触发的是 ConfirmCallback回调函数
    ②这种情况触发的是 ConfirmCallback和ReturnCallback两个回调函数
    ③这种情况触发的是 ConfirmCallback回调函数
    ④这种情况触发的是 ConfirmCallback回调函数
    ====================================================
    我们可以在回调函数根据需求做对应的扩展或者业务数据处理

    为了支持生产者消息投递确认,我们需要作如下内容:

    • application.yml中增加如下配置:
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtual-host: TP-HOST #虚拟主机,可以不设置使用server默认host
        publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
        publisher-returns: true #确认消息已发送到队列(Queue)
    
    • 对之前生产者端的RabbitMQConfig.java进行改造如下:
    package com.tp.demo.rabbitmq.producer.config;
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * FileName:   RabbitMQConfig
     * Author:     TP
     * Date:       12/13/20 4:31 PM
     * Description:开启消息发送消息确认
     */
    @Configuration
    public class RabbitMQConfig {
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            // 设置发送方确认
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                System.out.println("ConfirmCallback >>> " + "相关数据:" + correlationData);
                System.out.println("ConfirmCallback >>> " + "确认情况:" + ack);
                System.out.println("ConfirmCallback >>> " + "原因:" + cause);
            });
    
            rabbitTemplate.setReturnsCallback(e -> {
                System.out.println("ReturnCallback >>> " + "消息:" + e.getMessage());
                System.out.println("ReturnCallback >>> " + "回应码:" + e.getReplyCode());
                System.out.println("ReturnCallback >>> " + "回应信息:" + e.getReplyText());
                System.out.println("ReturnCallback >>> " + "交换机:" + e.getExchange());
                System.out.println("ReturnCallback >>> " + "路由键:" + e.getRoutingKey());
            });
    
            // 设置序列化策略
            rabbitTemplate.setMessageConverter(jsonMessageConverter());
            return rabbitTemplate;
        }
    
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    

    为了方便,我们封装了一个消息发送工具:

    package com.tp.demo.rabbitmq.producer.sender;
    
    import com.tp.demo.rabbitmq.producer.utils.RandomUtils;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * FileName:   RabbitSender
     * Author:     TP
     * Date:       12/14/20 9:16 AM
     * Description:封装一个RabbitMQ发送消息对象,方便使用
     * 当然,你也可以直接使用RabbitTemplate
     */
    @Component
    public class RabbitSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //发送消息方法调用: 构建Message消息
        public void convertAndSend(String exchange, String routingKey, Object message) {
            // 时间戳+6位随机字符保证全局唯一
            // 用于ack保证唯一一条消息(在做补偿策略的时候,必须保证这是全局唯一的消息)
            // 在消费方可以通过message.getMessageProperties().getHeaders().get("spring_returned_message_correlation")获取到该CorrelationData
            CorrelationData correlationData = new CorrelationData(RandomUtils.UUID());
            // 发送消息
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        }
    }
    

    这个方法中,每次发送消息的时候会生成一个全局唯一标识放入CorrelationDataCorrelationData我们也可以封装业务ID信息(把send方法增加个参数,在发送消息的时候指定),这样我们就可以在消息发送失败的时候,根据业务ID进行自己的补偿机制

    我们发送一个不存在的交换机进行测试:

    package com.tp.demo.rabbitmq.producer.controller.simple;
    
    import com.tp.demo.rabbitmq.producer.sender.RabbitSender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * FileName:   TestSendFailCallbackController
     * Author:     TP
     * Date:       12/14/20 10:22 AM
     * Description:
     */
    @RestController
    public class TestSendFailCallbackController {
    
        @Autowired
        RabbitSender rabbitSender;
    
        @GetMapping("/sendMessageFail")
        public String sendDirectMessage() {
            String message = "Hello,This will fail...";
            //将消息携带绑定键值:testDirectRouting 发送到交换机testDirectExchange
            rabbitSender.convertAndSend("none_exchange", "testDirectRouting", message);
            return "ok";
        }
    }
    

    查看控制台输出:

    Connected to the target VM, address: '127.0.0.1:53731', transport: 'socket'
    
      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::                (v2.4.1)
    
    2020-12-14 16:01:15.954  INFO 5063 --- [           main] .t.d.r.p.DemoRabbitMqProducerApplication : Starting DemoRabbitMqProducerApplication using Java 1.8.0_191 on tianpengdeMacBook-Pro.local with PID 5063 (/Users/tianpeng/workspace/tp/my-boot-rabbitmq/demo-rabbitmq-producer/target/classes started by tianpeng in /Users/tianpeng/workspace/tp/my-boot-rabbitmq)
    2020-12-14 16:01:15.957  INFO 5063 --- [           main] .t.d.r.p.DemoRabbitMqProducerApplication : No active profile set, falling back to default profiles: default
    2020-12-14 16:01:16.791  INFO 5063 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 7269 (http)
    2020-12-14 16:01:16.800  INFO 5063 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
    2020-12-14 16:01:16.800  INFO 5063 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41]
    2020-12-14 16:01:16.854  INFO 5063 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
    2020-12-14 16:01:16.855  INFO 5063 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 862 ms
    2020-12-14 16:01:17.204  INFO 5063 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
    2020-12-14 16:01:17.362  INFO 5063 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 7269 (http) with context path ''
    2020-12-14 16:01:17.369  INFO 5063 --- [           main] .t.d.r.p.DemoRabbitMqProducerApplication : Started DemoRabbitMqProducerApplication in 1.72 seconds (JVM running for 2.142)
    2020-12-14 16:02:26.993  INFO 5063 --- [nio-7269-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
    2020-12-14 16:02:26.993  INFO 5063 --- [nio-7269-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
    2020-12-14 16:02:26.994  INFO 5063 --- [nio-7269-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
    2020-12-14 16:02:27.074  INFO 5063 --- [nio-7269-exec-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.0.1:5672]
    2020-12-14 16:02:27.100  INFO 5063 --- [nio-7269-exec-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#7bab5898:0/SimpleConnection@1759112d [delegate=amqp://guest@127.0.0.1:5672/TP-HOST, localPort= 53771]
    2020-12-14 16:02:27.139 ERROR 5063 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'none_exchange' in vhost 'TP-HOST', class-id=60, method-id=40)
    ConfirmCallback >>> 相关数据:CorrelationData [id=20201214160227064961897]
    ConfirmCallback >>> 确认情况:false
    ConfirmCallback >>> 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'none_exchange' in vhost 'TP-HOST', class-id=60, method-id=40)
    
    

    可以看到CorrelationData中取到了我们生成的UUID,我们可以根据这个UUID做自己的要业务补偿

    顺便贴一下自定义的RandomUtils:

    package com.tp.demo.rabbitmq.producer.utils;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Random;
    
    /**
     * FileName:   RandomUtils
     * Author:     TP
     * Date:       12/14/20 9:36 AM
     * Description:
     */
    public class RandomUtils {
    
        public synchronized static String UUID() {
            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
            String current = LocalDateTime.now().format(dtf);
            return current + getRandomString(6);
        }
    
        public synchronized static String UUID(String prefix) {
            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
            String current = LocalDateTime.now().format(dtf);
            return prefix + current + getRandomString(6);
        }
    
        private static String getRandomString(int length) {
            StringBuffer sb = new StringBuffer();
            if (length > 0) {
                for (int i = 0; i < length; i++) {
                    sb.append(new Random().nextInt(10));
                }
                return sb.toString();
            }
            return null;
        }
    }
    

    到此,消息生产者确认机制就算完成了

    消息消费者投递确认

    重头戏来了,消息消费者端RabbitMQ默认是自动确认的,只要消息发送到了消费者,则认为消息已被消费,消息在RabbitMQ服务器会被移除,这显然在生产环境上是极度危险的,所以我们都会设置消息消费者端的消费确认为手动,具体步骤如下:

    • 改造消息消费者端的RabbitMQConfig.java:
    package com.tp.demo.rabbitmq.consumer.config;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * FileName:   RabbitMQConfig
     * Author:     TP
     * Date:       12/14/20 9:03 AM
     * Description:消息消费者配置
     */
    @Configuration
    public class RabbitMQConfig {
    
        /**
         * 消息消费者配置JSON反序列化使用Jackson2JsonMessageConverter,与消息生产者保持一致
         */
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    
    }
    

    注意:对于没有自己声明上述SimpleRabbitListenerContainerFactory的同学,可以在yml中直接配置:

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtual-host: TP-HOST #虚拟主机,可以不设置使用server默认host
        listener:
          simple:
            acknowledge-mode: manual
    

    而我们自己声明了SimpleRabbitListenerContainerFactory,这时如果在yml中增加上述配置是无效的

    • 监听器内进行手动消息确认
    package com.tp.demo.rabbitmq.consumer.listener.ack;
    
    import com.rabbitmq.client.Channel;
    import com.tp.demo.rabbitmq.common.entity.User;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * FileName:   DirectAckListener
     * Author:     TP
     * Date:       12/13/20 7:02 PM
     * Description:
     * 注解@RabbitListener可以定义在方法上也可以定义在类上
     * -- 如果定义在类上,需要配合@RabbitHandler标注在方法上,指明具体使用哪个方法做监听
     * -- 如果定义在方法上,则可以省略@RabbitHandler
     */
    @Component
    public class DirectAckListener {
    
        @RabbitListener(queues = "testDirectAckQueue")
        public void process(Message message, Channel channel) throws IOException {
            System.out.println("接收到消息总体内容:" + message);
            System.out.println("实际消息内容:" + new String(message.getBody()));
    
            // TODO 业务逻辑
            // 1.获取message中的body,解析消息内容
            // 2.其他业务逻辑......
    
            // 回执情形1:消费成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    
            // 回执情形2:消费成功消费处理失败,重新放入队列(一定要慎用,防止造成无限返回队列->消费者->返回队列.....造成消息积压)
            // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    
            // 回执情形3:消费处理失败,拒绝接收(可以指定是否重新放入队列,如果消息不重新放入队列,RabbitMQ服务端会将消息移除)
            // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    
        /**
         * 如果你很懒,不想从message中获取body再自己反序列化为想要的实体bean怎么办?
         * Spring对rabbitMQ的集成允许我们直接使用bean接收,如下:直接可以用形参封装
         * 扩展:我们可以在生产者端发送任意类型的消息,并且在消费者端直接用形参封装,但你必须保证用的是同一种数据类型
         * 注意:如果想测试这种快捷方式,请将注解注释放开,并将上面的process全部注释掉
         */
        // @RabbitListener(queues = "testDirectAckQueue")
        public void process(User user, Message message, Channel channel) throws IOException {
            System.out.println(user);
    
            // TODO 业务逻辑
            // ......
    
            // 回执情形1:进行消息回执
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    
            // 回执情形2:消费成功消费处理失败,重新放入队列(一定要慎用,防止造成无限返回队列->消费者->返回队列.....造成消息积压)
            // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    
            // 回执情形3:消费处理失败,拒绝接收(可以指定是否重新放入队列,如果消息不重新放入队列,RabbitMQ服务端会将消息移除)
            // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    
        }
    }
    

    当我们设置了消息手动确认,如果消息到了消费者而消费者一直不确认,在RabbitMQ中这条消息将会一直处于unacked待确认状态,直到消费者与RabbitMQ断开连接,这条消息又会重新变成ready状态,消费者重启后会重新消费消息,对于消费者手动确认,其回执方式有3种,详见上述代码的注释,这里就不再说明了

    测试:
    发送一条需要手动确认的消息如下:

    package com.tp.demo.rabbitmq.producer.controller.ack;
    
    import com.tp.demo.rabbitmq.common.entity.User;
    import com.tp.demo.rabbitmq.producer.sender.RabbitSender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    
    /**
     * FileName:   SendDirectMessageController
     * Author:     TP
     * Date:       12/13/20 12:11 PM
     * Description:直连交换机消息发送Controller
     */
    @RestController
    public class SendDirectAckMessageController {
    
        @Autowired
        RabbitSender rabbitSender;
    
        @GetMapping("/sendDirectMessageAck")
        public String sendDirectMessage() {
            User user = new User();
            user.setId(1);
            user.setUserName("TP");
            user.setPassWord("pwd123456");
            user.setAge(18);
            user.setCreateTime(LocalDateTime.now());
            rabbitSender.convertAndSend("testDirectAckExchange", "testDirectAckRouting", user);
            return "ok";
        }
    
        @GetMapping("/sendDirectMessageAck2")
        public String sendDirectMessage2() {
            for (int i = 20; i <= 30; i++) {
                rabbitSender.convertAndSend("testDirectAckExchange", "testDirectAckRouting", "我是一条需要确认的消息");
            }
            return "ok";
        }
    }
    

    我们在消费者端加上debug,让消息先不走回执,观察效果:

    放开debu后:
    控制台输出:

    接收到消息总体内容:(Body:'{"id":1,"userName":"TP","passWord":"pwd123456","age":18,"createTime":[2020,12,14,16,35,16,705000000]}' MessageProperties [headers={spring_listener_return_correlation=97fb83de-fd43-4529-9d24-abef559a9fcb, spring_returned_message_correlation=20201214163516705225241, __TypeId__=com.tp.demo.rabbitmq.common.entity.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=testDirectAckExchange, receivedRoutingKey=testDirectAckRouting, deliveryTag=2, consumerTag=amq.ctag-F5bUnBjCf-umNuRMKLLUAg, consumerQueue=testDirectAckQueue])
    实际消息内容:{"id":1,"userName":"TP","passWord":"pwd123456","age":18,"createTime":[2020,12,14,16,35,16,705000000]}
    

    其他2种回执,请自行测试

    本示例发送实体类消息用的User类如下:

    package com.tp.demo.rabbitmq.common.entity;
    
    import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
    import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
    import lombok.Data;
    
    import java.time.LocalDateTime;
    
    /**
     * FileName:   User
     * Author:     TP
     * Date:       12/14/20 11:33 AM
     * Description:
     */
    @Data
    public class User {
    
        private Integer id;
    
        private String userName;
    
        private String passWord;
    
        private Integer age;
    
        @JsonDeserialize(using = LocalDateTimeDeserializer.class)
        @JsonSerialize(using = LocalDateTimeSerializer.class)
        private LocalDateTime createTime;
    }
    

    至此,Springboot整合RabbitMQ完毕

    问题

    鄙人在测试中遇到如下问题:消费者端已经开启了消费手动确认,如果我们发送一条消息,消息内容为一个JavaBean,如果在消费者端监听器进行反序列化消息内容到Message参数时失败抛出异常了,则RabbitMQ会直接将消息移除,而不会将这条消息标记为unacked,这会导致消息丢失
    为什么会这样我也不知道,如果有高人看到此篇文章并对这种情形有理解,请留言,不胜感激!

    当然了,我们可以转JSON发送String消息,然后自己接收后再解析,也可以生产者和消费者引用maven私服内的同一个jar包,同一个实体类不会出现反序列化失败的问题,不理解的是如果Message就是封装失败了,为什么会将这条消息移除呢,而不是标记为未确认呢???

    后续

    针对上面的问题,通过艰难的Spring-amqp源码debug,定位了问题所在:
    消费端监听器如果封装参数失败会抛出:org.springframework.amqp.rabbit.support.ListenerExecutionFailedException,这个异常被认为为fatal异常,也就是致命异常

    图中可以看出,这种情形下,Spring-amqp在进行nack的时候,是否requeue最终为false,所以不会重新放入队列中

    那么针对这种情形,我们怎么解决呢?
    我们可以使用死信队列,这种情况消息会被认为是死信并发送到死信队列里(如果已经配置)

    并且强烈推荐生产环境为自己的所有业务队列配置上死信队列,能保证消息的可靠性,通过死信队列进行业务补偿。

    相关文章

      网友评论

          本文标题:RabbtiMQ系列-5.SpringBoot集成RabbitM

          本文链接:https://www.haomeiwen.com/subject/wqpvgktx.html