美文网首页应用开发与调优
RabbitMQ与SpringBoot2.0整合

RabbitMQ与SpringBoot2.0整合

作者: 匆匆岁月 | 来源:发表于2018-10-22 16:20 被阅读149次

    application.properties:

    spring.rabbitmq.addresses=192.
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    

    RabbitMQ与SpringBoot整合配置详解:

    1. 生产端核心配置


    • publisher-confirms,实现一个监听器用于监听Broker端为我们返回的确认请求:
      RabbitTemplate.ConfirmCallback

    • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:
      RabbitTemplate.ReturnCallback

    • 注意一点,在发送消息时候对template进行设置mandatory=true保证监听有效

    • 生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等。

    RabbitSender:

    package com.pyy.springboot.producer;
    
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component
    public class RabbitSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationData:" + correlationData);
                System.err.println("ack:" + ack);
                if(!ack) {
                    System.err.println("异常处理...");
                }else {
                    // 更新数据库对应的消息状态:已发送
                }
            }
        };
    
    
        final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.err.println("return exchange:" + exchange + " , routingKey:" + routingKey + ", replyCode:" + replyCode + ", replyText:" + replyText);
            }
        };
    
        public void send(Object message, Map<String, Object> headerProperties) throws Exception {
            MessageHeaders messageHeaders = new MessageHeaders(headerProperties);
            Message msg = MessageBuilder.createMessage(message, messageHeaders);
    
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
    
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId("userid" + System.currentTimeMillis());// id + 时间戳 全局唯一 实际消息的id
            //rabbitTemplate.convertAndSend("pyy.exchange", "springboot.hello", msg, correlationData);
    
            rabbitTemplate.convertAndSend("pyy.exchange", "fasdfsf.hello", msg, correlationData);
    
        }
    }
    

    2. 消费端核心配置


    spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
    spring.rabbitmq.listener.simple.concurrency=1
    spring.rabbitmq.listener.simple.max-concurrency=5
    
    • 首先配置ACK手工确认模式,用于ACK的手工处理,这样我可以保证消息的可靠性送达,或者在消费失败时候可以做到重回队列、根据业务记录日志等处理。

    • 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况

    @RabbitListener注解使用

    • 消费端监听@RabbitMQListener注解,这个对于在实际工作中非常的好用

    • @RabbitListener只一个组合的注解,里面可以注解配置@QueueBinding@Queue@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等

    package com.pyy.mq.service;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * 消息接收者
     * @RabbitListener bindings:绑定队列
     *   @QueueBinding  value:绑定队列的名称
     *                exchange:配置交换器
     * 
     *     @Queue value:配置队列名称
     *        autoDelete:是否是一个可删除的临时队列
     * 
     *     @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                    key = "${mq.config.queue.info.routing.key}"
            )
    )
    public class InfoReceiver {
    
        /**
         * 接收消息方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg) {
            System.out.println("Info receiver:" + msg);
        }
    }
    

    @RabbitListener注解如果没有存在exchange和queue会自动创建

    案例详细代码:https://github.com/pyygithub/springboot-rabbitmq

    相关文章

      网友评论

        本文标题:RabbitMQ与SpringBoot2.0整合

        本文链接:https://www.haomeiwen.com/subject/vglyzftx.html