美文网首页
RabbitMQ整合Spring-AMQP-------Rabb

RabbitMQ整合Spring-AMQP-------Rabb

作者: 爱吃豆包 | 来源:发表于2019-05-19 13:50 被阅读0次

    RabbitAdmin

    RabbitAdmin类是针对RabbitMQ管理端进行封装操作,比如:Exchange操作、Queue操作,Binding绑定操作等,操作起来简单便捷!

    基本方法

    • declareExchange 创建Exchange操作
    • deleteExchange 删除Exchange操作
    • declareQueue 创建Queue操作
    • deleteQueue 删除Queue操作
    • purgeQueue 清空队列
    • declareBinding 绑定操作,绑定Queue,Exchange
    • removeBinding 删除绑定操作

    使用

    声明 ConnectionFactory 连接工厂

    /**
         * 创建 RabbitMQ 连接工厂
         * @return
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            // rabbitmq 服务地址
            connectionFactory.setAddresses("192.168.72.138:5672");
            // 用户名
            connectionFactory.setUsername("weiximei");
            // 密码
            connectionFactory.setPassword("weiximei");
            // 虚拟机路径
            connectionFactory.setVirtualHost("/");
    
            return connectionFactory;
        }
    

    声明 RabbitAdmin 类

    注意: 在声明 RabbitAdmin 类的时候,一定要把 setAutoStartup 设置为true,才能在spring启动的时候,进行加载

     /**
         * 创建 RabbitAdmin 类,这个类封装了对 RabbitMQ 管理端的操作!
         *
         *    比如:Exchange 操作,Queue 操作,Binding 绑定 等
         *
         * @param connectionFactory
         * @return
         */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    
    

    测试类中测试

    /****    RabbitAdmin 操作    ****/
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        /**
         * 交换机操作
         */
        @Test
        public void testAdminExchange() {
            // 创建交换机, 类型为 direct
            // durable 参数表示是否持久化
            rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
    
            // 创建交换机,类型为 topic
            // durable 参数表示是否持久化
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
    
            // 创建交换机,类型为 fanout
            // durable 参数表示是否持久化
            rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
    
        }
    
    
        /**
         * 队列操作
         */
        @Test
        public void testAdminQueue() {
            // 创建队列
            // durable 参数表示是否持久化
            rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
            // 创建队列
            // durable 参数表示是否持久化
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
            // 创建队列
            // durable 参数表示是否持久化
            rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
    
        }
    
        /**
         * 绑定操作
         */
        @Test
        public void testAdminBinding() {
            /**
             * 两种写法都可以,都选择绑定 队列 或者 交换机
             */
    
    
            /**
             * destination 需要绑定队列的名字
             * DestinationType 绑定类型,
             *          Binding.DestinationType.QUEUE 表示是队列绑定
             *          Binding.DestinationType.EXCHANGE 表示交换机绑定
             *
             * exchange 交换机名称
             * routingKey 路由key
             * arguments 额外参数(比如绑定队列,可以设置 死信队列的参数)
             */
            rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                    Binding.DestinationType.QUEUE, "test.direct", "routing_direct", new HashMap<>()));
    
    
            /**
             * 链式写法
             *
             */
            rabbitAdmin.declareBinding(
                    BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接创建队列
                            .to(new TopicExchange("test.topic", false, false)) // 直接创建交换机,并建立关联关系
                            .with("routing_topic") // 指定路由 key
            );
    
            /**
             * 链式写法
             *
             * FanoutExchange 交换机,和路由key没有绑定关系,因为他是给交换机内所有的 queue 都发送消息!
             *
             */
            rabbitAdmin.declareBinding(
                    BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接创建队列
                            .to(new FanoutExchange("test.fanout", false, false)) // 直接创建交换机,并建立关联关系
            );
    
    
        }
    
    
        /**
         * 其他操作
         */
        @Test
        public void testAdminOther() {
    
            // 清空队列
            // noWait 参数是否需要等待: true 表示需要,false 表示不需要
            //      也就是需要清空的时候,我需要等待一下,在清空(会自动等待几秒钟)
            rabbitAdmin.purgeQueue("test.fanout.queue", false);
    
        }
    

    还可在启动的时候,就执行Exchange、Queue、Binding的操作

        /**
         * 创建 一个 topic 的交换机
         * @return
         */
        @Bean
        public TopicExchange topicExchange() {
    
            // 创建交换机,类型为 topic
            // durable 参数表示是否持久化
            return new TopicExchange("test.topic", true, false);
    
        }
    
        /**
         * 创建一个 队列
         * @return
         */
        @Bean
        public Queue queue() {
            // 创建队列
            // durable 参数表示是否持久化
            return new Queue("test.topic.queue", true);
        }
    
        /**
         * 创建一个 绑定操作 ,绑定 test.topic 交换机 和  test.topic.queue 队列
         * @return
         */
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()) // 直接创建队列
                    .to(topicExchange()) // 直接创建交换机,并建立关联关系
                    .with("routing_topic"); // 指定路由 key
        }
    

    完整的配置类

    package com.example.rabbitmq.spring.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ 配置类
     *
     * @author weiximei
     */
    @Configuration
    @ComponentScan({"com.example.rabbitmq.spring.*"})
    public class RabbitMQConfig {
    
        /**
         * 创建 RabbitMQ 连接工厂
         * @return
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            // rabbitmq 服务地址
            connectionFactory.setAddresses("192.168.72.138:5672");
            // 用户名
            connectionFactory.setUsername("admin");
            // 密码
            connectionFactory.setPassword("admin");
            // 虚拟机路径
            connectionFactory.setVirtualHost("/");
    
            return connectionFactory;
        }
    
        /**
         * 创建 RabbitAdmin 类,这个类封装了对 RabbitMQ 管理端的操作!
         *
         *    比如:Exchange 操作,Queue 操作,Binding 绑定 等
         *
         * @param connectionFactory
         * @return
         */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    
        /**
         * 除了 在测试类,里面是直接操作之外
         *
         * 还可以直接在 用 Bean 声明,这样会直接执行
         */
    
        /**
         * 交换机类型:
         *
         * FanoutExchange 将消息分发到所有的绑定队列,无 routingKey 的概念。
         * HeadersExchange 通过添加属性 key-value 匹配
         * DirectExchange 按照 routingKey 分发到知道队列
         * TopicExchange 多关键字匹配
         *
         *
         */
    
    
        /**
         * 创建 一个 topic 的交换机
         * @return
         */
        @Bean
        public TopicExchange topicExchange() {
    
            // 创建交换机,类型为 topic
            // durable 参数表示是否持久化
            return new TopicExchange("test.topic", true, false);
    
        }
    
        /**
         * 创建一个 队列
         * @return
         */
        @Bean
        public Queue queue() {
            // 创建队列
            // durable 参数表示是否持久化
            return new Queue("test.topic.queue", true);
        }
    
        /**
         * 创建一个 绑定操作 ,绑定 test.topic 交换机 和  test.topic.queue 队列
         * @return
         */
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()) // 直接创建队列
                    .to(topicExchange()) // 直接创建交换机,并建立关联关系
                    .with("routing_topic"); // 指定路由 key
        }
    
    }
    
    

    简单分析下 RabbitAdmin 这个类

    进入 RabbitAdmin 这个类

    CD4CFE82-E19B-4F6A-AB16-71B3A965F2A6.png

    发现实现 InitializingBean 这个接口,这个接口里面有个 afterPropertiesSet 方法,这个方法是 Bean初始化完成后,可以做一些操作!

    RabbitAdmin 在这个 afterPropertiesSet 方法中做了哪些操作?

    1. 首先有一个 synchronized 同步代码块的锁,锁的是一个 Object 类,这个Object 是一个生命周期监控,意思就是 锁定的是生命周期,防止被 二次声明 RabbitAdmin 这个类。
    2. 接下来有个 if 判断,判断 RabbitAdmin 是否初始化了运行,或者 autoStartup 这个值是否为false ,false 表示spring 初始化的时候,不加载RabbitAdmin这个类。如果 RabbitAdmin 运行了,或者 autoStartup 为 false 就直接 return,进行结束这个 afterPropertiesSet 方法的执行!
    3. 接着调用 this.connectionFactory.addConnectionListener 添加连接监听


      1.png

    发现在调用 this.connectionFactory.addConnectionListener 的时候,有个原子操作,会先执行 CAS 操作,如果值成功替换为了 true,就继续往下执行,进行 initialize() 方法,这个 initialize() 里面会创建三个Collection 集合,

    分别是 : Collection<Exchange>, Collection<Queue>, Collection<Binding>

    这三个集合分别存放的是,我们之前声明 Exchange,Queue,Binding 的操作! 然后会这三个集合进行转换,转换后最终调用 rabbitTemplate 进行操作,操作的实际上就是 RabbitMQ 提供的 客户端操作的API!
    这也是在 Exchange,Queue,Binding 进行 @Bean 声明后,可以直接执行这些创建,绑定的操作的原因!

    3.png

    如有错误地方,欢迎指出!大家共同进步!

    相关文章

      网友评论

          本文标题:RabbitMQ整合Spring-AMQP-------Rabb

          本文链接:https://www.haomeiwen.com/subject/frofzqtx.html