Spring Boot RabbitMQ 优先级队列

作者: Anoyi | 来源:发表于2019-02-23 22:14 被阅读332次

    Docker With RabbitMQ

    官方 Docker 镜像仓库地址

    本地运行 RabbitMQ

    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
    

    访问可视化面板

    Spring Boot With RabbitMQ

    Spring Boot 集成 RabbitMQ

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    基本参数配置

    # host & port
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    

    Queue / Exchange / Routing 配置

    /**
     * RabbitMQ 配置
     */
    @Configuration
    public class RabbitMQConfig {
    
        private static final String EXCHANGE = "priority-exchange";
    
        public static final String QUEUE = "priority-queue";
    
        private static final String ROUTING_KEY = "priority.queue.#";
    
        /**
         * 定义优先级队列
         */
        @Bean
        Queue queue() {
            Map<String, Object> args= new HashMap<>();
            args.put("x-max-priority", 100);
            return new Queue(QUEUE, false, false, false, args);
        }
    
        /**
         * 定义交换器
         */
        @Bean
        TopicExchange exchange() {
            return new TopicExchange(EXCHANGE);
        }
    
        @Bean
        Binding binding(Queue queue, TopicExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
        }
    
    }
    

    priority queue 定义参考官方文档:https://www.rabbitmq.com/priority.html

    Spring Boot 应用启动后,会自动创建 Queue 和 Exchange ,并相互绑定,优先级队列会有如图所示标识。

    RabbitMQ Publisher

    Spring Boot 相关配置

    # 是否开启消息发送到交换器(Exchange)后触发回调
    spring.rabbitmq.publisher-confirms=false
    # 是否开启消息发送到队列(Queue)后触发回调
    spring.rabbitmq.publisher-returns=false
    # 消息发送失败重试相关配置
    spring.rabbitmq.template.retry.enabled=true
    spring.rabbitmq.template.retry.initial-interval=3000ms
    spring.rabbitmq.template.retry.max-attempts=3
    spring.rabbitmq.template.retry.max-interval=10000ms
    spring.rabbitmq.template.retry.multiplier=1
    

    发送消息

    @Component
    @AllArgsConstructor
    public class FileMessageSender {
    
        private static final String EXCHANGE = "priority-exchange";
    
        private static final String ROUTING_KEY_PREFIX = "priority.queue.";
    
        private final RabbitTemplate rabbitTemplate;
    
        /**
         * 发送设置有优先级的消息
         *
         * @param priority 优先级
         */
        public void sendPriorityMessage(String content, Integer priority) {
            rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,
                    message -> {
                        message.getMessageProperties().setPriority(priority);
                        return message;
                    });
        }
    
    }
    

    RabbitMQ Consumer

    Spring Boot 相关配置

    # 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)
    spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
    # 最小线程数量
    spring.rabbitmq.listener.simple.concurrency=10
    # 最大线程数量
    spring.rabbitmq.listener.simple.max-concurrency=10
    # 每个消费者可能未完成的最大未确认消息数量
    spring.rabbitmq.listener.simple.prefetch=1
    

    消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch 设置为较小数值,让优先级越高的消息更快加入到消费者线程。

    监听消息

    @Slf4j
    @Component
    public class MessageListener {
    
        /**
         * 处理消息
         */
        @RabbitListener(queues = "priority-queue")
        public void listen(String message) {
            log.info(message);
        }
    
    }
    

    番外补充

    1、自定义消息发送确认的回调

    • 配置如下:
    # 开启消息发送到交换器(Exchange)后触发回调
    spring.rabbitmq.publisher-confirms=true
    # 开启消息发送到队列(Queue)后触发回调
    spring.rabbitmq.publisher-returns=true
    
    • 自定义 RabbitTemplate.ConfirmCallback 实现类
    @Slf4j
    public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("消息唯一标识: {}", correlationData);
            log.info("确认状态: {}", ack);
            log.info("造成原因: {}", cause);
        }
    
    }
    
    • 自定义 RabbitTemplate.ConfirmCallback 实现类
    @Slf4j
    public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {
    
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("消息主体: {}", message);
            log.info("回复编码: {}", replyCode);
            log.info("回复内容: {}", replyText);
            log.info("交换器: {}", exchange);
            log.info("路由键: {}", routingKey);
        }
    
    }
    
    • 配置 rabbitTemplate
    @Component
    @AllArgsConstructor
    public class RabbitTemplateInitializingBean implements InitializingBean {
    
        private final RabbitTemplate rabbitTemplate;
    
        @Override
        public void afterPropertiesSet() {
            rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
            rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
        }
        
    }
    

    2、RabbitMQ Exchange 类型

    相关文章

      网友评论

        本文标题:Spring Boot RabbitMQ 优先级队列

        本文链接:https://www.haomeiwen.com/subject/hqgpyqtx.html