美文网首页
【RabbitMQ的那点事】订阅/发布模式下,动态新建queue

【RabbitMQ的那点事】订阅/发布模式下,动态新建queue

作者: 伊丽莎白2015 | 来源:发表于2023-01-27 21:41 被阅读0次

【前置文章】

【本文内容】
本文主要介绍了通过AmqpAdmin进行动态创建Queue,以使得程序的所有的instance都能接收到消息。


1. 订阅/发布模式下,固定queue存在的问题

我们通过下面的代码新建了一个fanout exchange,并新建了queue绑定到了该exchange上。
假设,我们的需求是通过某一个节点修改的值,这个事件需要发布到所有的节点上。
同一份代码,我们需要部署在2个服务器上。

本地测试:instance1使用的是8080端口。instance2使用的是8081端口。

@Configuration
public class FanoutConfig {

    @Bean
    public Declarables fanoutExchange() {
        FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
        Queue fanoutQueue = new Queue("fanout.queue", true);
        return new Declarables(
                fanoutQueue,
                fanoutExchange,
                bind(fanoutQueue).to(fanoutExchange));
    }

    @RabbitListener(queues = {"fanout.queue"})
    public void receiveMessageFromFanout(String message) {
        System.out.println("Received message from fanout.exchange: " + message);
    }
}

因为是两个instance,我们需要在application.yaml中配置端口,以便动态传入:

server.port=${PORT}

启动的时候需要在ide中配置VM Options=-DPORT=8080-DPORT=8081,那么整个订阅模式的图如下:

未命名文件.jpg

主要原因还是instance1或是instance2用的是同一份源代码,那么这里的queue都是fanout.queue,即只有一个queue。

【我们都知道同一个queue中的消息,只能被消费一次。也就是说上述的修改事件,在这个模式下只能被其中一个instance消费。】

【测试】


image.png image.png 尝试在RabbitMQ Console中发布消息: image.png instance-8080没有收到消息: image.png instance-8081收到消息: image.png

2. 尝试动态建立Queue并进行绑定

动态queue的意思是每一份源代码在新建Queue的时候,并不是以固定的名字创建queue,而是会以随机的名字生成Queue使得运行时每个instance都有自己的Queue,具体拓扑图如下: 未命名文件 (1).png

想要实现动态创建Queue,在Spring Boot中可以使用AmqpAdmin


@Configuration
public class AmqpAdminConfig {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Autowired
    private FanoutMessageService fanoutMessageService;

    @Bean
    public FanoutExchange fanoutAdminExchange() {
        return new FanoutExchange("fanout.admin.exchange");
    }

    @Bean
    public Queue fanoutQueue(FanoutExchange fanoutExchange) {
        Queue randomQueue = amqpAdmin.declareQueue();
        Binding binding = new Binding(randomQueue.getName(), Binding.DestinationType.QUEUE, fanoutExchange.getName(), "", null);
        amqpAdmin.declareBinding(binding);
        return randomQueue;
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Queue fanoutQueue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(fanoutQueue.getName());
        container.setMessageListener(fanoutMessageService);
        return container;
    }
}
@Service
public class FanoutMessageService implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println("received message from fanout.admin.exchange: " + new String(message.getBody()));
    }
}
打开RabbitMQ Console,创建的exchange,queue如下: image.png

通过amqpAdmin.declareQueue()创建出来的Queue,auto-delete=true,即当程序down掉后这个queue会自动删除:

image.png 绑定关系: 绑定关系

【测试】

尝试在exchange=fanout.admin.exchange中发布一条消息: image.png 这时候instance-8080以及instance-8081都能收到消息,因为它们所订阅的queue是不同的: instance-8080接收到的消息 instance-8081接收到的消息

相关文章

网友评论

      本文标题:【RabbitMQ的那点事】订阅/发布模式下,动态新建queue

      本文链接:https://www.haomeiwen.com/subject/flnehdtx.html