美文网首页
SpringBoot 集成 RabbitMQ 从基础到进阶

SpringBoot 集成 RabbitMQ 从基础到进阶

作者: qyfl | 来源:发表于2019-10-16 18:17 被阅读0次

    SpringBoot 集成 RabbitMQ 分为 5 个部分:增加 pom 依赖,增加 yml 配置,新建 MQ 配置,生产者开发,消费者开发。

    基础版

    1. 新增 pom 依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2. 新增 yml 依赖

    注意!yml 文件的依赖,生产者和消费者的配置有些不一样。

    生产者

    spring.rabbitmq.addresses=192.168.11.76:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    

    消费者

    spring.rabbitmq.addresses=192.168.11.76:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.concurrency=5
    spring.rabbitmq.listener.simple.max-concurrency=10
    

    3. MQ 初始化配置

    注意!MQ 初始化有很多种实现方式,不止这一种。

    1. 注意!微服务下,MQ 初始化配置如果只在生产者或者消费者一端实现,则该端需要先启动。如果可以,放到公共配置服务下,启动的时候最先启动配置服务。然后在启动生产者和消费者。
    2. 在 springboot 启动类上加上 @ComponentScan 注解,并扫描到初始化配置的类。
    3. 编写初始化配置的类。代码作用请看注释,示例如下:
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MainConfig {
    
        // 队列名称
        public final static String Q_NAME = "spring-boot-queue-1";
        // 交换机名称
        public final static String EXCHANG_NAME = "spring-boot-exchange-1";
        // 绑定的值
        public static final String BIND_KEY = "spring-boot-bind-key-1";
    
        // 定义队列,第二个参数是表示持久化
        @Bean
        Queue queue() {
            return new Queue(Q_NAME, true);
        }
    
        // 定义交换机,第二个参数是表示持久化,第三个参数表示当最后一个绑定被解绑的时候,不会自动删除该 exchange
        @Bean
        TopicExchange exchange() {
            return new TopicExchange(EXCHANG_NAME, true, false);
        }
    
        // 绑定队列和交换机,因为是 TopicExchange
        @Bean
        Binding binding(Queue queue, TopicExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(BIND_KEY);
        }
    }
    
    1. 关于什么是 TopicExchange,为什么队列和交换机要绑定,这是基础。如果看不同可以看看我写的入门的文章。
    2. MQ 初始化好之后,就可以编写生产者和开发者的时候使用这些。
    3. 这只是一个配置实例,实际开发中可能需要将 exchange 和 Q 的信息从配置文件中获取,而不是在代码中写死。

    4. 生产者

    @Component
    public class SendMsg1 {
        // 此接口的默认实现是RabbitTemplate,目前只有一个实现,
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        // 发送消息
        public void send_1(String msgContent) {
            amqpTemplate.convertAndSend(MainConfig.EXCHANG_NAME, MainConfig.BIND_KEY, msgContent);
        }
    }
    

    5. 消费者

    @Component
    public class ReceiveMsg1 {
    
        /**
         * 获取信息: queue也可以支持RabbitMQ中对队列的模糊匹配
         * @param content
         */
        @RabbitListener(queues = MainConfig.Q_NAME)
        public void receive_1(String content) {
            // ...
            System.out.println("[ReceiveMsg-1] receive msg: " + content);
        }
    }
    

    6. 使用

    public class SimpleTest {
    
        @Autowired
        private SendMsg1 sendMsg1;
    
        @Test
        public void sendAndReceive_1(){
            String testContent = "send msg via spring boot - 1";
            sendMsg1.send_1(testContent);
            try {
                Thread.sleep(1000 * 10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    进阶版

    基础版代码接近一个 demo,在开发中还要考虑很多其他的东西。进阶版就是在基础版的基础上进行完善,保证消息 100% 投递。

    进阶版的 pom,yml,MQ 初始化都和基础版一样,无需更改。

    知识背景

    1. 首先要知道 confirmCallbackreturnCallback 分别是什么。rabbitmq 整个消息投递的路径为:
      生产者->rabbitmq broker cluster->exchange->Q->消费者。这个路径在另一篇《RabbitMQ 核心概念与命令》中有图片清晰展示出来。

      消息从生产者到 rabbitmq broker cluster 成功则会返回一个 confirmCallback 。
      消息从 exchange 到 queue 投递失败则会返回一个 returnCallback 。

    第一次升级,解决消息可靠性投递的问题。

    生产者升级

    生产者在投递消息的时候,能够知道消息是否成功投递,成功投递怎么处理,投递失败怎么补偿就要看具体的业务逻辑了。因为业务不同,实现不同。可以参考另一篇《如何保障消息 100% 投递成功?》的文章。

    @Component
    public class RabbitSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;  
        
        // ack 为 true 表示正常,false 表示异常
        final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationData: " + correlationData);
                System.err.println("ack: " + ack);
                if(!ack){
                    System.err.println("异常处理....");
                }
            }
        };
        
        // 消息没有正确送到 Q 里,需要做额外的处理,这里的实现只是打印。
        final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                    String exchange, String routingKey) {
                System.err.println("return exchange: " + exchange + ", routingKey: " 
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };
        
        //发送消息方法调用: 构建Message消息
        public void send(Object message, Map<String, Object> properties) throws Exception {
            MessageHeaders mhs = new MessageHeaders(properties);
            Message msg = MessageBuilder.createMessage(message, mhs);
    
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            
            //id + 时间戳 全局唯一。 没有投递成功的时候可以通过这个值判断是哪个消息。
            CorrelationData correlationData = new CorrelationData("1234567890");
    
            rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
        }
    }
    

    消费者升级

    消费者拿到消息具体的业务处理,各有不同。

    import com.bfxy.springboot.MainConfig;
    import com.rabbitmq.client.Channel;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RabbitReceiver {
    
        /**
         * 这是消费者监听消息的方法
         * @RabbitListener 是监听哪一个队列
         * @param message
         * @param channel
         * @throws Exception
         */
        @RabbitListener(queues=MainConfig.Q_NAME)
        @RabbitHandler
        public void onMessage(Message message, Channel channel) throws Exception {
            System.err.println("--------------------------------------");
            System.err.println("消费端Payload: " + message.getPayload());
            
            //手工ACK,这一步尤为重要,因为在配置文件中配置了手动 ACK,所有需要有 ACK 的动作。
            Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
    }
    

    第二次升级,解决 MQ 传递 java 对象的问题。

    如果 MQ 中传递 java 对象,则需要生产者和消费者都定义好这个对象。尤其是在 SpringCloud 中,生产者和消费者很可能不在一个服务中。

    以下是对象示例:

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.io.Serializable;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Order implements Serializable {
    
        private String id;
        private String name;
    }
    

    一定要实现序列化接口,因为 MQ 只可以投递 String 和 byte[] 类型。

    生产者修改

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.bfxy.springboot.entity.Order;
    
    @Component
    public class RabbitSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;  
        
        // ack 为 true 表示正常,false 表示异常
        final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationData: " + correlationData);
                System.err.println("ack: " + ack);
                if(!ack){
                    System.err.println("异常处理....");
                }
            }
        };
        
        // 消息没有正确送到 Q 里,需要做额外的处理,这里的实现只是打印。
        final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                    String exchange, String routingKey) {
                System.err.println("return exchange: " + exchange + ", routingKey: " 
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };
    
        //发送消息方法调用: 构建Message消息
        public void sendOrder(Order order) throws Exception {
            // 如果不需要设置 Headers 则没有必要加。MQ 有默认的 Headers 设置。
            // MessageHeaders mhs = new MessageHeaders(properties);
            // Message msg = MessageBuilder.createMessage(order, mhs);
    
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
    
            //id + 时间戳 全局唯一。 没有投递成功的时候可以通过这个值判断是哪个消息。
            CorrelationData correlationData = new CorrelationData("1234567890");
            rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", order, correlationData);
        }
    }
    

    消费者修改

    import com.bfxy.springboot.MainConfig;
    import com.bfxy.springboot.entity.Order;
    import com.rabbitmq.client.Channel;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component
    public class RabbitReceiver {
        /**
         * 将 Message 拆成 Payload 和 Headers
         * @param order Q 里传递的 java 对象
         * @param channel
         * @param headers
         * @throws Exception
         */
        @RabbitListener(queues = MainConfig.Q_NAME)
        @RabbitHandler
        public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception {
            System.err.println("--------------------------------------");
            System.err.println("消费端 orderID: " + order.getId());
    
            //手工ACK,这一步尤为重要,因为在配置文件中配置了手动 ACK,所有需要有 ACK 的动作。
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
    }
    

    使用

    public class ApplicationTests {
        @Autowired
        private RabbitSender rabbitSender;
    
        public void testSender2() throws Exception {
             Order order = new Order("001", "第一个订单");
             rabbitSender.sendOrder(order);
        }
    }
    

    相关文章

      网友评论

          本文标题:SpringBoot 集成 RabbitMQ 从基础到进阶

          本文链接:https://www.haomeiwen.com/subject/vzsumctx.html