美文网首页JavaJava 程序员
SpringBoot与RabbitMQ整合,发送和接收消息实战(

SpringBoot与RabbitMQ整合,发送和接收消息实战(

作者: 程序花生 | 来源:发表于2022-06-15 15:38 被阅读0次

    1、RabbitMQ 消息中间件

    大榜:小汪你怎么了,我看你脸色不太好呀。

    小汪:最近团队正在使用消息队列实现业务模块之间的解耦,昨天熬了一晚上,也是云里雾里。哎,都快熬成熊猫眼了。

    大榜:哦哦。消息队列的使用,可以对多个模块之间进行解耦,并实现异步通信,降低系统整体的响应时间。你们团队的想法挺好的,那采用的是哪种消息队列呢?

    小汪:昨天上网搜索了半天,有ActiveMQ、RabbitMQ、RocketMQ、Kafka这四种,它们之间的区别,我当时还找了一张图,我发过来给你看看,帮我参谋下

    大榜:上面这四种消息队列的对比,我之前也看过,你们公司100多号人,应该属于中小型公司,使用RabbitMQ作为消息队列应该是很好的选择,而且RabbitMQ社区很活跃,我上次使用RabbitMQ作为消息中间件,发生了消息丢失的现象,最后依靠RabbitMQ社区的力量解决的。

    小汪:可以啊,榜哥,教教我。我现在脑子里面只知道有几个概念:生产者、消费者、消息模型啥的

    2、RabbitMQ核心概念(类比实际生活场景)

    大榜:嗯嗯,你说的很对,RabbitMQ的核心概念就是生产者、消费者、消息模型,而消息模型是由交换机、路由、队列组成。

    小汪:你这么一说,我想起来了,昨晚的书里面介绍了交换机、路由的概念,但我还是觉得太抽象了,不太好理解。

    大榜:确实不好理解,我刚开始学习时,也是不理解,后来我想到了一个实际生活中寄快递、取快递的场景,正好可以解释交换机、路由、队列。我们就举个栗子,假设生活在武汉的小芹,想寄一个爱心包裹,发送给远在深圳的你(小汪)。

    小汪:小芹给我寄爱心包裹,说得我心里暖暖的。哎,但实际情况是我寄包裹给小芹,呜呜呜。

    大榜:小汪,面包会有的,牛奶也会有的,只要我们向阳成长。扯远了,我们假设小芹寄爱心包裹给你(小汪)。具体流程应该如下:小芹将包裹给快递员,然后快递员通过某种运输方式将包裹从武汉运到深圳,小汪开开心心地从深圳快递点取走爱心包裹。

    类比下,可以得到如下的关系:

    • 小芹:生产者
    • 爱心包裹:消息
    • 快递员:交换机
    • 某种运输方式:路由
    • 武汉到深圳:队列名称
    • 小汪:消费者

    总的来说,小芹将包裹给快递员,然后快递员通过某种运输方式将包裹从武汉运到深圳,小汪开开心心地从深圳快递点取走爱心包裹。我们就可以看成:生产者产生消息,然后消息到达交换机,再通过路由,将消息发送至指定的队列中,最后消费者监听该队列中的消息,进行消费处理。


    小汪:你这一说,我好像有点懂了。寄快递、取快递这种实际生活场景,可以看成如下的消息传输过程,生产者将消息交给交换机,然后交换机通过路由,将消息发送武汉到深圳的队列,消费者取出该条消息进行处理。RabbitMQ中生产者、消息、交换机、路由、队列、消费者之间的关系,可以画成下面这张图

    大榜:嗯嗯,你这理解力可以啊!

    小汪:过奖过奖,这都是昨晚熬成熊猫眼的成果,而且刚刚和你讨论后,感觉进一步理解了RabbitMQ的概念。我现在懂了RabbitMQ的生产者、消息模型、消费者的大致原理,但团队要搭建一个RabbitMQ与SpringBoot整合的一个框架,昨晚搭建了半宿,也没干完,我这一时半天也搭不出来了,下午开周例会,等待被老大捶了,呜呜呜。

    3、RabbitMQ发送和接收消息实战

    大榜:RabbitMQ与SpringBoot整合,不算很难,因为SpringBoot是集成了Spring的,比Spring框架更灵活。

    3.1、pom文件中引入RabbitMQ的起步依赖

    我们只需要在pom文件中引入RabbitMQ的起步依赖

      <!-- RabbitMQ的起步依赖,和spring-boot整合成一个Jar包了 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>1.5.6.RELEASE</version>
    </dependency>
    

    3.2、配置RabbitMQ的host、端口等信息

    在application.properties配置文件中,增加host主机地址、port端口号、用户名、密码

    # RabbitMQ配置
    spring.rabbitmq.virtual-host=/
    # 配置host、端口号、用户名、密码
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    

    接着,在配置文件中声明RabbitMQ的队列、交换机、路由的信息:

    # 设置交换机、路由、队列,使用directExchange消息模型
    # 自定义变量,表示本地开发环境
    mq.env=local
    
    # 设置direct消息模型中队列、交换机、路由
    mq.basic.info.queue.name=${mq.env}.middleware.mq.basic.info.queue.demo1
    mq.basic.info.exchange.name=${mq.env}.middleware.mq.basic.info.exchange.demo1
    mq.basic.info.routing.key.name=${mq.env}.middleware.mq.basic.info.routing.key.demo1
    

    小汪:榜哥,3.1中在pom文件引入RabbitMQ的依赖,3.2中在配置文件里面设置了RabbitMQ服务器的地址为localhost,用户名、密码都为guest。这个我懂,然后你设置了队列、交换机、路由的名称,再往下应该要把队列、交换机、路由注入到Spring容器中进行管理,是吧?

    大榜:是的,思维转得很快嘛。RabbitsMQ在实际项目的应用过程中,如果配置和使用不当,则会出现各种令人头疼的问题,而且面试官经常考察的问题:如何防止消息丢失?如何保证消费不被重复消费?我当时就出现了消息丢失,熬出了好几个熊猫眼才解决消息丢失的问题。

    小汪:榜哥,别卖关子了,你要是和盘托出,小弟请你喝冰可乐!

    3.3、自定义注入Bean组件

    大榜:我当时查阅和学习了RabbitMQ的相关教程和视频,为了保证消息的高可用和确认消费,RabbitMQ官方给我们提供了三条准则。也就是如果我们想要保证消息的高可用和确认消费,需要遵守这3条准则:生产者的发送确认机制;创建队列、交换机消息时设置持久化模式;消费者的确认消费ACK机制。

    小汪:榜哥牛逼啊,堪称RabbitMQ实战经验的总结!

    大榜:别给我带高帽子,我只是比你早学习RabbitMQ半年。上面3条准则对应的代码,我之前配置好了,可以直接拿来用。项目链接放在码云仓库: gitee.com/qinstudy/sp…

    3.3.1、生产者的发送确认机制

    RabbitMQ要求生产者在发送完消息之后进行”发送确认“,当生产者确认成功时即代表消息已经完成发送出去了!其中的代码如下:

     /**
         * 构建RabbitMQ发送消息的操作组件实例
         * 生产者的发送确认机制
         */
        @Bean(name = "rabbitMQTemplate")
        public RabbitTemplate rabbitTemplate() {
            // 生产者确认消息是否发送过去了
            connectionFactory.setPublisherConfirms(true);
    
            // 生产者发送消息后,返回反馈消息
            connectionFactory.setPublisherReturns(true);
    
            // 构建rabbitTemlate操作模板
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
    
            // 生产者发送消息后,如果发送成功,则打印“消息发送成功”的日志信息
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }
            });
    
            // 生产者发送消息后,若发送失败,则输出“消息发送失败”的日志信息
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
                }
            });
    
            return rabbitTemplate;
        }
    
    3.3.2、创建队列、交换机、消息,设置持久化模式

    第二条准则,就是如何保证RabbitMQ队列中的消息”不丢失“,RabbitMQ官方建议开发者在创建队列、交换机设置持久化参数为true,也就是durable参数的值为true。同时,官方强烈建议开发者在创建消息时设置消息的持久化模式为”持久化“,这样就可以保证RabbitMQ服务器崩溃并执行重启操作后,队列、交换机仍然存在,而且该消息不会丢失。

    创建队列、交换机设置持久化参数为true,也就是durable参数的值为true,代码如下:

     /**
         * 创建direct消息模型:队列、交换机、路由
         */
        // 1.1、创建队列
        @Bean(name = "basicQueue")
        public Queue basicQueue() {
            return new Queue(env.getProperty("mq.basic.info.queue.name"), true);
        }
    
        // 1.2、创建交换机
        @Bean
        public DirectExchange basicExchange() {
            return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"), true, false);
        }
    
        // 1.3、创建绑定关系
        @Bean
        public Binding basicBinding() {
            return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("mq.basic.info.routing.key.name"));
        }
    

    在创建消息时设置消息的持久化模式为”持久化“,代码如下:

    @Component
    @Slf4j
    public class BasicPublisher {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        Environment env;
    
        // 发送字符串类型的消息
        public void sendMsg(String messageStr) {
            if (!Strings.isNullOrEmpty(messageStr)) {
                try {
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
                    rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
    
                    // 2创建队列、交换机、消息 设置持久化模式
                    // 设置消息的持久化模式
                    Message message = MessageBuilder.withBody(messageStr.getBytes("utf-8")).
                            setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
                    rabbitTemplate.convertAndSend(message);
                    log.info("基本消息模型-生产者-发送消息:{}", messageStr);
    
                } catch (UnsupportedEncodingException e) {
                    log.error("基本消息模型-生产者-发送消息发生异常:{}", messageStr, e.fillInStackTrace());
                }
            }
        }
    
    }
    
    3.3.3、消费者的确认消费ACK机制

    消费者的确认消费机制有三种:None、Auto、Manual

    None:不进行确认消息,也就是消费者发送任何反馈信息给MQ服务端;

    Auto:消费者自动确认消费。消费者处理该消息后,需要发送一个自动的ack反馈信息给MQ服务端,之后该消息从MQ的队列中移除掉。其底层的实现逻辑是由RabbitMQ内置的相关组件实现自动发送确认反馈信息。

    Manual:人为手动确认消费机制。消费者处理该消息后,需要手动地“以代码的形式”发送给一个ack反馈信息给MQ服务端。

    RabbitmqConfig#listenerContainer(),代码如下:

     /**
         * 设置单个消费者
         * 消费者的Ack确认机制为AUTO
         */
        @Bean(name = "singleListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
            SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
            containerFactory.setConnectionFactory(connectionFactory);
            containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
    
            // 设置消费者的个数
            containerFactory.setConcurrentConsumers(1);
            // 设置消费者的最大值
            containerFactory.setMaxConcurrentConsumers(1);
            // 设置消费者每次拉取的消息数量,即消费者一次拉取几条消息
            containerFactory.setPrefetchCount(1);
    
            // 设置确认消息模型为自动确认消费AUTO,目的是防止消息丢失和消息被重复消费
            containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
            return containerFactory;
        }
    

    小汪:哦哦,我懂了。为了保证消息的高可用和确认消费,需要遵守这3条准则。3.3.1是生产者的发送确认机制;3.3.2中的代码是告诉我们创建队列、交换机消息时,要设置持久化模式;3.3.3是消费者的确认消费ACK机制。那RabbitMQ的发送、接收代码该如何实现呢?

    大榜:上面我们已经配置了RabbitMQ的队列、交换机、路由,而且为了保证消息的高可用和确认消费,我们遵守3条准则:生产者的发送确认机制;创建队列、交换机消息时设置持久化模式;消费者的确认消费ACK机制。我们解决了大难点,发送、接收代码实现就简单了。

    3.4、RabbitMQ发送、接收实战

    3.4.1、定义生产者
    @Component
    @Slf4j
    public class BasicPublisher {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        Environment env;
    
        // 发送字符串类型的消息
        public void sendMsg(String messageStr) {
            if (!Strings.isNullOrEmpty(messageStr)) {
                try {
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
                    rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
    
                    // 2创建队列、交换机、消息 设置持久化模式
                    // 设置消息的持久化模式
                    Message message = MessageBuilder.withBody(messageStr.getBytes("utf-8")).
                            setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
                    rabbitTemplate.convertAndSend(message);
                    log.info("基本消息模型-生产者-发送消息:{}", messageStr);
    
                } catch (UnsupportedEncodingException e) {
                    log.error("基本消息模型-生产者-发送消息发生异常:{}", messageStr, e.fillInStackTrace());
                }
            }
        }
    
    }
    
    3.4.2、创建消费者
    @Component
    @Slf4j
    public class BasicConsumer {
    
        /**
         * 监听并消费队列中的消息
         */
        @RabbitListener(queues = "${mq.basic.info.queue.name}", containerFactory = "singleListenerContainer")
        public void consumerMsg(@Payload byte[] msg) {
            try {
                String messageStr = new String(msg, "utf-8");
                log.info("基本消息模型-消费者-监听并消费到的消息:{}", messageStr);
    
            } catch (UnsupportedEncodingException e) {
    
                log.error("基本消息模型-消费者-发生异常:", e.fillInStackTrace());
            }
        }
    
    }
    
    3.4.3、编写单元测试,将一串字符串发送到队列中,消费者监听并处理
    @Slf4j
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest
    public class RabbitMQTest {
    
        @Autowired
        private BasicPublisher basicPublisher;
    
        // 测试 基本消息模型,消息内容为 字符串
        @Test
        public void testBasicMessageModel() {
            String msgStr = "~~~这是一串字符串消息~~~~";
            basicPublisher.sendMsg(msgStr);
        }
    }
    
    3.4.4、单元测试的结果分析

    对编写的单元测试,结果如下图:

    从图中可以看到,生产者发送了一条消息:"~~这是一串字符串消息~~",放到了消息队列中;消费者监听并处理了该条消息,然后消费者打印了该消息内容。

    小汪:懂了,对于我这样的刚刚接触RabbitMQ的来说,都是干货啊。榜哥,我梳理下,看看理解的对不对。首先,我们从团队的业务需要实现解耦说起,引出了消息中间件;接着比较了4种消息队列的区别,根据公司的实际情况选择了RabbitMQ;然后将生产者、消息、交换机、路由、队列、消费者类比于寄快递、取快递这一实际场景(小芹给小汪的寄快递);最后,我们将RabbitMQ与流行的SpringBoot整合起来,遵守3条准则,保证RabbitMQ的高可用和确认消费,并实现了RabbitMQ的发送和接收消息实战。

    大榜:是的,看来你已经入门了RabbitMQ,RabbitMQ发送和接收消息实战,该项目链接放在码云仓库,你为团队引入RabbitMQ时可以直接拿来用,链接地址: gitee.com/qinstudy/sp…

    小汪:昨晚看书,RabbitMQ还有死信队列的概念,能解下惑吗?

    大榜:到饭点了,要不咱们先吃个午饭,边吃边聊.....

    作者:麻雀学习
    链接:https://juejin.cn/post/6958421370623492132
    来源:稀土掘金

    相关文章

      网友评论

        本文标题:SpringBoot与RabbitMQ整合,发送和接收消息实战(

        本文链接:https://www.haomeiwen.com/subject/kjexvrtx.html