美文网首页rabbitmq
SpringAMQP 用户管理组件 RabbitAdmin 以及

SpringAMQP 用户管理组件 RabbitAdmin 以及

作者: HmilyMing | 来源:发表于2019-02-07 20:46 被阅读11次

    RabbitAdmin 类可以很好的操作 rabbitMQ,在 Spring 中直接进行注入即可。

        !!! 注意,autoStartup 必须设置为 true,否则 Spring 容器不会加载 RabbitAdmin 类。
    

    RabbitAdmin 类 的底层实现就是从 Spring 容器中获取 exchange、Bingding、routingkey 以及queue 的 @bean 声明,然后使用 rabbitTemplate 的 execute 方法进行执行对应的声明、修改、删除等一系列 rabbitMQ 基础功能操作。例如添加交换机、删除一个绑定、清空一个队列里的消息等等

    代码示例:

    代码地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下

    RabbitAdmin 类的使用如下

    rabbitMQ 的基本配置
    
        public interface RabbitMQCommon {
    
        final static String RABBITMQ_HOST = "www.mycentosserver.com";
        final static int RABBITMQ_PORT = 5672;
        final static String RABBITMQ_DEFAULT_VIRTUAL_HOST = "/";
    
        public final static String RABBITMQ_USERNAME = "guest";
        public final static String RABBITMQ_PASSWORD = "guest";
    }
    
    @Configuration
    @ComponentScan({"com.hmily.rabbitmqapi.spring.*"})
    public class RabbitMQConfig {
    
        private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(RabbitMQCommon.RABBITMQ_HOST + ":" + RabbitMQCommon.RABBITMQ_PORT);
            connectionFactory.setUsername(RabbitMQCommon.RABBITMQ_USERNAME);
            connectionFactory.setPassword(RabbitMQCommon.RABBITMQ_PASSWORD);
            connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
            return connectionFactory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return  rabbitAdmin;
        
    }
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApiApplicationTests {
    
        private static final Logger log = LoggerFactory.getLogger(RabbitmqApiApplicationTests.class);
    
        @Test
        public void contextLoads() {
        }
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
        
            @Test
        public void testAdmin() throws Exception {
            rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
            rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
            rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
            rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
    
            rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                    Binding.DestinationType.QUEUE,
                    "test.direct", "direct", new HashMap<>()));
    
            rabbitAdmin.declareBinding(
                    BindingBuilder
                            .bind(new Queue("test.topic.queue", false))        //直接创建队列
                            .to(new TopicExchange("test.topic", false, false))    //直接创建交换机 建立关联关系
                            .with("user.#"));    //指定路由Key
    
            rabbitAdmin.declareBinding(
                    BindingBuilder
                            .bind(new Queue("test.fanout.queue", false))
                            .to(new FanoutExchange("test.fanout", false, false)));
    
            //清空队列数据
            rabbitAdmin.purgeQueue("test.topic.queue", false);
        }
    }
    

    运行单元测试,然后去 rabbitMQ 的管控台页面验证这些交换机、队列以及绑定关系是否正确。

    源码分析:

    RabbitAdmin 类 的底层实现就是从 Spring 容器中获取 exchange、Bingding、routingkey 以及queue 的 @bean 声明,然后使用 rabbitTemplate 的 execute 方法进行执行对应的声明、修改、删除等一系列 rabbitMQ 基础功能操作。例如添加交换机、删除一个绑定、清空一个队列里的消息等等

    image image

    afterPropertiesSet 方法就是在 我们的 bean 加载后进行一些设置

    image image image image

    SpringAMQP 的声明式

    回顾一下消费者配置

        1. 设置交换机类型
    
        2. 将队列绑定到交换机
    
    交换机类型:
    
        FanoutExchange 类型: 将消息分发到所有的绑定队列,无 routingkey 的概念
    
        HeadersExchange 类型:通过添加属性 key-value 匹配
    
        DirectExchange :按照 routingkey 分发到指定队列
    
        TopicExchange : 多关键字匹配
    

    在 Rabbit 基础 API 里面声明一个 Exchange、queue、binding 的代码如下所示


    image

    SpringAMQP 声明即在 rabbit 基础 API 里面声明一个 exchange、Bingding、queue。使用SpringAMQP 去声明,就需要使用 @bean 的声明方式。

    • 针对消费者配置

        1. 设置交换机类型
        1. 将队列绑定到交换机
    • FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念

    • HeadersExchange :通过添加属性key-value匹配

    • DirectExchange:按照routingkey分发到指定队列

    • TopicExchange:多关键字匹配

    具体的代码实现如下

    在 RabbitMQConfig 里面添加下面的代码
    
    
        /**
         *  声明 TopicExchange 类型的交换机 topic001
         * @return
         */
        @Bean
        public TopicExchange exchange001() {
    //      是否持久化,是否自动删除
            return new TopicExchange("topic001", true, false);
        }
        
    //    声明 queue001 队列
        @Bean
        public Queue queue001() {
            return new Queue("queue001", true); //队列持久
        }
        
    //    将上面的交换机和队列绑定
        @Bean
        public Binding binding001() {
            return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
        }
        
        @Bean
        public TopicExchange exchange002() {
            return new TopicExchange("topic002", true, false);
        }
        
        @Bean
        public Queue queue002() {
            return new Queue("queue002", true);
        }
        
        @Bean
        public Binding binding002() {
            return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
        }
        
        @Bean
        public Queue queue003() {
            return new Queue("queue003", true); //队列持久
        }
    
        @Bean
        public Binding binding003() {
            return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");
        }
    
    

    然后然后在单元测试里面写个方法测试一下,也可以利用刚才 RabbitAdmin 的单元测试,只要能跑起来就行,例如刚才的 testAdmin() ,运行单元测试,然后上管控太验证刚才声明的 exchange、Bingding、queue 是否都成功


    image
    image
    image
    image

    自此,RabbitAdmin以及声明式配置的简单使用demo演示完毕

    相关文章

      网友评论

        本文标题:SpringAMQP 用户管理组件 RabbitAdmin 以及

        本文链接:https://www.haomeiwen.com/subject/shvdsqtx.html