美文网首页
Spring boot 集成rabbitmq

Spring boot 集成rabbitmq

作者: 刘小刀tina | 来源:发表于2020-05-17 11:33 被阅读0次

    1 .pom.xml

     <!-- Spring boot 集成rabbitmq-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.2.2.RELEASE</version>
            </dependency>
    
    

    2 . 配置

    spring:
      application:
        name: demo-mq
    
    
    
    #配置rabbitmq
      rabbitmq:
        addresses: 152.136.27.48:5672
        username: guest
        password: guest
        #虚拟主机地址
        virtual-host: /
        #连接超时时间
        connection-timeout: 15000
        #publisher-confirms: true  #开启发送确认
        publisher-returns: true #开启发送失败退回
        template:
          mandatory: true
        #消费端配置
        listener:
          simple:
            #消费端
            concurrency: 10
            #最大消费端数
            max-concurrency: 20
            #自动签收auto  手动 manual
            acknowledge-mode: manual #开启ACK
            #限流
            prefetch: 50
    

    3 .rabbitmq配置

    package com.example.demo.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @program: demo-rabbitmq
     * @description
     * @author: tina.liu
     * @create: 2020-05-16 21:36
     **/
    @Configuration
    public class RabbitConfig {
    
        /**
         * 创建广播类型的exchange queue 将二者绑定
         */
    
        //队列名
        public static final String FANOUT_QUEUE_NAME = "test_fanout_queue";
        public static final String FANOUT_QUEUE_NAME1 = "test_fanout_queue1";
        public static final String TEST_FANOUT_EXCHANGE = "testFanoutExchange";
    
        //创建队列
        @Bean
        public Queue createFanoutQueue() {
            return new Queue(FANOUT_QUEUE_NAME);
        }
    
        //创建队列
        @Bean
        public Queue createFanoutQueue1() {
            return new Queue(FANOUT_QUEUE_NAME1);
        }
    
        //创建广播类型的交换机
        @Bean
        public FanoutExchange defFanoutExchange() {
            return new FanoutExchange(TEST_FANOUT_EXCHANGE);
        }
    
        //队列与交换机进行绑定
        @Bean
        Binding bindingFanout() {
            return BindingBuilder.bind(createFanoutQueue()).
                    to(defFanoutExchange());
        }
    
        //队列与交换机进行绑定
        @Bean
        Binding bindingFanout1() {
            return BindingBuilder.bind(createFanoutQueue1()).
                    to(defFanoutExchange());
        }
    
        /**
         * 创建直连的queue exchange 绑定
         * @return
         */
        public static final String DIRECT_QUEUE_NAME = "test_direct_queue"; //queue
        public static final String TEST_DIRECT_EXCHANGE = "testDirectExchange"; //exchange
        public static final String DIRECT_ROUTINGKEY = "test"; //routingKey
    
        /**
         * 创建直连的队列
         * @return
         */
        @Bean
        public Queue createDirectQueue() {
            return new Queue(DIRECT_QUEUE_NAME);
        }
    
        /**
         * 创建直连交换机
         * @return
         */
        @Bean
        DirectExchange directExchange(){
            return new DirectExchange(TEST_DIRECT_EXCHANGE);
        }
    
        /**
         * 将交换机和队列绑定
         * @return
         */
        @Bean
        Binding bindingDirect() {
            return BindingBuilder.bind(createDirectQueue()).
                    to(directExchange()).
                    with(DIRECT_ROUTINGKEY);
        }
    
    
        /**
         * 创建通配符类型的exchange queue 将二者绑定
         */
        public static final String TOPIC_QUEUE_NAME = "test_topic_queue"; //queue
        public static final String TEST_TOPIC_EXCHANGE = "testTopicExchange"; //exchange
        public static final String TOPIC_ROUTINGKEY = "test.*";//routingKey
    
        //创建队列
        @Bean
        public Queue createTopicQueue() {
            return new Queue(TOPIC_QUEUE_NAME);
        }
    
        /**
         * 创建通配符交换机
         * @return
         */
        @Bean
        TopicExchange defTopicExchange(){
            return new TopicExchange(TEST_TOPIC_EXCHANGE);
        }
    
        /**
         * 将交换机和队列进行绑定
         * @return
         */
        @Bean
        Binding bindingTopic() {
            return BindingBuilder.bind(createTopicQueue()).
                    to(defTopicExchange()).
                    with(TOPIC_ROUTINGKEY);
        }
    
    
    }
    
    
    

    4 . 生产者

    package com.example.demo.rabbitmq.rabbitmq;
    
    import com.example.demo.rabbitmq.config.RabbitConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * @program: demo-rabbitmq
     * @description 消息生成者
     * @author: tina.liu
     * @create: 2020-05-16 21:42
     **/
    @Component
    @Slf4j
    public class MsgProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 通过广播类型的交换机生成消息
         * @param massage
         */
        public void send2FanoutTestQueue(String massage){
            //exchange key msg
            rabbitTemplate.convertAndSend(RabbitConfig.TEST_FANOUT_EXCHANGE,"", massage);
            log.info("广播类型的交换机绑定消息队列,生成者发送消息成功");
        }
    
        /**
         * 通过直连类型的交换机生成消息
         * @param massage
         */
        public void send2DirectTestQueue(String massage){
            rabbitTemplate.convertAndSend(RabbitConfig.TEST_DIRECT_EXCHANGE, RabbitConfig.DIRECT_ROUTINGKEY, massage);
            log.info("直连类型的交换机绑定消息队列,生成者发送消息成功");
        }
    
        /**
         * 通过通配符类型的交换机生成消息
         * @param massage
         */
        public void send2TopicTestAQueue(String massage){
            rabbitTemplate.convertAndSend(RabbitConfig.TEST_TOPIC_EXCHANGE, "a.test.aaa", massage);
            log.info("通配符类型A的交换机绑定消息队列,生成者发送消息成功");
        }
    
        public void send2TopicTestBQueue(String massage){
            rabbitTemplate.convertAndSend(RabbitConfig.TEST_TOPIC_EXCHANGE, "test.bbb", massage);
            log.info("通配符类型B的交换机绑定消息队列,生成者发送消息成功");
        }
    
    }
    
    
    

    5 .消费者

    package com.example.demo.rabbitmq.rabbitmq;
    
    import com.example.demo.rabbitmq.config.RabbitConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    import java.nio.charset.StandardCharsets;
    
    /**
     * @program: demo-rabbitmq
     * @description 消息消费者
     * @author: tina.liu
     * @create: 2020-05-16 21:42
     **/
    @Component
    @Slf4j
    public class MsgConsumer {
    
        /**
         * 从跟广播类型的交换机绑定的队列中消费消息
         * @param massage
         */
        @RabbitListener(
                bindings =
                        {
                                @QueueBinding(value = @Queue(value = RabbitConfig.FANOUT_QUEUE_NAME, durable = "true"),
                                        exchange = @Exchange(value = RabbitConfig.TEST_FANOUT_EXCHANGE, type = "fanout"))
                        })
        @RabbitHandler
        public void processFanoutMsg(Message massage) {
            String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
            log.info("从广播类型的交换机绑定的队列中消费消息success, message : {}" + msg);
        }
    
    
        /**
         * 从跟广播类型的交换机绑定的队列中消费消息
         * @param massage
         */
        @RabbitListener(
                bindings =
                        {
                                @QueueBinding(value = @Queue(value = RabbitConfig.FANOUT_QUEUE_NAME1, durable = "true"),
                                        exchange = @Exchange(value = RabbitConfig.TEST_FANOUT_EXCHANGE, type = "fanout"))
    
                        })
        @RabbitHandler
        public void processFanout1Msg(Message massage) {
            String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
            log.info("从广播类型的交换机绑定的队列中消费消息success, message : {}" , msg);
        }
    
        /**
         * 通过直连类型的交换机绑定的消息队列中消费消息
         * @param massage
         */
        @RabbitListener(
                bindings =
                        {
                                @QueueBinding(value = @Queue(value = RabbitConfig.DIRECT_QUEUE_NAME, durable = "true"),
                                        exchange = @Exchange(value = RabbitConfig.TEST_DIRECT_EXCHANGE),
                                        key = RabbitConfig.DIRECT_ROUTINGKEY)
                        })
        @RabbitHandler
        public void processDirectMsg(Message massage) {
            String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
            log.info("从直连类型的交换机绑定的消息队列中消费消息success, message : {}" , msg);
        }
    
    
        /**
         * 通过通配符类型的交换机绑定的消息队列中消费消息
         * @param massage
         */
        @RabbitListener(
                bindings =
                        {
                                @QueueBinding(value = @Queue(value = RabbitConfig.TOPIC_QUEUE_NAME, durable = "true"),
                                        exchange = @Exchange(value = RabbitConfig.TEST_TOPIC_EXCHANGE, type = "topic"),
                                        key = RabbitConfig.TOPIC_ROUTINGKEY)
                        })
        @RabbitHandler
        public void processTopicMsg(Message massage) {
            String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
            log.info("从通配符类型的交换机绑定的消息队列中消费消息success, message : {}" + msg);
        }
    
    }
    
    
    

    6 .controller

    package com.example.demo.rabbitmq.controller;
    
    import com.example.demo.rabbitmq.config.RabbitConfig;
    import com.example.demo.rabbitmq.rabbitmq.MsgProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @program: demo-rabbitmq
     * @description
     * @author: tina.liu
     * @create: 2020-05-16 21:49
     **/
    @RestController
    public class TestController {
    
        //注入消息生成者
        @Autowired
        private MsgProducer msgProducer;
    
        /**
         * 测试广播类型的交换器 消息队列
         */
        @GetMapping("/test/fanout")
        public String Test1(){
            msgProducer.send2FanoutTestQueue("借助广播类型的交换机 队列传递信息");
            return "利用广播类型的交换机绑定队列发送消息";
        }
    
        /**
         * 测试直连类型的交换机 消息队列
         */
        @GetMapping("/test/direct")
        public String Test2(){
            msgProducer.send2DirectTestQueue("借助直连类型的交换机 队列传递信息");
            return "利用直连类型的交换机绑定队列发送消息";
        }
    
        /**
         * 测试通配符交换机 消息队列
         */
        @GetMapping(value = "/test/topic/A")
        private  String test3(){
            msgProducer.send2TopicTestAQueue("借助通配符类型的交换机A,队列传递信息");
            return "利用通配符类型的交换机绑定队列发送消息A";
        }
    
        @GetMapping(value = "/test/topic/B")
        private  String test4(){
            msgProducer.send2TopicTestBQueue("借助通配符类型的交换机B,队列传递信息");
            return "利用通配符类型的交换机绑定队列发送消息B";
        }
    
    }
    
    
    
    

    相关文章

      网友评论

          本文标题:Spring boot 集成rabbitmq

          本文链接:https://www.haomeiwen.com/subject/qtwjohtx.html