【前置文章】
【本文内容】
本文主要介绍了通过AmqpAdmin
进行动态创建Queue,以使得程序的所有的instance都能接收到消息。
1. 订阅/发布模式下,固定queue存在的问题
我们通过下面的代码新建了一个fanout exchange,并新建了queue绑定到了该exchange上。
假设,我们的需求是通过某一个节点修改的值,这个事件需要发布到所有的节点上。
同一份代码,我们需要部署在2个服务器上。
本地测试:instance1使用的是8080端口。instance2使用的是8081端口。
@Configuration
public class FanoutConfig {
@Bean
public Declarables fanoutExchange() {
FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
Queue fanoutQueue = new Queue("fanout.queue", true);
return new Declarables(
fanoutQueue,
fanoutExchange,
bind(fanoutQueue).to(fanoutExchange));
}
@RabbitListener(queues = {"fanout.queue"})
public void receiveMessageFromFanout(String message) {
System.out.println("Received message from fanout.exchange: " + message);
}
}
因为是两个instance,我们需要在application.yaml
中配置端口,以便动态传入:
server.port=${PORT}
启动的时候需要在ide中配置VM Options=-DPORT=8080
或-DPORT=8081
,那么整个订阅模式的图如下:

主要原因还是instance1或是instance2用的是同一份源代码,那么这里的queue都是fanout.queue,即只有一个queue。
【我们都知道同一个queue中的消息,只能被消费一次。也就是说上述的修改事件,在这个模式下只能被其中一个instance消费。】
【测试】





2. 尝试动态建立Queue并进行绑定
动态queue的意思是每一份源代码在新建Queue的时候,并不是以固定的名字创建queue,而是会以随机的名字生成Queue使得运行时每个instance都有自己的Queue,具体拓扑图如下:
想要实现动态创建Queue,在Spring Boot中可以使用AmqpAdmin
。
@Configuration
public class AmqpAdminConfig {
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private FanoutMessageService fanoutMessageService;
@Bean
public FanoutExchange fanoutAdminExchange() {
return new FanoutExchange("fanout.admin.exchange");
}
@Bean
public Queue fanoutQueue(FanoutExchange fanoutExchange) {
Queue randomQueue = amqpAdmin.declareQueue();
Binding binding = new Binding(randomQueue.getName(), Binding.DestinationType.QUEUE, fanoutExchange.getName(), "", null);
amqpAdmin.declareBinding(binding);
return randomQueue;
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Queue fanoutQueue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(fanoutQueue.getName());
container.setMessageListener(fanoutMessageService);
return container;
}
}
@Service
public class FanoutMessageService implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("received message from fanout.admin.exchange: " + new String(message.getBody()));
}
}
打开RabbitMQ Console,创建的exchange,queue如下:

通过amqpAdmin.declareQueue()
创建出来的Queue,auto-delete=true,即当程序down掉后这个queue会自动删除:


【测试】



网友评论