SpringBoot集成RabbitMQ-动态注入Bean方式

作者: 七岁能杀猪 | 来源:发表于2018-07-18 11:18 被阅读62次

    支持Direct、Fanout、Topic三种交换机方式,以及基于死信转发实现的延迟队列

    一个让处女座程序员很难受的问题:
    每次声明一个队列,都需要在config类里面用@Bean注解显式地往容器里面注入一个Queue Bean和一个Binding Bean,十几个队列下来,那场面简直不能忍.
    怎么解决呢,思路:
    通过遍历枚举的方式,统一往spring容器中注入bean.废话不多说,上代码

    一 使用场景说明
    1.Direct
    根据routekey精确匹配消费,只消费一次
    2.Fanout
    广播消息队列,同交换机内的所有消费者,都接收到消息
    3.Topic
    支持routekey模糊匹配,一条消息可匹配到多个队列.配合AnonymousQueue(匿名队列),可以让集群内的每个节点都消费到同一条业务消息.如:修改集群内所有应用内存中的配置
    4.TTL
    延迟队列,实现消息延迟指定时间消费

    二 关键代码

    1. 配置类:
    import com.google.common.collect.Maps;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
    import org.springframework.beans.factory.support.BeanDefinitionBuilder;
    import org.springframework.beans.factory.support.BeanDefinitionRegistry;
    import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Service;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Registers RabbitMQ exchanges, queues, bindings and topic listener infrastructure as Spring
     * beans by iterating over {@link RabbitExchangeEnum} and {@link RabbitQueueEnum}, so that no
     * per-queue {@code @Bean} method has to be written.
     *
     * <p>Every bean is registered as a bean definition backed by an instance supplier. Suppliers
     * that need another bean (a queue, an exchange, a listener adapter) look it up through the
     * {@link ApplicationContext} instead of sharing state between suppliers, so they do not depend
     * on the order in which the suppliers happen to run.
     *
     * @author onlinever
     * @date 2018/09/06
     */
    @Service
    public class RabbitQueueBeanRegister implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware {

        /** Suffix appended to a queue bean name to form the bean name of its binding. */
        private static final String BINDING_SUFFIX = "Binding";

        /** Suffix appended to a queue bean name to form the bean name of its listener adapter. */
        private static final String ADAPTER_SUFFIX = "Adapter";

        /** Suffix appended to a queue bean name to form the bean name of its listener container. */
        private static final String CONTAINER_SUFFIX = "Container";

        private ApplicationContext applicationContext;

        private BeanDefinitionRegistry beanDefinitionRegistry;

        /** Topic consumers found in the application context; filled by {@link #haveTopicQueue()}. */
        private List<TopicConsumer> topicConsumers;

        /**
         * Registers all exchange, queue and binding bean definitions and, when at least one
         * {@link TopicConsumer} bean is found, the listener adapter and container for each of them.
         *
         * @param beanDefinitionRegistry registry that receives the generated bean definitions
         * @throws BeansException declared by the implemented interface
         */
        @Override
        public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
            this.beanDefinitionRegistry = beanDefinitionRegistry;
            declareExchange();
            declareQueueAndBinding();
            // The original author flagged the execution order here as odd.
            // NOTE(review): haveTopicQueue() fetches TopicConsumer beans from the application context
            // while definitions are still being registered - presumably that creates them early;
            // confirm this is acceptable for the consumers' own dependencies.
            if (haveTopicQueue()) {
                declareTopicMessageListenerAdapter();
                declareTopicMessageListenerContainer();
            }
        }

        /**
         * Looks up all {@link TopicConsumer} beans and stores them in {@link #topicConsumers}.
         *
         * @return {@code true} when at least one consumer was found; {@code false} when none was
         *         found or the lookup threw (best effort: the message is printed and topic
         *         listener registration is skipped)
         */
        private boolean haveTopicQueue() {
            try {
                topicConsumers = new ArrayList<>(applicationContext.getBeansOfType(TopicConsumer.class).values());
                return !topicConsumers.isEmpty();
            } catch (Exception e) {
                System.out.println(e.getMessage());
                return false;
            }
        }

        /**
         * Registers one durable exchange bean per {@link RabbitExchangeEnum} constant. Fanout and
         * topic types get the matching exchange class; every other type gets a direct exchange.
         */
        private void declareExchange() {
            for (RabbitExchangeEnum exchangeEnum : RabbitExchangeEnum.values()) {
                String beanName = exchangeEnum.getBeanName();
                String exchangeName = exchangeEnum.getExchangeName();
                switch (exchangeEnum.getRabbitExchangeTypeEnum()) {
                    case FANOUT_QUEUE:
                        registerBean(beanName, FanoutExchange.class, () -> (FanoutExchange) ExchangeBuilder
                                .fanoutExchange(exchangeName)
                                .durable(true)
                                .build());
                        break;
                    case TOPIC_QUEUE:
                        registerBean(beanName, TopicExchange.class, () -> (TopicExchange) ExchangeBuilder
                                .topicExchange(exchangeName)
                                .durable(true)
                                .build());
                        break;
                    default:
                        registerBean(beanName, DirectExchange.class, () -> (DirectExchange) ExchangeBuilder
                                .directExchange(exchangeName)
                                .durable(true)
                                .build());
                        break;
                }
            }
        }

        /**
         * Registers a queue bean and, for the known exchange types, a binding bean for every
         * {@link RabbitQueueEnum} constant.
         */
        private void declareQueueAndBinding() {
            for (RabbitQueueEnum queueEnum : RabbitQueueEnum.values()) {
                registerBean(queueEnum.getBeanName(), Queue.class, () -> buildQueue(queueEnum));
                registerBinding(queueEnum);
            }
        }

        /**
         * Creates the queue instance for one queue constant, based on the type of its exchange.
         *
         * @param queueEnum queue description
         * @return a durable queue with dead-letter arguments for TTL queues, an anonymous queue for
         *         topic queues, otherwise a plain queue named after the route key
         * @throws IllegalStateException when a TTL queue has no dead-letter target configured
         */
        private Queue buildQueue(RabbitQueueEnum queueEnum) {
            switch (queueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {
                case TTL_QUEUE: {
                    RabbitQueueEnum deadLetterTarget = queueEnum.getRabbitQueueEnum();
                    if (deadLetterTarget == null) {
                        // Fail with the offending constant's name instead of a bare NullPointerException.
                        throw new IllegalStateException(
                                "TTL queue " + queueEnum.name() + " has no dead-letter target queue configured");
                    }
                    return QueueBuilder
                            .durable(queueEnum.getRouteKey())
                            // exchange the message is forwarded to once it expires
                            .withArgument("x-dead-letter-exchange", deadLetterTarget.getExchangeName())
                            // routing key used when the expired message is forwarded
                            .withArgument("x-dead-letter-routing-key", deadLetterTarget.getRouteKey())
                            .build();
                }
                case TOPIC_QUEUE:
                    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy(
                            StringUtils.getTopicQueueNamePrefix(queueEnum.getRouteKey())));
                default:
                    return new Queue(queueEnum.getRouteKey());
            }
        }

        /**
         * Registers the binding between a queue and its exchange under the bean name
         * {@code <queueBeanName>Binding}. Exchange types not listed in the switch get no binding.
         *
         * @param queueEnum queue description
         */
        private void registerBinding(RabbitQueueEnum queueEnum) {
            String queueBeanName = queueEnum.getBeanName();
            String exchangeBeanName = queueEnum.getExchangeEnum().getBeanName();
            String bindingBeanName = queueBeanName + BINDING_SUFFIX;
            switch (queueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {
                case FANOUT_QUEUE:
                    registerBean(bindingBeanName, Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(queueBeanName, Queue.class))
                            .to(applicationContext.getBean(exchangeBeanName, FanoutExchange.class)));
                    break;
                case NORMAL_QUEUE:
                case TTL_QUEUE:
                    registerBean(bindingBeanName, Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(queueBeanName, Queue.class))
                            .to(applicationContext.getBean(exchangeBeanName, DirectExchange.class))
                            .with(queueEnum.getRouteKey()));
                    break;
                case TOPIC_QUEUE:
                    registerBean(bindingBeanName, Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(queueBeanName, Queue.class))
                            .to(applicationContext.getBean(exchangeBeanName, TopicExchange.class))
                            .with(StringUtils.getTopicQueueRoute(queueEnum.getRouteKey())));
                    break;
                default:
                    break;
            }
        }

        /**
         * Registers one {@link MessageListenerAdapter} per topic consumer under the bean name
         * {@code <queueBeanName>Adapter}.
         */
        private void declareTopicMessageListenerAdapter() {
            for (TopicConsumer topicConsumer : topicConsumers) {
                registerBean(topicConsumer.getQueueEnum().getBeanName() + ADAPTER_SUFFIX,
                        MessageListenerAdapter.class,
                        () -> new MessageListenerAdapter(topicConsumer));
            }
        }

        /**
         * Registers one {@link SimpleMessageListenerContainer} per topic consumer under the bean
         * name {@code <queueBeanName>Container}.
         */
        private void declareTopicMessageListenerContainer() {
            for (TopicConsumer topicConsumer : topicConsumers) {
                String queueBeanName = topicConsumer.getQueueEnum().getBeanName();
                registerBean(queueBeanName + CONTAINER_SUFFIX, SimpleMessageListenerContainer.class, () -> {
                    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                    // Fetch the queue bean itself rather than reading a map that is only filled
                    // once the queue's supplier has run; the lookup works whichever bean is
                    // created first.
                    container.setQueues(applicationContext.getBean(queueBeanName, Queue.class));
                    container.setConnectionFactory(applicationContext.getBean("rabbitConnectionFactory", ConnectionFactory.class));
                    container.setMessageListener(applicationContext.getBean(queueBeanName + ADAPTER_SUFFIX));
                    container.setRabbitAdmin(applicationContext.getBean(RabbitAdmin.class));
                    return container;
                });
            }
        }

        /**
         * Registers a supplier-backed bean definition.
         *
         * @param beanName  name the bean is registered under
         * @param beanClass type of the bean
         * @param supplier  callback that creates the bean instance
         * @param <T>       bean type
         */
        private <T> void registerBean(String beanName, Class<T> beanClass, java.util.function.Supplier<T> supplier) {
            beanDefinitionRegistry.registerBeanDefinition(beanName,
                    BeanDefinitionBuilder.genericBeanDefinition(beanClass, supplier).getBeanDefinition());
        }

        /**
         * Intentionally empty: all work is done in
         * {@link #postProcessBeanDefinitionRegistry(BeanDefinitionRegistry)}.
         */
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {

        }

        /**
         * Stores the application context used by the bean suppliers for lookups.
         *
         * @param applicationContext the context this bean runs in
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }
    
    2. 枚举类
      2.1 交换机类型枚举
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.TopicExchange;
    
    /**
     * Kinds of queue setup supported by the dynamic registration, each paired with the exchange
     * class it uses.
     *
     * @author onlinever
     * @date 2018/09/06
     */
    public enum RabbitExchangeTypeEnum {

        /**
         * Delay queue implemented through dead-letter forwarding.
         */
        TTL_QUEUE(1, DirectExchange.class),
        /**
         * Normal queue.
         */
        NORMAL_QUEUE(2, DirectExchange.class),
        /**
         * Broadcast (fanout) queue.
         */
        FANOUT_QUEUE(3, FanoutExchange.class),
        /**
         * Topic queue.
         */
        TOPIC_QUEUE(4, TopicExchange.class);


        /**
         * Numeric index of the type.
         */
        private final int index;

        /**
         * Exchange class used by this type. The raw {@code Class} type is kept so the getter's
         * signature stays unchanged for existing callers.
         */
        private final Class exchangeClazz;


        RabbitExchangeTypeEnum(int index, Class exchangeClazz) {
            this.index = index;
            this.exchangeClazz = exchangeClazz;
        }

        /**
         * @return the numeric index of this type
         */
        public int getIndex() {
            return index;
        }

        /**
         * @return the exchange class used by this type
         */
        public Class getExchangeClazz() {
            return exchangeClazz;
        }
    }
    

    2.2 交换机枚举

    /**
     * Exchanges declared by the application. Each constant carries the Spring bean name, the
     * exchange name and the exchange type.
     *
     * @author onlinever
     * @date 2018/09/06
     */
    public enum RabbitExchangeEnum {


        /**
         * RabbitMQ exchange names.
         * By default one exchange is configured per application, named
         * exchange.{0}.{1}
         * 0: exchange type - direct, topic, fanout, headers
         * 1: application name
         */
        DIRECT_EXCHANGE("directExchange", "exchange.direct.gateway", RabbitExchangeTypeEnum.NORMAL_QUEUE),
        FANOUT_EXCHANGE("fanoutExchange", "exchange.fanout.gateway", RabbitExchangeTypeEnum.FANOUT_QUEUE),
        TOPIC_EXCHANGE("topicExchange", "exchange.topic.gateway", RabbitExchangeTypeEnum.TOPIC_QUEUE);

        /**
         * Bean name of the exchange.
         */
        private final String beanName;
        /**
         * Name (key) of the exchange.
         */
        private final String exchangeName;
        /**
         * Type of the exchange.
         */
        private final RabbitExchangeTypeEnum rabbitExchangeTypeEnum;

        RabbitExchangeEnum(String beanName, String exchangeName, RabbitExchangeTypeEnum rabbitExchangeTypeEnum) {
            this.beanName = beanName;
            this.exchangeName = exchangeName;
            this.rabbitExchangeTypeEnum = rabbitExchangeTypeEnum;
        }

        /**
         * @return the exchange name
         */
        public String getExchangeName() {
            return exchangeName;
        }

        /**
         * @return the bean name of the exchange
         */
        public String getBeanName() {
            return beanName;
        }

        /**
         * @return the exchange type
         */
        public RabbitExchangeTypeEnum getRabbitExchangeTypeEnum() {
            return rabbitExchangeTypeEnum;
        }
    }
    

    2.3 队列枚举

    /**
     * Queues declared by the application. No constants are defined in this listing; each constant
     * supplies the bean name, the route key, the exchange and an optional dead-letter target queue.
     *
     * @author onlinever
     * @date 2018/09/06
     */
    public enum RabbitQueueEnum {

        ;
        /**
         * Bean name of the queue.
         */
        private final String beanName;
        /**
         * Route key of the queue.
         */
        private final String routeKey;
        /**
         * Exchange the queue belongs to.
         */
        private final RabbitExchangeEnum exchangeEnum;

        /**
         * Queue that dead-lettered messages are forwarded to.
         * NOTE(review): presumably {@code null} for queues without dead-letter forwarding - confirm.
         */
        private final RabbitQueueEnum rabbitQueueEnum;


        RabbitQueueEnum(String beanName, String routeKey, RabbitExchangeEnum exchangeEnum, RabbitQueueEnum rabbitQueueEnum) {
            this.beanName = beanName;
            this.routeKey = routeKey;
            this.exchangeEnum = exchangeEnum;
            this.rabbitQueueEnum = rabbitQueueEnum;
        }

        /**
         * @return the route key of the queue
         */
        public String getRouteKey() {
            return routeKey;
        }

        /**
         * @return the exchange the queue belongs to
         */
        public RabbitExchangeEnum getExchangeEnum() {
            return exchangeEnum;
        }

        /**
         * @return the name of the exchange the queue belongs to
         */
        public String getExchangeName() {
            return exchangeEnum.getExchangeName();
        }

        /**
         * @return the bean name of the queue
         */
        public String getBeanName() {
            return beanName;
        }

        /**
         * @return the queue that dead-lettered messages are forwarded to
         */
        public RabbitQueueEnum getRabbitQueueEnum() {
            return rabbitQueueEnum;
        }
    }
    
    3. Topic消费者接口
    /**
     * Consumer of a topic queue.
     *
     * @author onlinever
     * @date 2018/8/17
     */
    public interface TopicConsumer {
        /**
         * The queue this consumer reads from.
         *
         * @return the queue
         */
        RabbitQueueEnum getQueueEnum();
    
        /**
         * The concrete consumer logic for one message.
         *
         * @param message the message
         */
        void handleMessage(String message);
    }
    
    4. 其他消费者使用@RabbitListener方式

    相关文章

      网友评论

      • 61a7bd8ce5c1:自从看了你的文章,腰不酸了,腿不疼了,感觉年轻了好几岁

      本文标题:SpringBoot集成RabbitMQ-动态注入Bean方式

      本文链接:https://www.haomeiwen.com/subject/hphopftx.html