扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
1、Maven依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
2、配置文件
server:
port: 30200
spring:
rabbitmq:
host: 148.70.153.63
port: 5672
username: libai
password: password
3、生产者
3.1、配置文件
声明队列和交换机。
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue queueA() {
return new Queue("fanoutA", true, false, false);
}
@Bean
public Queue queueB() {
return new Queue("fanoutB", true, false, false);
}
@Bean
public Queue queueC() {
return new Queue("fanoutC", true, false, false);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange", true, false);
}
@Bean
public Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
public Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
public Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
创建三个队列 :fanoutA
、fanoutB
、fanoutC
,将三个队列都绑定在交换机fanoutExchange
上。
因为是扇型交换机, 路由键无需配置,配置也不起作用。
3.2、发送消息
@RestController
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate; // 使用RabbitTemplate,这提供了接收/发送等等方法
@PostMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("messageData", "testFanoutMessage");
map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "ok";
}
}
发送到指定的交换机上,因为是扇形交换机,所以会把消息广播到所有和该交换机绑定的队列上。
启动服务,用postman
调用发送消息接口。
3.3、查看RabbitMQ
的后台管理界面
可以看到已经有3条消息推送到队列中,等待被消费。
消息预览
3.4、查看交换机
可以看到和3个队列绑定,每发到该交换机上的一条消息都会被广播到这3个队列上。
查看交换机
4、消费者
通过注解@RabbitListener
指定要消费的队列。
@Component
@RabbitListener(queues = "fanoutA")
@Slf4j
public class FanoutReceiverA {
@RabbitHandler
public void process(Map testMessage) {
log.info("FanoutReceiverA消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
}
}
@Component
@RabbitListener(queues = "fanoutB")
@Slf4j
public class FanoutReceiverB {
@RabbitHandler
public void process(Map testMessage) {
log.info("FanoutReceiverB消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
}
}
@Component
@RabbitListener(queues = "fanoutC")
@Slf4j
public class FanoutReceiverC {
@RabbitHandler
public void process(Map testMessage) {
log.info("FanoutReceiverC消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
}
}
重新启动服务,可以看到控制台打印输出,说明该条消息已经被3个消费者消费成功了。
2020-09-13 21:47:56,509 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverA:25] [] FanoutReceiverA消费者收到消息:{
"createTime": "2020-09-13 21:38:57",
"messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
"messageData": "testFanoutMessage"
}
2020-09-13 21:47:56,678 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#2-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverB:25] [] FanoutReceiverB消费者收到消息:{
"createTime": "2020-09-13 21:38:57",
"messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
"messageData": "testFanoutMessage"
}
2020-09-13 21:47:56,885 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverC:25] [] FanoutReceiverC消费者收到消息:{
"createTime": "2020-09-13 21:38:57",
"messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
"messageData": "testFanoutMessage"
}
Springboot整合RabbitMQ(一)——Direct直连交换机
Springboot整合RabbitMQ(三)——Topic主题交换机
网友评论