springboot使用rabbitMQ(带回调)

作者: 小尘哥 | 来源:发表于2018-05-24 17:29 被阅读100次

    Spring Boot 为各类常用组件提供了开箱即用的集成,RabbitMQ 也不例外。本文重点介绍如何在 Spring Boot 中集成 RabbitMQ,以及如何使用带发送确认回调(publisher confirm)的 RabbitMQ。

    万年不变的第一步:pom

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    生产者

    配置文件1:RabbitConfig

    package com.mos.eboot.web.config;
    
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;
    
    import javax.annotation.Resource;
    
    /**
     * Spring configuration that builds the RabbitMQ {@link ConnectionFactory} and
     * {@link RabbitTemplate} beans from the values held by {@link RabbitConstants}.
     *
     * @author 小尘哥
     */
    @Configuration
    public class RabbitConfig {

        @Resource
        private RabbitConstants rabbitConstants;

        /**
         * Creates the caching connection factory used by every template.
         *
         * @return a connection factory configured with the address, credentials,
         *         virtual host and publisher-confirm flag read from {@link RabbitConstants}
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(brokerAddresses());
            connectionFactory.setUsername(rabbitConstants.getUsername());
            connectionFactory.setVirtualHost(rabbitConstants.getVirtualHost());
            connectionFactory.setPassword(rabbitConstants.getPassword());
            // Publisher confirms must be enabled for confirm callbacks to be delivered.
            // Boolean.TRUE.equals(...) treats an unset (null) property as "disabled"
            // instead of failing with a NullPointerException on auto-unboxing.
            connectionFactory.setPublisherConfirms(Boolean.TRUE.equals(rabbitConstants.getPublisherConfirms()));
            return connectionFactory;
        }

        /**
         * Creates a new {@link RabbitTemplate} for each injection point.
         *
         * <p>The bean is prototype-scoped so that every sender receives its own template
         * and can register its own confirm callback; a singleton template would be shared
         * by all senders.
         *
         * @return a fresh template backed by {@link #connectionFactory()}
         */
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public RabbitTemplate rabbitTemplate() {
            return new RabbitTemplate(connectionFactory());
        }

        /**
         * Builds the value passed to {@code setAddresses}.
         *
         * <p>When the configured host is a single bare host name and a port is configured,
         * the two are joined as {@code host:port} so the port property is honoured.
         * A host value that already carries a port or lists several addresses is
         * passed through unchanged.
         */
        private String brokerAddresses() {
            String host = rabbitConstants.getHost();
            Integer port = rabbitConstants.getPort();
            boolean alreadyQualified = host != null && (host.contains(":") || host.contains(","));
            if (host == null || port == null || alreadyQualified) {
                return host;
            }
            return host + ":" + port;
        }

    }
    
    

    配置文件2:RabbitConstants(主要用于用户名、密码等值从配置文件获取,也可以用@Value方式)

    package com.mos.eboot.web.config;
    
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * RabbitMQ settings bound from configuration properties with the
     * {@code spring.rabbitmq} prefix, plus the exchange, routing key and queue
     * names shared by the sender and the listener.
     *
     * @author 小尘哥
     */
    @ConfigurationProperties(prefix = "spring.rabbitmq")
    public class RabbitConstants {

        /** Name of the exchange messages are published to. */
        public static final String EXCHANGE = "bootExchange";
        /** Routing key used when publishing and when binding the queue. */
        public static final String ROUTINGKEY = "routingkey";
        /** Name of the queue the listener consumes from. */
        public static final String QUEUE = "bootQueue";

        /** Broker host. */
        private String host;
        /** Broker port. */
        private Integer port;
        /** Virtual host to connect to. */
        private String virtualHost;
        /** Login user name. */
        private String username;
        /** Login password. */
        private String password;
        /** Whether publisher confirms are enabled; {@code null} when not configured. */
        private Boolean publisherConfirms;

        // ---- connection target ----

        public String getHost() {
            return this.host;
        }

        public void setHost(String host) {
            this.host = host;
        }

        public Integer getPort() {
            return this.port;
        }

        public void setPort(Integer port) {
            this.port = port;
        }

        public String getVirtualHost() {
            return this.virtualHost;
        }

        public void setVirtualHost(String virtualHost) {
            this.virtualHost = virtualHost;
        }

        // ---- credentials ----

        public String getUsername() {
            return this.username;
        }

        public void setUsername(String username) {
            this.username = username;
        }

        public String getPassword() {
            return this.password;
        }

        public void setPassword(String password) {
            this.password = password;
        }

        // ---- publisher confirms ----

        public Boolean getPublisherConfirms() {
            return this.publisherConfirms;
        }

        public void setPublisherConfirms(Boolean publisherConfirms) {
            this.publisherConfirms = publisherConfirms;
        }
    }
    
    

    发送类:DemoSender,即实际的消息发送者(同时实现 ConfirmCallback,用于接收发送确认回调)

    package com.mos.eboot.web.sender;
    
    import com.mos.eboot.tools.util.IDGen;
    import com.mos.eboot.web.config.RabbitConstants;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Publishes messages to {@link RabbitConstants#EXCHANGE} and receives the
     * publisher-confirm callback for each message it sends.
     *
     * @author 小尘哥
     */
    @Component
    public class DemoSender implements RabbitTemplate.ConfirmCallback {

        private static final Logger LOGGER = LoggerFactory.getLogger(DemoSender.class);

        private final RabbitTemplate rabbitTemplate;

        /**
         * Stores the injected template and registers this sender as its confirm callback.
         *
         * @param rabbitTemplate template used for publishing
         */
        @Autowired
        public DemoSender(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
            this.rabbitTemplate.setConfirmCallback(this);
        }

        /**
         * Sends a message with freshly generated correlation data so the confirm
         * callback can be matched to this send.
         *
         * @param msg message payload
         */
        public void send(String msg) {
            CorrelationData correlationData = new CorrelationData(IDGen.genId());
            LOGGER.info("send: {}", correlationData.getId());
            this.rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE, RabbitConstants.ROUTINGKEY, msg, correlationData);
        }

        /**
         * Logs the outcome of a publisher confirm.
         *
         * @param correlationData correlation data supplied when sending; may be {@code null}
         * @param ack             {@code true} for an ack, {@code false} for a nack
         * @param cause           optional failure reason for a nack
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // Guard against a confirm that arrives without correlation data.
            String id = correlationData == null ? null : correlationData.getId();
            if (ack) {
                LOGGER.info("confirm: {}", id);
            } else {
                // A nack means the broker did not accept the message; surface it with the cause.
                LOGGER.error("confirm failed: {}, cause: {}", id, cause);
            }
        }
    }
    
    

    测试:DemoController

    package com.mos.eboot.web.controller;
    
    
    import com.mos.eboot.tools.controller.BaseController;
    import com.mos.eboot.tools.result.ResultModel;
    import com.mos.eboot.web.config.RabbitConstants;
    import com.mos.eboot.web.sender.DemoSender;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    import javax.servlet.http.HttpSession;
    
    /**
     * Demo endpoint that publishes several test messages to RabbitMQ: three through
     * the injected {@link RabbitTemplate} directly and one through {@link DemoSender}.
     *
     * @author 小尘哥
     */
    @RestController
    @RequestMapping("demo")
    public class DemoController extends BaseController {

        private static final Logger LOGGER = LoggerFactory.getLogger(DemoController.class);

        @Resource
        private DemoSender demoSender;

        @Resource
        private RabbitTemplate rabbitTemplate;

        /**
         * Publishes the sample messages and returns the default success result.
         *
         * @return {@code ResultModel.defaultSuccess(null)}
         */
        @RequestMapping("amqp")
        public ResultModel amqp() {
            publishDirectly();
            // The send that matters for this demo: it goes through DemoSender.
            demoSender.send("message from web for fanoutExchange1234234");
            return ResultModel.defaultSuccess(null);
        }

        /**
         * Publishes three messages through this controller's own template.
         */
        private void publishDirectly() {
            // Two-argument overload: the first argument is used as the routing key.
            rabbitTemplate.convertAndSend(RabbitConstants.QUEUE, "1message from web");
            // NOTE(review): no exchange named "exchange" is declared in the visible code;
            // confirm it exists on the broker.
            rabbitTemplate.convertAndSend("exchange", "topic.messages", "2message from web for exchage");
            rabbitTemplate.convertAndSend(
                    RabbitConstants.EXCHANGE,
                    RabbitConstants.ROUTINGKEY,
                    "3message from web for fanoutExchange");
        }
    }
    
    

    消费者

    配置都相同,添加一个Listener,用来接收消息

    package com.mos.eboot.consumer.config;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.handler.annotation.Payload;
    
    /**
     * Declares the exchange, queue and binding used by the demo and consumes
     * messages from {@link RabbitConstants#QUEUE}.
     *
     * @author 小尘哥
     */
    @Configuration
    @RabbitListener(queues = RabbitConstants.QUEUE)
    public class Listener {

        /**
         * Declares the exchange and thereby chooses its type.
         *
         * <ul>
         *   <li>DirectExchange: routes to queues by routing key</li>
         *   <li>TopicExchange: matches on multiple keywords</li>
         *   <li>FanoutExchange: delivers to every bound queue, no routing key involved</li>
         *   <li>HeadersExchange: matches on key-value header attributes</li>
         * </ul>
         *
         * @return a direct exchange named {@link RabbitConstants#EXCHANGE}
         */
        @Bean
        public DirectExchange defaultExchange() {
            DirectExchange exchange = new DirectExchange(RabbitConstants.EXCHANGE);
            return exchange;
        }

        /**
         * Declares the queue the listener reads from.
         *
         * @return a queue named {@link RabbitConstants#QUEUE}
         */
        @Bean
        public Queue fooQueue() {
            Queue queue = new Queue(RabbitConstants.QUEUE);
            return queue;
        }

        /**
         * Binds the queue to the exchange under {@link RabbitConstants#ROUTINGKEY}.
         *
         * @return the binding between {@link #fooQueue()} and {@link #defaultExchange()}
         */
        @Bean
        public Binding binding() {
            Queue queue = fooQueue();
            DirectExchange exchange = defaultExchange();
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with(RabbitConstants.ROUTINGKEY);
        }

        /**
         * Handles a text message by printing it to standard output.
         *
         * @param foo message payload
         */
        @RabbitHandler
        public void process(@Payload String foo) {
            String line = "Listener: " + foo;
            System.out.println(line);
        }
    }
    
    

    yml配置

    spring:
      redis:
        database: 0
        # Redis server address
        host: 127.0.0.1
        # Redis server port
        port: 6379
        # Redis server password (empty by default)
        password: 123456789
      # Keys below share the "spring.rabbitmq" prefix that RabbitConstants binds to.
      rabbitmq:
          host: 172.16.14.93
          port: 5672
          username: dreamer
          password: dreamer
          virtualHost: eboot
          # Must be true for publisher confirm callbacks to be delivered.
          publisherConfirms: true
    

    测试结果

    访问http://localhost:8881/demo/amqp(根据你的实际情况)

    生产者 消费者

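    可以看到消费者接收到了三条消息(控制器一共调用了四次发送,其中发往"exchange"交换机的第2条在本文的配置中没有对应的队列绑定,因此该消费者收不到)。其中只有最后一条通过demoSender.send()发送的消息有回调:DemoSender中重写的confirm方法接收到了回调信息,而控制器中直接注入的RabbitTemplate是另一个prototype实例,没有设置回调。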

    完整代码已上传码云,戳【eboot】获取源码

    相关文章

      网友评论

        本文标题:springboot使用rabbitMQ(带回调)

        本文链接:https://www.haomeiwen.com/subject/beisjftx.html