美文网首页
SpringBoot笔记--RabbitMQ发布订阅模式

SpringBoot笔记--RabbitMQ发布订阅模式

作者: 有活就干 | 来源:发表于2022-04-18 23:12 被阅读0次

    一、前言

    发布订阅模式,即producer发送者,发布一个消息,多个接收者都能获取到同样的消息。大致流程是,发送者将消息发送到指定的交换机,交换机将消息发布到绑定到该交换的所有队列里,接收者通过队列获取消息。


    发布订阅模式.png

    二、发送者Producer

    1.FanoutExchangeConfig.java

    新建交换机,并将队列A B C绑定到该交换机

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutExchangeConfig {
    
        /**
         * 新建交换机,并将队列A B C绑定到该交换机
         */
        public static final String FANOUT_EXCHANGE = "weather_fanout_exchange";
    
        public static final String FANOUT_EXCHANGE_QUEUE_A = "fanout_exchange_queue_a";
    
        public static final String FANOUT_EXCHANGE_QUEUE_B = "fanout_exchange_queue_b";
    
        public static final String FANOUT_EXCHANGE_QUEUE_C = "fanout_exchange_queue_c";
    
        /**
         * 创建A队列
         */
        @Bean
        public Queue queueA(){
            return new Queue(FANOUT_EXCHANGE_QUEUE_A);
        }
    
        /**
         * 创建B队列
         */
        @Bean
        public Queue queueB(){
            return new Queue(FANOUT_EXCHANGE_QUEUE_B);
        }
    
        /**
         * 创建C队列
         */
        @Bean
        public Queue queueC(){
            return new Queue(FANOUT_EXCHANGE_QUEUE_C);
        }
    
        /**
         * 创建fanout交换机
         * @return
         */
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUT_EXCHANGE);
        }
    
        /**
         * 将A队列绑定到交换机上
         */
        @Bean
        Binding bindingExchangeA() {
            return BindingBuilder.bind(queueA()).to(fanoutExchange());
        }
    
        /**
         *  将B队列绑定到交换机上
         */
        @Bean
        Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
        }
    
        /**
         * 将C队列绑定到交换机上
         */
        @Bean
        Binding bindingExchangeC() {
            return BindingBuilder.bind(queueC()).to(fanoutExchange());
        }
    }
    
    fanout交换机.png

    2.发送消息到指定交换机

    @RestController
    @RequestMapping(value = "send")
    public class ProducerController {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("fanout-exchange")
        public void sendExchange(){
            String msg = "send msg to fanout exchange" + new Date().toString();
            rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, msg);
        }
    }
    
    队列上的消息.png

    三、Consumer接收者

    1.RabbitMqConfig.java

    @Configuration
    public class RabbitMqConfig {
    
        public static final String FANOUT_EXCHANGE_QUEUE_A = "fanout_exchange_queue_a";
    
        public static final String FANOUT_EXCHANGE_QUEUE_B = "fanout_exchange_queue_b";
    
        public static final String FANOUT_EXCHANGE_QUEUE_C = "fanout_exchange_queue_c";
    }
    

    2.RabbitMqReceiver.java

    @Component
    public class RabbitMqReceiver {
    
        private final static Logger logger = LoggerFactory.getLogger(RabbitMqReceiver.class);
    
        @RabbitListener(queues = RabbitMqConfig.FANOUT_EXCHANGE_QUEUE_A)
        public void receiverQueueA(String msg, Channel channel, Message message) throws IOException {
            logger.info("receiverQueueA 接收到消息为:"+msg);
    }
    
        @RabbitListener(queues = RabbitMqConfig.FANOUT_EXCHANGE_QUEUE_B)
        public void receiverQueueB(String msg, Channel channel, Message message) throws IOException {
            logger.info("receiverQueueB 接收到消息为:"+msg);
    }
    
        @RabbitListener(queues = RabbitMqConfig.FANOUT_EXCHANGE_QUEUE_C)
        public void receiverQueueC(String msg, Channel channel, Message message) throws IOException {
            logger.info("receiverQueueC 接收到消息为:"+msg);
        }
    }
    
    reciver.png

    从日志可以看出来,每个队列都能收到相同的消息

    以上!

    相关文章

      网友评论

          本文标题:SpringBoot笔记--RabbitMQ发布订阅模式

          本文链接:https://www.haomeiwen.com/subject/ypujertx.html