【前置文章】
【本文内容】
本文主要介绍了通过AmqpAdmin
进行动态创建Queue,以使得程序的所有的instance都能接收到消息。
1. 订阅/发布模式下,固定queue存在的问题
我们通过下面的代码新建了一个fanout exchange,并新建了queue绑定到了该exchange上。
假设,我们的需求是通过某一个节点修改的值,这个事件需要发布到所有的节点上。
同一份代码,我们需要部署在2个服务器上。
本地测试:instance1使用的是8080端口。instance2使用的是8081端口。
@Configuration
public class FanoutConfig {
@Bean
public Declarables fanoutExchange() {
FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
Queue fanoutQueue = new Queue("fanout.queue", true);
return new Declarables(
fanoutQueue,
fanoutExchange,
bind(fanoutQueue).to(fanoutExchange));
}
@RabbitListener(queues = {"fanout.queue"})
public void receiveMessageFromFanout(String message) {
System.out.println("Received message from fanout.exchange: " + message);
}
}
因为是两个instance,我们需要在application.yaml
中配置端口,以便动态传入:
server.port=${PORT}
启动的时候需要在ide中配置VM Options=-DPORT=8080
或-DPORT=8081
,那么整个订阅模式的图如下:
data:image/s3,"s3://crabby-images/fb27a/fb27a2b51611c5acccb3c8b0ba09e4f24633bd23" alt=""
主要原因还是instance1或是instance2用的是同一份源代码,那么这里的queue都是fanout.queue,即只有一个queue。
【我们都知道同一个queue中的消息,只能被消费一次。也就是说上述的修改事件,在这个模式下只能被其中一个instance消费。】
【测试】
data:image/s3,"s3://crabby-images/186f9/186f9e9b74dd5936d4d7dfe086cc8c8ee6d147ba" alt=""
data:image/s3,"s3://crabby-images/7de49/7de49fd4b9d4fdbb090cd83e0db20158dd6cc6a7" alt=""
data:image/s3,"s3://crabby-images/9880b/9880b562e46a4f5de6596354c5cb152e21201bcb" alt=""
data:image/s3,"s3://crabby-images/ca174/ca1740b8d22da25a396e6f23315f35a61c3ed73f" alt=""
data:image/s3,"s3://crabby-images/8d2cd/8d2cdea1252ae4541876953d1d349c7ba73401bc" alt=""
2. 尝试动态建立Queue并进行绑定
动态queue的意思是每一份源代码在新建Queue的时候,并不是以固定的名字创建queue,而是会以随机的名字生成Queue使得运行时每个instance都有自己的Queue,具体拓扑图如下:data:image/s3,"s3://crabby-images/42976/42976c54e37c082984d7815a752297bd1e872557" alt=""
想要实现动态创建Queue,在Spring Boot中可以使用AmqpAdmin
。
@Configuration
public class AmqpAdminConfig {
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private FanoutMessageService fanoutMessageService;
@Bean
public FanoutExchange fanoutAdminExchange() {
return new FanoutExchange("fanout.admin.exchange");
}
@Bean
public Queue fanoutQueue(FanoutExchange fanoutExchange) {
Queue randomQueue = amqpAdmin.declareQueue();
Binding binding = new Binding(randomQueue.getName(), Binding.DestinationType.QUEUE, fanoutExchange.getName(), "", null);
amqpAdmin.declareBinding(binding);
return randomQueue;
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Queue fanoutQueue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(fanoutQueue.getName());
container.setMessageListener(fanoutMessageService);
return container;
}
}
@Service
public class FanoutMessageService implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("received message from fanout.admin.exchange: " + new String(message.getBody()));
}
}
打开RabbitMQ Console,创建的exchange,queue如下:
data:image/s3,"s3://crabby-images/825a6/825a6307c5657eba2fb62986b2c755cae7c5217e" alt=""
通过amqpAdmin.declareQueue()
创建出来的Queue,auto-delete=true,即当程序down掉后这个queue会自动删除:
data:image/s3,"s3://crabby-images/b87fa/b87fabaa440c9ca09cffafdd27476c18c061d5e9" alt=""
data:image/s3,"s3://crabby-images/165f8/165f8d9774b02f609f2a897516a56153bac1899c" alt=""
【测试】
data:image/s3,"s3://crabby-images/85d9c/85d9cac104521cf222bf6c33c44287f20c1d4aeb" alt=""
data:image/s3,"s3://crabby-images/a01e2/a01e2100499bb8762df92bc2b49fd0bf8f19f690" alt=""
data:image/s3,"s3://crabby-images/1c723/1c7233e0fed572e32ba4dd383a45d63744104593" alt=""
网友评论