美文网首页
Springboot整合RabbitMQ(二)——Fanout扇

Springboot整合RabbitMQ(二)——Fanout扇

作者: 砒霜拌辣椒 | 来源:发表于2020-09-13 23:00 被阅读0次

    扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

    1、Maven依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    

    2、配置文件

    server:
      port: 30200
    
    spring:
      rabbitmq:
        host: 148.70.153.63
        port: 5672
        username: libai
        password: password
    

    3、生产者

    3.1、配置文件

    声明队列和交换机。

    @Configuration
    public class FanoutRabbitConfig {
        @Bean
        public Queue queueA() {
            return new Queue("fanoutA", true, false, false);
        }
    
        @Bean
        public Queue queueB() {
            return new Queue("fanoutB", true, false, false);
        }
    
        @Bean
        public Queue queueC() {
            return new Queue("fanoutC", true, false, false);
        }
    
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange", true, false);
        }
    
        @Bean
        public Binding bindingExchangeA() {
            return BindingBuilder.bind(queueA()).to(fanoutExchange());
        }
    
        @Bean
        public Binding bindingExchangeB() {
            return BindingBuilder.bind(queueB()).to(fanoutExchange());
        }
    
        @Bean
        public Binding bindingExchangeC() {
            return BindingBuilder.bind(queueC()).to(fanoutExchange());
        }
    }
    

    创建三个队列 :fanoutAfanoutBfanoutC,将三个队列都绑定在交换机fanoutExchange上。

    因为是扇型交换机, 路由键无需配置,配置也不起作用。

    3.2、发送消息

    @RestController
    public class SendMessageController {
        @Autowired
        private RabbitTemplate rabbitTemplate;  // 使用RabbitTemplate,这提供了接收/发送等等方法
    
        @PostMapping("/sendFanoutMessage")
        public String sendFanoutMessage() {
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", String.valueOf(UUID.randomUUID()));
            map.put("messageData", "testFanoutMessage");
            map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            rabbitTemplate.convertAndSend("fanoutExchange", null, map);
            return "ok";
        }
    }
    

    发送到指定的交换机上,因为是扇形交换机,所以会把消息广播到所有和该交换机绑定的队列上。

    启动服务,用postman调用发送消息接口。

    3.3、查看RabbitMQ的后台管理界面

    可以看到已经有3条消息推送到队列中,等待被消费。


    消息预览

    3.4、查看交换机

    可以看到和3个队列绑定,每发到该交换机上的一条消息都会被广播到这3个队列上。


    查看交换机

    4、消费者

    通过注解@RabbitListener指定要消费的队列。

    @Component
    @RabbitListener(queues = "fanoutA")
    @Slf4j
    public class FanoutReceiverA {
        @RabbitHandler
        public void process(Map testMessage) {
            log.info("FanoutReceiverA消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
        }
    }
    
    @Component
    @RabbitListener(queues = "fanoutB")
    @Slf4j
    public class FanoutReceiverB {
        @RabbitHandler
        public void process(Map testMessage) {
            log.info("FanoutReceiverB消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
        }
    }
    
    @Component
    @RabbitListener(queues = "fanoutC")
    @Slf4j
    public class FanoutReceiverC {
        @RabbitHandler
        public void process(Map testMessage) {
            log.info("FanoutReceiverC消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
        }
    }
    

    重新启动服务,可以看到控制台打印输出,说明该条消息已经被3个消费者消费成功了。

    2020-09-13 21:47:56,509 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverA:25] [] FanoutReceiverA消费者收到消息:{
        "createTime": "2020-09-13 21:38:57",
        "messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
        "messageData": "testFanoutMessage"
    }
    2020-09-13 21:47:56,678 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#2-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverB:25] [] FanoutReceiverB消费者收到消息:{
        "createTime": "2020-09-13 21:38:57",
        "messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
        "messageData": "testFanoutMessage"
    }
    2020-09-13 21:47:56,885 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverC:25] [] FanoutReceiverC消费者收到消息:{
        "createTime": "2020-09-13 21:38:57",
        "messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
        "messageData": "testFanoutMessage"
    }
    

    Springboot整合RabbitMQ(一)——Direct直连交换机
    Springboot整合RabbitMQ(三)——Topic主题交换机

    参考链接

    代码地址

    相关文章

      网友评论

          本文标题:Springboot整合RabbitMQ(二)——Fanout扇

          本文链接:https://www.haomeiwen.com/subject/autaektx.html