美文网首页
【RabbitMQ的那点事】订阅/发布模式下,动态新建queue

【RabbitMQ的那点事】订阅/发布模式下,动态新建queue

作者: 伊丽莎白2015 | 来源:发表于2023-01-27 21:41 被阅读0次

    【前置文章】

    【本文内容】
    本文主要介绍了通过AmqpAdmin进行动态创建Queue,以使得程序的所有的instance都能接收到消息。


    1. 订阅/发布模式下,固定queue存在的问题

    我们通过下面的代码新建了一个fanout exchange,并新建了queue绑定到了该exchange上。
    假设,我们的需求是通过某一个节点修改的值,这个事件需要发布到所有的节点上。
    同一份代码,我们需要部署在2个服务器上。

    本地测试:instance1使用的是8080端口。instance2使用的是8081端口。

    @Configuration
    public class FanoutConfig {
    
        @Bean
        public Declarables fanoutExchange() {
            FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
            Queue fanoutQueue = new Queue("fanout.queue", true);
            return new Declarables(
                    fanoutQueue,
                    fanoutExchange,
                    bind(fanoutQueue).to(fanoutExchange));
        }
    
        @RabbitListener(queues = {"fanout.queue"})
        public void receiveMessageFromFanout(String message) {
            System.out.println("Received message from fanout.exchange: " + message);
        }
    }
    

    因为是两个instance,我们需要在application.yaml中配置端口,以便动态传入:

    server.port=${PORT}
    

    启动的时候需要在ide中配置VM Options=-DPORT=8080-DPORT=8081,那么整个订阅模式的图如下:

    未命名文件.jpg

    主要原因还是instance1或是instance2用的是同一份源代码,那么这里的queue都是fanout.queue,即只有一个queue。

    【我们都知道同一个queue中的消息,只能被消费一次。也就是说上述的修改事件,在这个模式下只能被其中一个instance消费。】

    【测试】


    image.png image.png 尝试在RabbitMQ Console中发布消息: image.png instance-8080没有收到消息: image.png instance-8081收到消息: image.png

    2. 尝试动态建立Queue并进行绑定

    动态queue的意思是每一份源代码在新建Queue的时候,并不是以固定的名字创建queue,而是会以随机的名字生成Queue使得运行时每个instance都有自己的Queue,具体拓扑图如下: 未命名文件 (1).png

    想要实现动态创建Queue,在Spring Boot中可以使用AmqpAdmin

    
    @Configuration
    public class AmqpAdminConfig {
    
        @Autowired
        private AmqpAdmin amqpAdmin;
    
        @Autowired
        private FanoutMessageService fanoutMessageService;
    
        @Bean
        public FanoutExchange fanoutAdminExchange() {
            return new FanoutExchange("fanout.admin.exchange");
        }
    
        @Bean
        public Queue fanoutQueue(FanoutExchange fanoutExchange) {
            Queue randomQueue = amqpAdmin.declareQueue();
            Binding binding = new Binding(randomQueue.getName(), Binding.DestinationType.QUEUE, fanoutExchange.getName(), "", null);
            amqpAdmin.declareBinding(binding);
            return randomQueue;
        }
    
        @Bean
        SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Queue fanoutQueue) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(fanoutQueue.getName());
            container.setMessageListener(fanoutMessageService);
            return container;
        }
    }
    
    @Service
    public class FanoutMessageService implements MessageListener {
        @Override
        public void onMessage(Message message) {
            System.out.println("received message from fanout.admin.exchange: " + new String(message.getBody()));
        }
    }
    
    打开RabbitMQ Console,创建的exchange,queue如下: image.png

    通过amqpAdmin.declareQueue()创建出来的Queue,auto-delete=true,即当程序down掉后这个queue会自动删除:

    image.png 绑定关系: 绑定关系

    【测试】

    尝试在exchange=fanout.admin.exchange中发布一条消息: image.png 这时候instance-8080以及instance-8081都能收到消息,因为它们所订阅的queue是不同的: instance-8080接收到的消息 instance-8081接收到的消息

    相关文章

      网友评论

          本文标题:【RabbitMQ的那点事】订阅/发布模式下,动态新建queue

          本文链接:https://www.haomeiwen.com/subject/flnehdtx.html