美文网首页
springboot整合rabbitmq,支持消息确认机制

springboot整合rabbitmq,支持消息确认机制

作者: Quillagua | 来源:发表于2020-04-19 16:04 被阅读0次

    项目结构

    POM.XML

    <?xml version="1.0" encoding="UTF-8"?>
     <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
         <modelVersion>4.0.0</modelVersion>
     
         <groupId>com.example</groupId>
         <artifactId>rabbitmq</artifactId>
         <version>0.0.1-SNAPSHOT</version>
         <packaging>jar</packaging>
     
         <name>rabbitmq</name>
         <description>Spring Boot 整合RabbitMQ</description>
     
         <parent>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-parent</artifactId>
             <version>2.0.5.RELEASE</version>
             <relativePath/> <!-- lookup parent from repository -->
         </parent>
     
         <properties>
             <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
             <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
             <java.version>1.8</java.version>
         </properties>
     
         <dependencies>
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter</artifactId>
             </dependency>
     
             <!-- rabbitmq -->
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter-amqp</artifactId>
             </dependency>
     
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter-test</artifactId>
                 <scope>test</scope>
             </dependency>
         </dependencies>
     
         <build>
             <plugins>
                 <plugin>
                     <groupId>org.springframework.boot</groupId>
                     <artifactId>spring-boot-maven-plugin</artifactId>
                 </plugin>
             </plugins>
         </build>
     
     
     </project>
    

    application.yml


    image.png

    需要将publisher-confrems设为true,启动确认回调, 将 publisher-returns设为true 确认返回回调

    rabbitmq配置类--RabbitConfig
    第一部分, 定义队列


    image.png

    第二部分,设置一些消息处理策略

    image.png

    代码如下

    package com.example.rabbitmq;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
     
    import javax.annotation.Resource;
     
    /**
     * rabbitMq 配置类
     * @author milicool
     * Created on 2018/9/14
     */
    @Configuration
    public class RabbitConfig {
        @Resource
        private RabbitTemplate rabbitTemplate;
     
        /**
         * 定义一个hello的队列
         * Queue 可以有4个参数
         *      1.队列名
         *      2.durable       持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true
         *      3.auto-delete   表示消息队列没有在使用时将被自动删除 默认是false
         *      4.exclusive     表示该消息队列是否只在当前connection生效,默认是false
         */
        @Bean
        public Queue helloQueue() {
            return new Queue("queue-test");
        }
     
        /** ======================== 定制一些处理策略 =============================*/
     
        /**
         * 定制化amqp模版
         *
         * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调   即消息发送到exchange  ack
         * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
     
            // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
            rabbitTemplate.setMandatory(true);
     
            // 消息返回, yml需要配置 publisher-returns: true
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                String correlationId = message.getMessageProperties().getCorrelationIdString();
                log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
            });
     
            // 消息确认, yml需要配置 publisher-confirms: true
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    // log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
                } else {
                    log.debug("消息发送到exchange失败,原因: {}", cause);
                }
            });
     
            return rabbitTemplate;
        }
    }
    

    配置类
    生产者

    /**
     * 生产者
     * @author milicool
     * Created on 2018/9/14
     */
    @Component
    public class Producer {
     
        @Autowired
        private RabbitTemplate rabbitTemplate;
     
        /**
         * 给hello队列发送消息
         */
        public void send() {
            for (int i =0; i< 100; i++) {
                String msg = "hello, 序号: " + i;
                System.out.println("Producer, " + msg);
                rabbitTemplate.convertAndSend("queue-test", msg);
            }
        }
     
    }
    

    消费者

    /**
     * 消费者
     * @author milicool
     * Created on 2018/9/14
     */
    @Component
    public class Comsumer {
        private Logger log = LoggerFactory.getLogger(Comsumer.class);
     
        @RabbitListener(queues = "queue-test")
        public void process(Message message, Channel channel) throws IOException {
            // 采用手动应答模式, 手动确认应答更为安全稳定
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.info("receive: " + new String(message.getBody()));
        }
    }
    

    测试类

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApplicationTests {
     
        @Autowired
        private Producer producer;
     
        @Test
        public void contextLoads() {
            producer.send();
        }
     
    }
    

    测试结果
    测试结果太长,没有截取全部,可以查看到消费者接收到了全部消息,如果有的消息在没有接收完,消息将被持久化,下次启动时消费


    image.png

    web端查看

    image.png

    相关文章

      网友评论

          本文标题:springboot整合rabbitmq,支持消息确认机制

          本文链接:https://www.haomeiwen.com/subject/xhnmbhtx.html