美文网首页
spring boot集成RabbitMQ

spring boot集成RabbitMQ

作者: 颇风 | 来源:发表于2019-12-28 23:35 被阅读0次

    原理见:https://www.jianshu.com/p/79ca08116d57

    安装RabbitMQ略过

    1.Springboot引入相关依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2.application.properties配置

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    ###消费者额外配置
    spring.rabbitmq.listener.concurrency=2  //最小消息监听线程数
    spring.rabbitmq.listener.max-concurrency=2 //最大消息监听线程数
    

    3.RabbitMQ配置类

    @Configuration 
    public class RabbitConfig {
    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue("hello.queue1");
    }
    
    /**
     * 声明交互器
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }
    
    /**
     * 绑定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(topicExchange()).with("key");
    }
        
    }
    

    4.消息生产者

    @Component
    @Slf4j
    public class Producer implements RabbitTemplate.ConfirmCallback, ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(this);
        }
        @Override
        public void handle(Return aReturn) {
    
            log.info("消息{},发送失败",aReturn.getBody());
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
    
            if(ack){
                log.info("消息发送成功:{}",correlationData);
            }else {
    
                log.info("消息发送失败:{}",s);
            }
        }
    
        public void send(String message){
    
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            log.info("发送消息:{}",message);
            String resp=rabbitTemplate.convertSendAndReceive("topicExchange","key",message,correlationId).toString();
            log.info("消费者响应:{},处理完成",resp);
        }
    

    要点:
    1.注入RabbitTemplate
    2.实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback接口(非必须)。
    ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
    3.实现消息发送方法。调用rabbitTemplate相应的方法即可。rabbitTemplate常用发送方法有:

    rabbitTemplate.send(message);   //发消息,参数类型为org.springframework.amqp.core.Message
    rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送
    rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。
    

    5.消息消费者

    @Componentpublic class Customer {
        
        @RabbitListener(queues = "hello.queue1")
        public String processMessage1(String msg) {
            System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);        return msg.toUpperCase();    }
    }
    

    要点:
    1.监听器参数类型与消息实际类型匹配。在生产者中发送的消息实际类型是String,所以这里监听器参数类型也是String。
    2.如果监听器需要有响应返回给生产者,直接在监听方法中return即可。
    3.queues必须与生产者中配置的队列名一样。

    6.测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqApplicationTests {
    
        @Autowired
        Producer producer;
        @Test
        public void contextLoads() throws InterruptedException {
    
            for (int i = 0; i < 100; i++) {
                producer.send("发送第"+i+"条消息");
                Thread.sleep(2000);
            }
        }
    
    }
    

    输出结果如下:


    255BBCFB-5950-4124-BF1D-D7DDDF9EDC70.png255BBCFB-5950-4124-BF1D-D7DDDF9EDC70.png

    相关文章

      网友评论

          本文标题:spring boot集成RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/temeoctx.html