Integrating Multiple RabbitMQ Instances with Spring Boot

Author: huan1993 | Published: 2020-08-06 18:33

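    I. Background

    A recent project needed RabbitMQ to listen on message queues. The queues being listened to live in different virtual hosts (virtualHost) and have different names (queueName), but the messages they deliver share the same format. New queue listeners may also have to be added dynamically while the program keeps running, so we need to implement a way to configure RabbitMQ dynamically in our own code.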

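    II. Requirements

    We have two RabbitMQ configurations. When the program starts, both are configured dynamically so that messages from each can be consumed.

    RabbitMQ configuration details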

    host            port  username         password         virtualHost       queueName
    47.101.130.164  5672  rabbit-multi-01  rabbit-multi-01  /rabbit-multi-01  queue-rabbit-multi-01
    47.101.130.164  5672  rabbit-multi-02  rabbit-multi-02  /rabbit-multi-02  queue-rabbit-multi-02
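
    The two rows above can be kept in a read-only map keyed by a short name. The sketch below uses plain Java with no Spring or Lombok; the class names BrokerConfig and BrokerConfigs and the helper beanName are hypothetical and exist only for illustration. The key-plus-suffix naming (for example "firstRabbitTemplate") follows the scheme used by the configuration class later in this article.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, dependency-free stand-in for one row of the configuration table. */
final class BrokerConfig {
    final String host;
    final int port;
    final String username;
    final String password;
    final String virtualHost;
    final String queueName;

    BrokerConfig(String host, int port, String username, String password,
                 String virtualHost, String queueName) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
        this.queueName = queueName;
    }
}

/** Hypothetical holder for the two configurations listed in the table. */
final class BrokerConfigs {
    private BrokerConfigs() {
    }

    /** Returns a read-only map of the two brokers, keyed by "first" and "second". */
    static Map<String, BrokerConfig> defaults() {
        Map<String, BrokerConfig> configs = new LinkedHashMap<>();
        configs.put("first", new BrokerConfig("47.101.130.164", 5672,
                "rabbit-multi-01", "rabbit-multi-01",
                "/rabbit-multi-01", "queue-rabbit-multi-01"));
        configs.put("second", new BrokerConfig("47.101.130.164", 5672,
                "rabbit-multi-02", "rabbit-multi-02",
                "/rabbit-multi-02", "queue-rabbit-multi-02"));
        return Collections.unmodifiableMap(configs);
    }

    /** Derives a bean name such as "firstRabbitTemplate" from a key and a component suffix. */
    static String beanName(String key, String component) {
        return key + component;
    }
}
```

    Wrapping the map with Collections.unmodifiableMap keeps later code from changing the configuration by accident; a real project would more likely load these values from external configuration than hard-code them.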

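    III. Approach

    1. Configure RabbitMQ dynamically

    This covers ConnectionFactory, RabbitAdmin, RabbitTemplate, and SimpleMessageListenerContainer.

    2. Register the beans configured above in the Spring container, since they may be needed later

    Two methods register a bean in the Spring container programmatically: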

    DefaultListableBeanFactory#registerSingleton
    
    DefaultListableBeanFactory#registerBeanDefinition
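
    The two methods differ in who builds the object. registerBeanDefinition hands the container a description of the bean, and the container instantiates it and runs its lifecycle callbacks. registerSingleton hands over an instance that the caller has already built; the container stores it as-is and does not run initialization callbacks on it. The sketch below is a toy model of that distinction in plain Java, not Spring's implementation; ToyBeanRegistry and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy registry (NOT Spring's implementation) contrasting the two registration styles. */
final class ToyBeanRegistry {
    private final Map<String, Supplier<?>> definitions = new LinkedHashMap<>();
    private final Map<String, Object> singletons = new LinkedHashMap<>();

    /** Analogue of registerBeanDefinition: store a recipe; the registry creates the object later. */
    void registerDefinition(String name, Supplier<?> recipe) {
        requireUnused(name);
        definitions.put(name, recipe);
    }

    /** Analogue of registerSingleton: store an object the caller has already built and configured. */
    void registerSingleton(String name, Object instance) {
        requireUnused(name);
        singletons.put(name, instance);
    }

    /** Returns the bean, creating it from its recipe on first lookup and caching the result. */
    <T> T getBean(String name, Class<T> type) {
        Object bean = singletons.get(name);
        if (bean == null) {
            Supplier<?> recipe = definitions.get(name);
            if (recipe == null) {
                throw new IllegalArgumentException("No bean named " + name);
            }
            bean = recipe.get();
            singletons.put(name, bean); // later lookups return the same instance
        }
        return type.cast(bean);
    }

    /** Toy rule: a name may be used only once, whichever way it was registered. */
    private void requireUnused(String name) {
        if (definitions.containsKey(name) || singletons.containsKey(name)) {
            throw new IllegalStateException("Bean name already in use: " + name);
        }
    }
}
```

    In this model a definition costs nothing until the bean is first requested, while a singleton is exactly the object that was passed in. That matches how this article uses the two methods: objects that the container should construct and manage are registered as definitions, and objects that are fully configured by hand are registered as singletons.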
    

    IV. Implementation steps

    1. Add the Maven dependencies

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    

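    2. Create RabbitProperties to hold the RabbitMQ configuration

    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    @Data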
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder
    public class RabbitProperties {
        private String host;
        private Integer port;
        private String username;
        private String password;
        private String virtualHost;
        private String queueName;
    }
    

    3. Configure RabbitMQ

    Configure ConnectionFactory, RabbitAdmin, RabbitTemplate, SimpleMessageListenerContainer and so on, and register them in the Spring container dynamically.

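    // Imports below assume Spring Boot 2.x (Spring AMQP 2.x, javax.annotation)
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import javax.annotation.PostConstruct;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.beans.factory.support.AbstractBeanDefinition;
    import org.springframework.beans.factory.support.BeanDefinitionBuilder;
    import org.springframework.beans.factory.support.DefaultListableBeanFactory;
    import org.springframework.context.annotation.Configuration;

    @Configuration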
    @RequiredArgsConstructor
    @Slf4j
    public class MultiRabbitMqConfig {
    
        private final DefaultListableBeanFactory defaultListableBeanFactory;
    
        private static Map<String, RabbitProperties> multiMqPropertiesMap = new HashMap<String, RabbitProperties>() {
            {
                put("first", RabbitProperties.builder()
                        .host("47.101.130.164")
                        .port(5672)
                        .username("rabbit-multi-01")
                        .password("rabbit-multi-01")
                        .virtualHost("/rabbit-multi-01")
                        .queueName("queue-rabbit-multi-01").build());
                put("second", RabbitProperties.builder()
                        .host("47.101.130.164")
                        .port(5672)
                        .username("rabbit-multi-02")
                        .password("rabbit-multi-02")
                        .virtualHost("/rabbit-multi-02")
                        .queueName("queue-rabbit-multi-02").build());
            }
        };
    
        @PostConstruct
        public void initRabbitmq() {
            multiMqPropertiesMap.forEach((key, rabbitProperties) -> {
    
                AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class)
                        .addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL)
                        .addPropertyValue("host", rabbitProperties.getHost())
                        .addPropertyValue("port", rabbitProperties.getPort())
                        .addPropertyValue("username", rabbitProperties.getUsername())
                        .addPropertyValue("password", rabbitProperties.getPassword())
                        .addPropertyValue("virtualHost", rabbitProperties.getVirtualHost())
                        .getBeanDefinition();
                String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory");
                defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition);
                CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class);
    
                String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin");
                AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class)
                        .addConstructorArgValue(connectionFactory)
                        .addPropertyValue("autoStartup", true)
                        .getBeanDefinition();
                defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition);
                RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class);
                log.info("rabbitAdmin:[{}]", rabbitAdmin);
    
                RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
                defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate);
    
                SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
                // Queue to listen on
                simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName());
                // Number of concurrent consumers to create; the default is 1. Raise it under high load, and raise the max value below as well
                simpleMessageListenerContainer.setConcurrentConsumers(3);
                // Maximum number of concurrent consumers
                simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
                // Whether rejected messages are requeued by default (false: do not requeue)
                simpleMessageListenerContainer.setDefaultRequeueRejected(false);
                // Acknowledgement mode (manual ack)
                simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
                // Non-exclusive consumer
                simpleMessageListenerContainer.setExclusive(false);
                // Maximum number of unacknowledged messages per consumer
                simpleMessageListenerContainer.setPrefetchCount(1);
                // Message listener
                simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                    try {
                        log.info("============> Thread:[{}] received message:[{}] ", Thread.currentThread().getName(), new String(message.getBody()));
                        log.info("====>connection:[{}]", channel.getConnection());
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                        // Exceptions must be caught here so the message can be nacked (requeue = true)
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    }
                });
                defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer);
            });
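            // Demo only: after a short delay, publish a test message through the first broker
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the interruption
                    Thread.currentThread().interrupt();
                }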
                RabbitTemplate firstRabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean("firstRabbitTemplate");
                firstRabbitTemplate.convertAndSend("exchange-rabbit-multi-01", "", "first queue message");
                log.info("over...");
            }).start();
        }
    }
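
    The listener above nacks with requeue = true on every exception, so a message that always fails may be redelivered again and again. One possible guard, sketched below in plain Java, is to requeue only a first-time delivery and to reject without requeue a message whose redelivered flag is already set (the broker sets this flag on redelivery, and Spring AMQP exposes it through the message properties). The interfaces AckChannel and MessageHandler and the class ManualAckPolicy are hypothetical stand-ins, not part of Spring AMQP or the RabbitMQ client.

```java
/** Hypothetical stand-in for the channel operations the listener uses. */
interface AckChannel {
    void ack(long deliveryTag);

    void nack(long deliveryTag, boolean requeue);
}

/** Hypothetical business callback; throwing signals a processing failure. */
interface MessageHandler {
    void handle(String body) throws Exception;
}

/** Sketch of a manual-acknowledgement policy with a redelivery guard. */
final class ManualAckPolicy {
    private ManualAckPolicy() {
    }

    /**
     * Acks on success. On failure, requeues a first-time delivery and
     * rejects without requeue a message that was already redelivered.
     * Returns "ACK", "REQUEUE" or "REJECT" so the outcome is easy to log.
     */
    static String process(AckChannel channel, long deliveryTag, boolean redelivered,
                          String body, MessageHandler handler) {
        try {
            handler.handle(body);
        } catch (Exception e) {
            boolean requeue = !redelivered;
            channel.nack(deliveryTag, requeue);
            return requeue ? "REQUEUE" : "REJECT";
        }
        // Ack outside the try block so a failing ack is never followed by a nack
        channel.ack(deliveryTag);
        return "ACK";
    }
}
```

    A message rejected without requeue is dropped by the broker unless the queue has a dead-letter exchange configured, so this policy is only a starting point; whether one retry is enough depends on the application.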
    

    V. Result

    [Figure: Multi-RabbitMQ.jpg]

    VI. Source code

    https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-springboot-multi
