美文网首页中间件
springboot rabbitmq不同交换机类型实战

springboot rabbitmq不同交换机类型实战

作者: dayue_ | 来源:发表于2021-02-28 01:23 被阅读0次

    RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种,其中headers实际很少用到。
    fanout:把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
    direct:匹配规则相对简单,把消息路由到交换机和路由键RoutingKey绑定的队列中。
    topic:匹配规则灵活,路由键RoutingKey可使用通配符" * " 和 “ # ”,代表匹配一个单词和任意单词。

    代码目录结构如图,分别演示三种交换机类型:


    三种交换机类型.png
    一、fanout广播消息模型

    fanout广播交换机,当有多个消息队列需要监听同个消息进行不同的业务处理的时候,那么可以采用广播交换机,一个广播交换机,多个消息队列绑定该交换机,在发送消息的时候把消息发送到该交换机上,那么在监听消息端多个消息队列将监听到该消息,如图所示。


    fanout交换机流程图.jpeg

    (1)首先在RabbitmqConfig中创建两个队列,fanoutQueueOne和fanoutQueueTwo,交换机类型为FanoutExchange,最后是两个队列与FanoutExchange交换机绑定起来。

    @Slf4j
    @Configuration
    public class RabbitmqConfig {
        /**
         * 创建消息模型-fanoutExchange
         */
        //广播fanout消息模型-队列1
        @Bean
        public Queue fanoutQueueOne(){
            return new Queue(RabbitMqConstants.FANOUT_ONE_QUEUE,true);
        }
        //广播fanout消息模型-队列2
        @Bean
        public Queue fanoutQueueTwo(){
            return new Queue(RabbitMqConstants.FANOUT_TWO_QUEUE,true);
        }
        //广播fanout消息模型-创建交换机-fanoutExchange
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange(RabbitMqConstants.FANOUT_EXCHANGE,true,false);
        }
        //广播fanout消息模型-创建绑定1
        @Bean
        public Binding fanoutBindingOne(){
            return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
        }
        //广播fanout消息模型-创建绑定2
        @Bean
        public Binding fanoutBindingTwo(){
            return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
        }
    }
    

    RabbitMqConstants常量值如下:

    @Data
    public class RabbitMqConstants {
        //广播fanoutExchange消息模型
        public static final String FANOUT_ONE_QUEUE = "mq.fanout.one.queue";
        public static final String FANOUT_TWO_QUEUE = "mq.fanout.two.queue";
        public static final String FANOUT_EXCHANGE = "mq.fanout.exchange";
    }
    

    (2)启动项目,访问http://127.0.0.1:15672/可以看到我们设置的队列交换机以及绑定的路由相关信息:

    fanout两个消息队列.png
    fanout广播交换机.png
    fanout广播交换机详情.png
    fanout消息队列详情-绑定关系.png

    (3)fanout广播消息模型-生产者FanoutPublisher,RabbitMQ发送消息的操作组件RabbitTemplate设置fanout广播交换机,最后发送消息。

    @Slf4j
    @Component
    public class FanoutPublisher {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        /**
         * 发送消息
         * @param order 订单消息
         */
        public void sendMsg(Order order){
            try {
                //设置广播式交换机FanoutExchange
                rabbitTemplate.setExchange(RabbitMqConstants.FANOUT_EXCHANGE);
                //发送消息
                rabbitTemplate.convertAndSend(order);
                //打印日志
                log.info("消息模型fanoutExchange-生产者-发送消息:{} ", order);
            }catch (Exception e){
                log.error("消息模型fanoutExchange-生产者-发送消息:{},发生异常: ", order, e);
            }
        }
    }
    

    (4)fanout广播消息模型-消费者FanoutConsumer,前面设置了两个队列,这里设置两个队列进行监听。

    @Slf4j
    @Component
    public class FanoutConsumer {
        /**
         * 监听并消费队列中的消息-fanoutExchange-one-这是第一条队列对应的消费者
         */
        @RabbitListener(queues = RabbitMqConstants.FANOUT_ONE_QUEUE,containerFactory = "singleListenerContainer")
        public void consumeFanoutMsgOne(Order order){
            try {
                log.info("消息模型fanoutExchange-one-消费者-监听消费到消息:{} ",order);
            }catch (Exception e){
                log.error("消息模型-消费者-发生异常:",e);
            }
        }
        /**
         * 监听并消费队列中的消息-fanoutExchange-two-这是第二条队列对应的消费者
         */
        @RabbitListener(queues = RabbitMqConstants.FANOUT_TWO_QUEUE,containerFactory = "singleListenerContainer")
        public void consumeFanoutMsgTwo(Order order){
            try {
                log.info("消息模型fanoutExchange-two-消费者-监听消费到消息:{} ",order);
            }catch (Exception e){
                log.error("消息模型-消费者-发生异常:",e);
            }
        }
    }
    

    (5)最后调用test方法发送消息

        @Test
        public void testFanoutPublish() {
            Order order = new Order();
            order.setOrdernum("123456");
            fanoutPublisher.sendMsg(order);
        }
    
    测试fanout广播模型.png fanout消息队列监听消息.png
    二、direct直连传输消息模型

    direct交换机相对严谨,不像fanout广播交换机,direct交换机发送消息到消息队列的时候有一个路由规则,即路由键,这个路由键将指引交换机把消息指定到对应的队列之中进行消费,在实际开发中,direct交换机比较常用,当有某个特定消息需要被某一个队列进行消费处理的时候,可采用direct交换机。


    direct交换机流程图.jpeg

    (1)同样在RabbitmqConfig 配置类中创建两个队列directQueueOne、directQueueTwo,由directExchange用分别用两个路由键"mq.direct.routing.key.one"和"mq.direct.routing.key.two"绑定起来。

    @Slf4j
    @Configuration
    public class RabbitmqConfig {
        /**
         * 创建消息模型-directExchange
         */
        //直连传输direct消息模型-创建交换机-directExchange
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange(RabbitMqConstants.DIRECT_EXCHANGE,true,false);
        }
        //直连传输direct消息模型-创建队列1
        @Bean
        public Queue directQueueOne(){
            return new Queue(RabbitMqConstants.DIRECT_ONE_QUEUE,true);
        }
        //直连传输direct消息模型-创建队列2
        @Bean
        public Queue directQueueTwo(){
            return new Queue(RabbitMqConstants.DIRECT_TWO_QUEUE,true);
        }
        //直连传输direct消息模型-创建绑定1
        @Bean
        public Binding directBindingOne(){
            return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(RabbitMqConstants.DIRECT_ONE_ROUTING_KEY);
        }
        //直连传输direct消息模型-创建绑定2
        @Bean
        public Binding directBindingTwo(){
            return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(RabbitMqConstants.DIRECT_TWO_ROUTING_KEY);
        }
    }
    

    RabbitMqConstants常量值如下:

    @Data
    public class RabbitMqConstants {
        //直连directExchange消息模型
        public static final String DIRECT_ONE_QUEUE = "mq.direct.one.queue";
        public static final String DIRECT_TWO_QUEUE = "mq.direct.two.queue";
        public static final String DIRECT_ONE_ROUTING_KEY = "mq.direct.routing.key.one";
        public static final String DIRECT_TWO_ROUTING_KEY = "mq.direct.routing.key.two";
        public static final String DIRECT_EXCHANGE = "mq.direct.exchange";
    }
    

    (2)启动项目,访问http://127.0.0.1:15672/可以看到我们设置的队列交换机以及绑定的路由相关信息:


    direct两个消息队列.png direct直连传输交换机.png direct直连传输交换机详情.png direct消息队列详情-绑定关系.png

    (3)directExchange直连传输消息模型-生产者DirectPublisher

    @Slf4j
    @Component
    public class DirectPublisher {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送消息-基于DirectExchange消息模型-one
         */
        public void sendMsgDirectOne(Order order){
            try {
                //设置交换机
                rabbitTemplate.setExchange(RabbitMqConstants.DIRECT_EXCHANGE);
                //设置路由1
                rabbitTemplate.setRoutingKey(RabbitMqConstants.DIRECT_ONE_ROUTING_KEY);
                //发送消息
                rabbitTemplate.convertAndSend(order);
                //打印日志
                log.info("消息模型DirectExchange-one-生产者-发送消息:{} ",order);
            }catch (Exception e){
                log.error("消息模型DirectExchange-one-生产者-发送消息:{},发生异常:{} ",order, e);
            }
        }
        /**
         * 发送消息-基于DirectExchange消息模型-two
         */
        public void sendMsgDirectTwo(Order order){
            try {
                //设置交换机
                rabbitTemplate.setExchange(RabbitMqConstants.DIRECT_EXCHANGE);
                //设置路由2
                rabbitTemplate.setRoutingKey(RabbitMqConstants.DIRECT_TWO_ROUTING_KEY);
                //发送消息
                rabbitTemplate.convertAndSend(order);
                //打印日志
                log.info("消息模型DirectExchange-two-生产者-发送消息:{} ",order);
            }catch (Exception e){
                log.error("消息模型DirectExchange-two-生产者-发送消息:{},发生异常:{} ",order, e);
            }
        }
    }
    

    (4)directExchange直连传输消息模型-消费者DirectConsumer

    @Slf4j
    @Component
    public class DirectConsumer {
    
        /** 这是第一个路由绑定的对应队列的消费者方法
         * 监听并消费队列中的消息-directExchange-one
         */
        @RabbitListener(queues = RabbitMqConstants.DIRECT_ONE_QUEUE,containerFactory = "singleListenerContainer")
        public void consumeDirectMsgOne(Order order){
            try {
                //打印日志消息
                log.info("消息模型directExchange-one-消费者-监听消费到消息:{} ",order);
            }catch (Exception e){
                log.error("消息模型directExchange-one-消费者-监听消费发生异常:",e);
            }
        }
    
        /**
         * 这是第二个路由绑定的对应队列的消费者方法
         * 监听并消费队列中的消息-directExchange-two
         */
        @RabbitListener(queues = RabbitMqConstants.DIRECT_TWO_QUEUE, containerFactory = "singleListenerContainer")
        public void consumeDirectMsgTwo(Order order) {
            try {
                //打印日志消息
                log.info("消息模型directExchange-two-消费者-监听消费到消息:{} ", order);
            } catch (Exception e) {
                log.error("消息模型directExchange-two-消费者-监听消费发生异常:", e);
            }
        }
    }
    

    (5)最后调用test方法发送消息

        @Test
        public void testDirectPublish() {
            Order order1 = new Order();
            order1.setOrdernum("one-123456");
    
            Order order2 = new Order();
            order2.setOrdernum("tow-123456");
    
            directPublisher.sendMsgDirectOne(order1);
            directPublisher.sendMsgDirectTwo(order2);
        }
    
    测试direct直连传输消息模型.png
    direct消息队列监听消息.png
    三、topic主题消息模型

    topic交换机相对灵活,路由键规则有通配符" * " 和 " # "符号代替了一个单词和零或者多个单词,例如当路由键有用通配符" * "符号的时候,即有一个路由键为“mq.topic.routing.key.*”,那么在发送消息的时候,生产者设置了路由键为“mq.topic.routing.key.one”、“mq.topic.routing.key.two”、“mq.topic.routing.key.three”等等,都可以将该消息发送到topic交换机路由键为“mq.topic.routing.key.”绑定的消息队列中,最终被监听到。

    topic交换机流程图.jpeg
    (1)同样在RabbitmqConfig 配置类中创建两个队列topicQueueOne、topicQueueTwo,由topicExchange用分别用两个路由键"mq.topic.routing.key.
    "和"mq.topic.routing.key.#"绑定起来。
    @Slf4j
    @Configuration
    public class RabbitmqConfig {
        //主题topic消息模型-创建交换机-topicExchange
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(RabbitMqConstants.TOPIC_EXCHANGE,true,false);
        }
        //主题topic消息模型-创建队列1
        @Bean
        public Queue topicQueueOne(){
            return new Queue(RabbitMqConstants.TOPIC_ONE_QUEUE,true);
        }
        //主题topic消息模型-创建队列2
        @Bean
        public Queue topicQueueTwo(){
            return new Queue(RabbitMqConstants.TOPIC_TWO_QUEUE,true);
        }
        //主题topic消息模型-创建绑定-通配符为*的路由
        @Bean
        public Binding topicBindingOne(){
            return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(RabbitMqConstants.TOPIC_ONE_ROUTING_KEY);
        }
    
        //主题topic消息模型-创建绑定-通配符为#的路由
        @Bean
        public Binding topicBindingTwo(){
            return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(RabbitMqConstants.TOPIC_TWO_ROUTING_KEY);
        }
    }
    

    RabbitMqConstants常量值如下:

    @Data
    public class RabbitMqConstants {
        //主题topicExchange消息模型
        public static final String TOPIC_ONE_QUEUE = "mq.topic.one.queue";
        public static final String TOPIC_TWO_QUEUE = "mq.topic.two.queue";
        public static final String TOPIC_ONE_ROUTING_KEY = "mq.topic.routing.key.*";
        public static final String TOPIC_TWO_ROUTING_KEY = "mq.topic.routing.key.#";
        public static final String TOPIC_EXCHANGE = "mq.topic.exchange";
    }
    

    (2)启动项目,访问http://127.0.0.1:15672/可以看到我们设置的队列交换机以及绑定的路由相关信息:

    topic两个消息队列.png
    topic交换机.png
    topic交换机详情.png
    topic消息队列绑定关系.png

    (3)topicExchange消息模型-生产者topicPublisher

    @Slf4j
    @Component
    public class TopicPublisher {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送消息-基于TopicExchange消息模型
         */
        public void sendMsgTopic(Order order, String routingKey){
            try {
                //指定交换机
                rabbitTemplate.setExchange(RabbitMqConstants.TOPIC_EXCHANGE);
                //指定路由的实际取值,根据不同取值,RabbitMQ将自行进行匹配通配符,从而路由到不同的队列中
                rabbitTemplate.setRoutingKey(routingKey);
                //发送消息
                rabbitTemplate.convertAndSend(order);
                //打印日志
                log.info("消息模型TopicExchange-生产者-发送消息:{},路由:{} ", order, routingKey);
            } catch (Exception e) {
                log.error("消息模型TopicExchange-生产者-发送消息:{},发生异常:{} ", order, e);
            }
        }
    }
    

    (4)topicExchange消息模型-消费者topicConsumer

    @Slf4j
    @Component
    public class TopicConsumer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 监听并消费队列中的消息-topicExchange-*通配符
         */
        @RabbitListener(queues = RabbitMqConstants.TOPIC_ONE_QUEUE, containerFactory = "singleListenerContainer")
        public void consumeTopicMsgOne(Order order) {
            try {
                log.info("消息模型topicExchange-*-消费者-监听消费到消息:{} ", order);
            } catch (Exception e) {
                log.error("消息模型topicExchange-*-消费者-监听消费发生异常:", e);
            }
        }
    
        /**
         * 监听并消费队列中的消息-topicExchange-#通配符
         */
        @RabbitListener(queues = RabbitMqConstants.TOPIC_TWO_QUEUE, containerFactory = "singleListenerContainer")
        public void consumeTopicMsgTwo(Order order) {
            try {
                log.info("消息模型topicExchange-#-消费者-监听消费到消息:{} ", order);
            } catch (Exception e) {
                log.error("消息模型topicExchange-#-消费者-监听消费发生异常:", e);
            }
        }
    }
    

    (5)最后调用test方法发送消息,路由键有:mq.topic.routing.key.java、mq.topic.routing.key.php.python、mq.topic.routing.key。

        @Test
        public void testTopicPublish() {
            //此时相当于*,即java替代了*的位置
            //当然由于#表示任意单词,因而也将路由到#表示的路由和对应的队列中
            String routingKeyOne="mq.topic.routing.key.java";
            //此时相当于#:即 php.python 替代了#的位置
            String routingKeyTwo="mq.topic.routing.key.php.python";
            //此时相当于#:即0个单词
            String routingKeyThree="mq.topic.routing.key";
            Order order = new Order();
            order.setOrdernum("123456");
            topicPublisher.sendMsgTopic(order,routingKeyOne);
            //topicPublisher.sendMsgTopic(order,routingKeyTwo);
            //topicPublisher.sendMsgTopic(order,routingKeyThree);
        }
    
    topic test方法.png topic test结果.png

    上一篇博客:springboot rabbitmq入门使用
    下一篇博客:springboot rabbitmq高可用消息确认消费实战

    参考资料:
    《分布式中间件实战》
    《rabbitmq实战指南》

    相关文章

      网友评论

        本文标题:springboot rabbitmq不同交换机类型实战

        本文链接:https://www.haomeiwen.com/subject/mrqbfltx.html