美文网首页
Springboot集成RabbitMQ

Springboot集成RabbitMQ

作者: 呆叔么么 | 来源:发表于2020-01-17 00:30 被阅读0次

    一.简单队列

    1.配置pom文件,主要是添加spring-boot-starter-amqp的支持

    <!-- Spring Boot starter for AMQP; the Java listings in this article import org.springframework.amqp.* classes. -->
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2.配置application.yml文件(以下内容为YAML格式)

    # Connection settings for the RabbitMQ broker used by every example below.
    spring:
      application:
        # NOTE(review): "spirng" looks like a typo for "spring" - confirm before renaming.
        name: spirng-boot-rabbitmq
      rabbitmq:
        # Broker address is hard-coded; replace with your own broker host.
        host: 49.235.110.134
        port: 5672
        # NOTE(review): plain-text credentials - presumably demo values; externalize for real deployments.
        username: root
        password: root
    

    3.配置队列

    package cn.lovingliu.rabbitmq.config;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Queue configuration for the simple-queue example.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Configuration
    public class RabbitConfig {
        /**
         * Declares the queue bean used by the simple-queue producer and receiver.
         *
         * @return a queue named {@code q_rabbit}
         */
        @Bean
        public Queue queue() {
            final String queueName = "q_rabbit";
            return new Queue(queueName);
        }
    }
    

    4.生产者

    package cn.lovingliu.rabbitmq.producer;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Producer for the simple-queue example.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Component
    public class Producer {
        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Builds a {@code "hello <timestamp>"} message, echoes it to stdout and
         * sends it with routing key {@code q_rabbit}.
         */
        public void send() {
            // "HH" formats the hour on a 24-hour clock.
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String payload = "hello " + formatter.format(new Date());
            System.out.println("Sender : " + payload);
            // For a simple queue the routing key is just the queue name.
            rabbitTemplate.convertAndSend("q_rabbit", payload);
        }
    }
    

    5.接收者

    package cn.lovingliu.rabbitmq.consumer;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer for the simple-queue example; listens on queue {@code q_rabbit}.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Component
    @RabbitListener(queues = "q_rabbit")
    public class Receiver {
        /**
         * Prints a received text message to stdout.
         *
         * @param hello the message payload
         */
        @RabbitHandler
        public void process(String hello) {
            String line = "Receiver  : " + hello;
            System.out.println(line);
        }
    }
    

    6.测试

    package cn.lovingliu.rabbitmq.test;
    
    import cn.lovingliu.rabbitmq.producer.Producer;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Spring Boot test that triggers the simple-queue producer once.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Test {
        @Autowired
        private Producer producer;

        /**
         * Sends a single message through the producer.
         *
         * @throws Exception declared for the test runner; nothing is thrown explicitly here
         */
        // Fully-qualified annotation: this class is itself named Test.
        @org.junit.Test
        public void oneToOne() throws Exception {
            this.producer.send();
        }
    }
    
    简单队列

    二.工作队列(work)

    1.创建两个消费者
    cn.lovingliu.rabbitmq_work.consumer.ReceiverWork1

    package cn.lovingliu.rabbitmq_work.consumer;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * First consumer of the work-queue example; listens on {@code q_rabbit_work}.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Component
    @RabbitListener(queues = "q_rabbit_work")
    public class ReceiverWork1 {
        /**
         * Prints a received text message, tagged as coming from receiver 1.
         *
         * @param hello the message payload
         */
        @RabbitHandler
        public void process(String hello) {
            String line = "Receiver1  : " + hello;
            System.out.println(line);
        }
    }
    

    cn.lovingliu.rabbitmq_work.consumer.ReceiverWork2

    package cn.lovingliu.rabbitmq_work.consumer;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Second consumer of the work-queue example; listens on {@code q_rabbit_work}.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Component
    @RabbitListener(queues = "q_rabbit_work")
    public class ReceiverWork2 {
        /**
         * Prints a received text message, tagged as coming from receiver 2.
         *
         * @param hello the message payload
         */
        @RabbitHandler
        public void process(String hello) {
            String line = "Receiver2  : " + hello;
            System.out.println(line);
        }
    }
    

    2.生产者

    package cn.lovingliu.rabbitmq_work.producer;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Producer for the work-queue example.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Component
    public class Producer {
        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Builds a {@code "hello <i> <timestamp>"} message, echoes it to stdout and
         * sends it with routing key {@code q_rabbit_work}.
         *
         * @param i sequence number embedded in the message text
         */
        public void send(int i) {
            // "HH" formats the hour on a 24-hour clock.
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String timestamp = formatter.format(new Date());
            String payload = "hello " + i + " " + timestamp;
            System.out.println("Sender : " + payload);
            // For a simple queue the routing key is just the queue name.
            rabbitTemplate.convertAndSend("q_rabbit_work", payload);
        }
    }
    

    3.测试

    package cn.lovingliu.rabbitmq_work.test;
    
    
    import cn.lovingliu.rabbitmq_work.producer.Producer;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Spring Boot test for the work-queue example.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Test {
        @Autowired
        private Producer producer;

        /**
         * Sends 100 numbered messages (0..99), pausing 300 ms after each one.
         *
         * @throws Exception if the sleeping thread is interrupted
         */
        // Fully-qualified annotation: this class is itself named Test.
        @org.junit.Test
        public void oneToMany() throws Exception {
            final int messageCount = 100;
            final long pauseMillis = 300L;
            int sequence = 0;
            while (sequence < messageCount) {
                producer.send(sequence);
                Thread.sleep(pauseMillis);
                sequence++;
            }
        }
    }
    

    三.Topic Exchange(主题模式)

    Topic是RabbitMQ中最灵活的一种方式,可以根据routing_key自由地绑定不同的队列。
    1.配置队列,绑定交换机

    package cn.lovingliu.rabbitmq_topic.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Declares the queues, the topic exchange and the bindings of the topic example.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Configuration
    public class TopicRabbitConfig {
        public static final String QUEUE_NAME_1 = "q_rabbit_topic_1";
        public static final String QUEUE_NAME_2 = "q_rabbit_topic_2";
        public static final String EXCHANGE_NAME = "my_exchange";

        /**
         * @return the first topic queue, named {@link #QUEUE_NAME_1}
         */
        @Bean
        public Queue queue1() {
            return new Queue(QUEUE_NAME_1);
        }

        /**
         * @return the second topic queue, named {@link #QUEUE_NAME_2}
         */
        @Bean
        public Queue queue2() {
            return new Queue(QUEUE_NAME_2);
        }

        /**
         * Declares the topic-type exchange.
         *
         * @return a topic exchange named {@link #EXCHANGE_NAME}
         */
        @Bean
        TopicExchange exchange() {
            return new TopicExchange(EXCHANGE_NAME);
        }

        /**
         * Binds the first queue to the exchange under the pattern {@code topic.1.*}.
         *
         * @param queue1 the queue to bind; the parameter name matches the {@code queue1()}
         *               bean method - presumably how Spring picks between the two Queue beans
         * @param exchange the topic exchange
         * @return the binding
         */
        @Bean
        Binding bindingExchangeMessage(Queue queue1, TopicExchange exchange) {
            return BindingBuilder
                    .bind(queue1)
                    .to(exchange)
                    .with("topic.1.*");
        }

        /**
         * Binds the second queue to the exchange under the pattern {@code topic.2.*}.
         *
         * @param queue2 the queue to bind; the parameter name matches the {@code queue2()}
         *               bean method - presumably how Spring picks between the two Queue beans
         * @param exchange the topic exchange
         * @return the binding
         */
        @Bean
        Binding bindingExchangeMessages(Queue queue2, TopicExchange exchange) {
            return BindingBuilder
                    .bind(queue2)
                    .to(exchange)
                    .with("topic.2.*");
        }
    }
    

    2.创建2个消费者

    package cn.lovingliu.rabbitmq_topic.consumer;
    
    import cn.lovingliu.rabbitmq_topic.config.TopicRabbitConfig;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * First consumer of the topic example; listens on
     * {@link TopicRabbitConfig#QUEUE_NAME_1}.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Component
    @RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME_1)
    public class ReceiverTopic1 {
        /**
         * Prints a received text message, tagged as coming from receiver 1.
         *
         * @param hello the message payload
         */
        @RabbitHandler
        public void process(String hello) {
            String line = "Receiver1  : " + hello;
            System.out.println(line);
        }
    }
    
    package cn.lovingliu.rabbitmq_topic.consumer;
    
    import cn.lovingliu.rabbitmq_topic.config.TopicRabbitConfig;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Second consumer of the topic example; listens on
     * {@link TopicRabbitConfig#QUEUE_NAME_2}.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Component
    @RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME_2)
    public class ReceiverTopic2 {
        /**
         * Prints a received text message, tagged as coming from receiver 2.
         *
         * @param hello the message payload
         */
        @RabbitHandler
        public void process(String hello) {
            String line = "Receiver2 : " + hello;
            System.out.println(line);
        }
    }
    

    3.消息发送者(生产者)

    package cn.lovingliu.rabbitmq_topic.producer;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Producer for the topic example; publishes to the {@code my_exchange} exchange.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Component
    public class Producer {
        private static final String EXCHANGE_NAME = "my_exchange";
        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Sends the queue-1 sample message with routing key {@code topic.1.message}.
         */
        public void send1() {
            publish("topic.1.message", "hi, queue 1 message");
        }

        /**
         * Sends the queue-2 sample message with routing key {@code topic.2.messages}.
         */
        public void send2() {
            publish("topic.2.messages", "hi, queue 2 message");
        }

        /** Echoes the payload to stdout, then sends it to the exchange under the given routing key. */
        private void publish(String routingKey, String payload) {
            System.out.println("Sender : " + payload);
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, payload);
        }
    }
    

    4.测试

    package cn.lovingliu.rabbitmq_topic.test;
    
    
    import cn.lovingliu.rabbitmq_topic.producer.Producer;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Spring Boot test for the topic-exchange example.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Test {
        @Autowired
        private Producer producer;

        /**
         * Triggers {@code Producer.send1()}.
         *
         * @throws Exception declared for the test runner; nothing is thrown explicitly here
         */
        // Fully-qualified annotation: this class is itself named Test.
        @org.junit.Test
        public void send1() throws Exception {
            this.producer.send1();
        }

        /**
         * Triggers {@code Producer.send2()}.
         *
         * @throws Exception declared for the test runner; nothing is thrown explicitly here
         */
        @org.junit.Test
        public void send2() throws Exception {
            this.producer.send2();
        }
    }
    
    运行结果

    四.Fanout Exchange(订阅模式)

    Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

    1.配置队列,绑定交换机

    package cn.lovingliu.rabbitmq_fanout.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Declares the three queues, the fanout exchange and the bindings of the fanout example.
     *
     * <p>Created 2020-01-16.
     *
     * @author LovingLiu
     */
    @Configuration
    public class FanoutRabbitConfig {
        public static final String QUEUE_NAME_A = "q_rabbit_fanout_a";
        public static final String QUEUE_NAME_B = "q_rabbit_fanout_b";
        public static final String QUEUE_NAME_C = "q_rabbit_fanout_c";
        public static final String EXCHANGE_NAME = "my_fanout_exchange";

        /**
         * @return queue A, named {@link #QUEUE_NAME_A}
         */
        @Bean
        public Queue aQueue() {
            return new Queue(FanoutRabbitConfig.QUEUE_NAME_A);
        }

        /**
         * @return queue B, named {@link #QUEUE_NAME_B}
         */
        @Bean
        public Queue bQueue() {
            return new Queue(FanoutRabbitConfig.QUEUE_NAME_B);
        }

        /**
         * @return queue C, named {@link #QUEUE_NAME_C}
         */
        @Bean
        public Queue cQueue() {
            return new Queue(FanoutRabbitConfig.QUEUE_NAME_C);
        }

        /**
         * @return a fanout exchange named {@link #EXCHANGE_NAME}
         */
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(FanoutRabbitConfig.EXCHANGE_NAME);
        }

        /**
         * Binds queue A to the fanout exchange; no routing key is specified.
         *
         * @param aQueue the queue to bind (parameter name matches the {@code aQueue()} bean method)
         * @param fanoutExchange the fanout exchange
         * @return the binding
         */
        @Bean
        Binding bindingExchangeA(Queue aQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder
                    .bind(aQueue)
                    .to(fanoutExchange);
        }

        /**
         * Binds queue B to the fanout exchange; no routing key is specified.
         *
         * @param bQueue the queue to bind (parameter name matches the {@code bQueue()} bean method)
         * @param fanoutExchange the fanout exchange
         * @return the binding
         */
        @Bean
        Binding bindingExchangeB(Queue bQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder
                    .bind(bQueue)
                    .to(fanoutExchange);
        }

        /**
         * Binds queue C to the fanout exchange; no routing key is specified.
         *
         * @param cQueue the queue to bind (parameter name matches the {@code cQueue()} bean method)
         * @param fanoutExchange the fanout exchange
         * @return the binding
         */
        @Bean
        Binding bindingExchangeC(Queue cQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder
                    .bind(cQueue)
                    .to(fanoutExchange);
        }
    }
    

    2.创建3个消费者

    cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutA

    package cn.lovingliu.rabbitmq_fanout.consumer;
    
    import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer A of the fanout example; listens on
     * {@link FanoutRabbitConfig#QUEUE_NAME_A}.
     *
     * <p>Created 2020-01-17.
     *
     * @author LovingLiu
     */
    @Component
    @RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_A)
    public class ReceiverFanoutA {
        /**
         * Prints a received text message followed by one blank line.
         *
         * @param hello the message payload
         */
        @RabbitHandler
        public void process(String hello) {
            // "\n" is the newline escape; the previous "/n" printed the two literal characters.
            System.out.println("AReceiver  : " + hello + "\n");
        }
    }
    

    cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutB

    package cn.lovingliu.rabbitmq_fanout.consumer;
    
    import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer B of the fanout example; listens on
     * {@link FanoutRabbitConfig#QUEUE_NAME_B}.
     *
     * <p>Created 2020-01-17.
     *
     * @author LovingLiu
     */
    @Component
    @RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_B)
    public class ReceiverFanoutB {
        /**
         * Prints a received text message followed by one blank line.
         *
         * @param hello the message payload
         */
        @RabbitHandler
        public void process(String hello) {
            // "\n" is the newline escape; the previous "/n" printed the two literal characters.
            System.out.println("BReceiver  : " + hello + "\n");
        }
    }
    

    cn.lovingliu.rabbitmq_fanout.consumer.ReceiverFanoutC

    package cn.lovingliu.rabbitmq_fanout.consumer;
    
    import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer C of the fanout example; listens on
     * {@link FanoutRabbitConfig#QUEUE_NAME_C}.
     *
     * <p>Created 2020-01-17.
     *
     * @author LovingLiu
     */
    @Component
    @RabbitListener(queues = FanoutRabbitConfig.QUEUE_NAME_C)
    public class ReceiverFanoutC {
        /**
         * Prints a received text message followed by one blank line.
         *
         * @param hello the message payload
         */
        @RabbitHandler
        public void process(String hello) {
            // "\n" is the newline escape; the previous "/n" printed the two literal characters.
            System.out.println("CReceiver  : " + hello + "\n");
        }
    }
    

    3.生产者

    cn.lovingliu.rabbitmq_fanout.producer.Producer

    package cn.lovingliu.rabbitmq_fanout.producer;
    
    import cn.lovingliu.rabbitmq_fanout.config.FanoutRabbitConfig;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Producer for the fanout example; publishes to
     * {@link FanoutRabbitConfig#EXCHANGE_NAME}.
     *
     * <p>Created 2020-01-17.
     *
     * @author LovingLiu
     */
    @Component
    public class Producer {
        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Echoes the sample message to stdout and sends it to the fanout exchange
         * with an empty routing key.
         */
        public void send() {
            final String payload = "hi, fanout msg ";
            final String noRoutingKey = "";
            System.out.println("Sender : " + payload);
            rabbitTemplate.convertAndSend(FanoutRabbitConfig.EXCHANGE_NAME, noRoutingKey, payload);
        }
    }
    

    4.测试

    package cn.lovingliu.rabbitmq_fanout.test;
    
    import cn.lovingliu.rabbitmq_fanout.producer.Producer;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Spring Boot test for the fanout-exchange example.
     *
     * <p>Created 2020-01-17.
     *
     * @author LovingLiu
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Test {
        @Autowired
        private Producer producer;

        /**
         * Sends one message through the fanout producer.
         *
         * @throws Exception declared for the test runner; nothing is thrown explicitly here
         */
        // Fully-qualified annotation: this class is itself named Test.
        @org.junit.Test
        public void send1() throws Exception {
            this.producer.send();
        }
    }
    

    运行截图

    生产者
    消费者

    相关文章

      网友评论

          本文标题:Springboot集成RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/adpqzctx.html