美文网首页互联网科技
SpringBoot 使用RabbitMQ 做延时队列

SpringBoot 使用RabbitMQ 做延时队列

作者: 问题_解决_分享_讨论_最优 | 来源:发表于2019-12-23 10:53 被阅读0次

    SpringBoot 使用RabbitMQ 做延时队列

    1.下载并安装erlang和RabbitMQ

    erlang下载地址 :http://www.erlang.org/downloads
    RabbitMQ下载地址:http://www.rabbitmq.com/download.htm

    2.SpringBoot整合RabbitMQ

    1.创建一个maven项目
    2.在application.yml文件添加

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
    

    3.在pom.xml添加依赖

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
        </dependencies>
    
    1. 实体类
    package com.rabbit.demos.pojo;
    
    import lombok.Data;
    
    import java.io.Serializable;
    
    @Data
    public class Order implements Serializable {
    
    
        private static final long serialVersionUID = -2221214252163879885L;
    
        private String orderId; // 订单id
    
        private Integer orderStatus; // 订单状态 0:未支付,1:已支付,2:订单已取消
    
        private String orderName; // 订单名字
    
    }
    
    1. 配置队列
    package com.rabbit.demos.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /*
    .配置队列
     */
    @Configuration
    @Slf4j
    public class DelayRabbitConfig {
    
    
        /**
         * 延迟队列 TTL 名称
         */
        private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
        /**
         * DLX,dead letter发送到的 exchange
         * 延时消息就是发送到该交换机的
         */
        public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
        /**
         * routing key 名称
         * 具体消息发送在该 routingKey 的
         */
        public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
    
        public static final String ORDER_QUEUE_NAME = "user.order.queue";
        public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
        public static final String ORDER_ROUTING_KEY = "order";
    
        /**
         * 延迟队列配置
         * <p>
         * 1、params.put("x-message-ttl", 5 * 1000);
         * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
         * 2、rabbitTemplate.convertAndSend(book, message -> {
         * message.getMessageProperties().setExpiration(2 * 1000 + "");
         * return message;
         * });
         * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
         **/
        @Bean
        public Queue delayOrderQueue() {
            Map<String, Object> params = new HashMap<>();
            // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
            params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
            // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
            params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
            return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
        }
        /**
         * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
         * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,
         * 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
         * @return DirectExchange
         */
        @Bean
        public DirectExchange orderDelayExchange() {
            return new DirectExchange(ORDER_DELAY_EXCHANGE);
        }
        @Bean
        public Binding dlxBinding() {
            return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
        }
    
        @Bean
        public Queue orderQueue() {
            return new Queue(ORDER_QUEUE_NAME, true);
        }
        /**
         * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
         * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
         **/
        @Bean
        public TopicExchange orderTopicExchange() {
            return new TopicExchange(ORDER_EXCHANGE_NAME);
        }
    
        @Bean
        public Binding orderBinding() {
            // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键
            return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
        }
    
    }
    

    6.发送消息

    package com.rabbit.demos.config;
    
    
    import com.rabbit.demos.pojo.Order;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    /*
    发送者
     */
    @Component
    @Slf4j
    public class DelaySender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void sendDelay(Order order) {
            log.info("【订单生成时间】" + new Date().toString() +"【1分钟后检查订单是否已经支付】" + order.toString() );
            this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
                // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
                message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
                return message;
            });
        }
    }
    

    7.消费消息

    package com.rabbit.demos.config;
    
    import com.rabbit.demos.pojo.Order;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    /*
    接收者
     */
    @Component
    @Slf4j
    public class DelayReceiver {
    
        @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
        public void orderDelayQueue(Order order, Message message, Channel channel) {
            log.info("###########################################");
            log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]",  new Date(), order.toString());
            if(order.getOrderStatus() == 0) {
                order.setOrderStatus(2);
                log.info("【该订单未支付,取消订单】" + order.toString());
            } else if(order.getOrderStatus() == 1) {
                log.info("【该订单已完成支付】");
            } else if(order.getOrderStatus() == 2) {
                log.info("【该订单已取消】");
            }
            log.info("###########################################");
        }
    }
    

    8.测试队列

    package com.rabbit.demos.Controller;
    
    import com.rabbit.demos.config.DelaySender;
    import com.rabbit.demos.pojo.Order;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.concurrent.DelayQueue;
    
    @RestController
    public class TestController {
    
        @Autowired
        private DelaySender delaySender;
    
        @GetMapping("/sendDelay")
        public Object sendDelay() {
            Order order1 = new Order();
            order1.setOrderStatus(0);
            order1.setOrderId("123456");
            order1.setOrderName("小米6");
    
            Order order2 = new Order();
            order2.setOrderStatus(1);
            order2.setOrderId("456789");
            order2.setOrderName("小米8");
    
            delaySender.sendDelay(order1);
            delaySender.sendDelay(order2);
            return "ok";
        }
    }
    

    9.运行结果

    • 刚开始运行


      20191220142015916.png
    • 一分钟后


      一分钟后

    打个广告,本人博客地址是:风吟个人博客

    相关文章

      网友评论

        本文标题:SpringBoot 使用RabbitMQ 做延时队列

        本文链接:https://www.haomeiwen.com/subject/chqznctx.html