美文网首页
使用SpringBoot测试RabbitMQ的Topic模式

使用SpringBoot测试RabbitMQ的Topic模式

作者: 简辣椒 | 来源:发表于2019-03-24 18:25 被阅读0次

    为了演示Topic模式,所以模拟一个需求:

    用户A想接收开封市的天气
    用户B想接收开封市的新闻
    用户C想接收开封市的新闻和天气

    1.创建基于SpringBoot的生产者模块和消费者模块,两个模块结构相同,在hello包下创建SpringBoot主启动类。
    2.引入starter
    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
            <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    
    3.在resources包下创建application.yml文件,内容如下(根据实际情况填写地址用户名密码等等):
    4.在生产者模块创建config包,在此包下创建一个配置类(功能是声明队列,声明交换机,声明RoutingKey,声明队列和交换机的绑定关系),结构和内容如下:
    package com.hello.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
        //为了方便演示我都使用的public
        //队列
        public static final String QUEUE_NEWS = "queue_news";
        public static final String QUEUE_WEATHER = "queue_weather";
        public static final String QUEUE_NEWS_WEATHER = "queue_news_weather";
        //交换机
        public static final String EXCHANGE_TOPIC_KAIFENG = "exchange_topic_kaifeng";
        //交换机与队列绑定的RoutingKey
        public static final String ROUTINGKEY_NEWS = "*.news";
        public static final String ROUTINGKEY_WEATHER = "*.weather";
        public static final String ROUTINGKEY_ALL = "kaifeng.*";
    
        //声明队列
        @Bean(QUEUE_NEWS)
        public Queue QUEUE_NEWS(){    //新闻的队列
            return new Queue(QUEUE_NEWS);
        }
        @Bean(QUEUE_WEATHER)
        public Queue QUEUE_WEATHER(){  //天气的队列
            return new Queue(QUEUE_WEATHER);
        }
        @Bean(QUEUE_NEWS_WEATHER)
        public Queue QUEUE_NEWS_WEATHER(){  //新闻和天气的队列
            return new Queue(QUEUE_NEWS_WEATHER);
        }
    
        //声明交换机
        @Bean(EXCHANGE_TOPIC_KAIFENG)
        public Exchange EXCHANGE_TOPIC_INFORM(){
            //声明了一个Topic类型的交换机,durable是持久化(重启rabbitmq这个交换机不会被自动删除)
            return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_KAIFENG).durable(true).build();
        }
    
        //声明news队列和交换机绑定关系,并且指定RoutingKey
        @Bean
        public Binding NEWS_BINDING_TOPIC(@Qualifier(QUEUE_NEWS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPIC_KAIFENG) Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_NEWS).noargs();
        }
        //声明weather队列和交换机绑定关系,并且指定RoutingKey
        @Bean
        public Binding WEATHER_BINDING_TOPIC(@Qualifier(QUEUE_WEATHER) Queue queue,
                                          @Qualifier(EXCHANGE_TOPIC_KAIFENG) Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_WEATHER).noargs();
        }
        //声明news+weather队列和交换机绑定关系,并且指定RoutingKey
        @Bean
        public Binding NEWS_WEATHER_BINDING_TOPIC(@Qualifier(QUEUE_NEWS_WEATHER) Queue queue,
                                           @Qualifier(EXCHANGE_TOPIC_KAIFENG) Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_ALL).noargs();
        }
    }
    
    5. 在测试类发送消息
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class RabbitTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void sendNews(){
            //指定交换机,指定routing key,发送消息的内容
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC_KAIFENG,"kaifeng.news","开封今年粮食产量提升10%");
        }
        @Test
        public void sendWeather(){
            //指定交换机,指定routing key,发送消息的内容
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC_KAIFENG,"kaifeng.weather","开封明天白天多云15℃");
        }
    }
    
    6.当前整个结构如下
    目的.png
    7.我们也可以Rabbit网页管理平台上面查看队列和交换机都已经创建,交换机绑定的队列关系也已经绑定,消息因为没有消费者所以也是存储在MQ中。
    交换机和队列的绑定关系
    7.在消费者模块创建一个类MQConsumer接收消息
    123.png
    内容如下:
    package com.hello.mq;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MQConsumer {
       //模拟三个用户接收
       @RabbitListener(queues = {"queue_weather"})
       public void getMessageA(String msg){
           System.out.println("A用户想看天气接收到:" + msg);
       }
       @RabbitListener(queues = {"queue_news"})
       public void getMessageB(Message message){  //我们也可以用Message作为参数接收
           byte[] body = message.getBody();
           String msg = new String(body);
           System.out.println("B用户想看新闻接收到:" + msg);
       }
       @RabbitListener(queues = {"queue_news_weather"})
       public void getMessageC(String msg){
           System.out.println("C用户想看新闻和天气接收到:" + msg);
       }
    }
    
    运行消费者程序收到消息打印的结果:
    打印结果

    相关文章

      网友评论

          本文标题:使用SpringBoot测试RabbitMQ的Topic模式

          本文链接:https://www.haomeiwen.com/subject/mawuvqtx.html