美文网首页
Spring AMQP RabbitAdmin 声明式配置

Spring AMQP RabbitAdmin 声明式配置

作者: Aska小强 | 来源:发表于2020-11-19 11:40 被阅读0次

    Spring AMQP RabbitAdmin 声明式配置

    Spring AMQP 中 比较重要的一个组件 就是 RabbitAdmin , 它封装了关于 声明 Queue ,Exchage , Binding 等等,也可以删除和清空队列数据等等操作,内部是通过RabbitTempate 去发送的请求,调用的还是原生的 amqp-client 的API 操作的,这里面有一定的坑 需要记录一下

    概述

    Spring 有很多不同的项目,其中就有对AMQP的支持

    spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ 以后可能有其他的基于AMQP协议的 消息中间件(otherbbit)的时候,那么Spring可能会提供 spring-otherbbit 具体实现。

    spring-amqp: 提供了如 AmqpAdmin AmqpTemplate 等等

    image-20201030135016874

    spring-rabbit: 提供了具体的实现 RabbitAdmin 和 RabbitTemplate 等等。。

    image-20201030135042265

    通过上面的概述以及图片,大概能够了解 spring-amqp 抽象层 和 spring-rabbit 的关系了

    1.Spring AMQP 整合 RabbitMQ

    虽然现在都是SpringBoot 直接整合RabbitMQ 但是SpringBoot 内部都是通过 Spring提供的AMQP 去整合的,所以我们一步一步来,先不直接使用SpringBoot自动配置,先自己动手整合

    1.1 引入Spring AMQP 依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    查看 依赖数可以发现,spring-boot-starter-amqp 它依赖了 spring-rabbit 它又依赖了操作rabbitmq的 amqp-client 原生API ,和 spring-amqp 抽象层

    image-20201101124015389

    1.2 配置RabbitConfig

    由于我们是 手动去使用 spring-amqp 去整合 rabbitmq ,那么是需要一些基本配置的如 ConnectionFactory 等等。

    添加ConnectionFactory ,使用spring-amqp提供的 CachingConnectionFactory 对原生的ConnectionFactory 进行了封装和维护 具有缓存功能

        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
            cachingConnectionFactory.setHost("127.0.0.1");
            cachingConnectionFactory.setPort(5672);
            cachingConnectionFactory.setUsername("johnny");
            cachingConnectionFactory.setPassword("johnny");
            cachingConnectionFactory.setVirtualHost("/");
            return cachingConnectionFactory;
        }
    
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            //关键!
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    

    2. 组件 RabbitAdmin

    RabbitAdmin 提供了简单的方式 声明 队列 交换机 等 操作

    2.1 Api 分析

    先看看 AmqpAdmin的 方法 ,可以看到 抽象层定义了很多 声明方法

    image-20201101124832975

    RabbitAdmin 实现了AmqpAdmin 实现了 这些方法,底层使用 rabbitmq 的 amqp-client 原生api 去操作的

    image-20201101124908127

    2.2 基本使用

    可以看到 直接使用 rabbitAdmin的api 声明 queue exchange 和 binding 还是很简单的

    @Test
    public void testAdmin() {
    
        Queue queue = new Queue("hello-queue");
        //声明一个队列
        rabbitAdmin.declareQueue(queue);
            
        //有TopicExchange 有 DirectExchange 和 FanoutExchange 等等 
        TopicExchange topicExchange = new TopicExchange("topic.exchange001", true, false);
            //声明一个交换机
        rabbitAdmin.declareExchange(topicExchange);
            //声明 一个交换机和队列的绑定 
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with("hello.*"));
    
    }
    

    2.3 挑一个 declareQueue 源码看看

    可以看到内部确实是使用 rebbitTemplate 去调用的

    image-20201101125239845

    内部使用的是 原生的 api 去声明队列

    image-20201101125521043

    3. RabbitAdmin 声明式配置 Queue Exchange Bingding

    上面的基本使用是 使用 rabbitAdmin 的 手动声明方式 去声明 Queue Exchange 等等。。下面来说基于 @Bean 配置 的方式

    可以直接通过 @Bean 配置的方式去 声明 Queue Exchange 等。。

    @Bean
    public Queue queue001() {
        return new Queue("queue-001", true);
    }
    
    @Bean
    public TopicExchange exchange001() {
        return new TopicExchange("exchange-001", true, false);
    }
    
    @Bean
    public Binding binding001() {
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
    }
    

    不过这里面有点小坑 以上配置并不能成功生效,必须配合 SimpleMessageListenerContainer 才行 ,下面来分析一下RabbitAdmin的源码 来分析一下 为什么@Bean的方式 也能够自动声明

    4. RabbitAdmin 源码分析 以及 问题

    3.1 实现 InitializingBean (afterPropertiesSet)

    可以看到RabbitAdmin 实现了InitializingBean 那么它肯定有个 afterPropertiesSet 去在Bean初始化结束后调用的

    public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
          BeanNameAware, InitializingBean 
    

    3.2重写 AmqpAdmin 的initalize()

    在上面 RabbitAdmin 组件 Api分析那块 可以看到 AmqpAdmin 抽象层提供了一个 initalize() 这里被实现了,并且在afterPropertiesSet中被调用

    image-20201101130401045

    3.3 获取IOC容器中的 Queue Exchange和 Binding 的Bean对象

    image-20201101130624846

    3.4 使用RabbitTemplate去 声明

    经过上面的源码分析就能看到 RabbitAdmin 如何基于@Bean配置 Queue Exchange等 的

    image-20201101130740232

    3.5 存在的问题(依赖SimpleMessageListenerContainer )

    那么这个 Lamda 编写的 ConnectionListener 什么时候被调用的呢 ,这就需要引入 SimpleMessageListenerContainer

    image-20201101130944501

    **SimpleMessageListenerContainer **继承 AbstractMessageListenerContainer ,而 AbstractMessageListenerContainer a

    实现了 Lifecycle ,它有 start 方法,这里AbstractMessageListenerContainer重写了它 ,内部调用 checkMismatchedQueues()方法

    image-20201101131356413 image-20201101131148481

    内部 调用 createConnection ,调用所有监听器的 onCreate 方法 而上面的 Lambda 表达式就会被调用 ,那个Lambda 表达式就是 一个 ConnectionListener

    image-20201101131802970

    最终会调用 这里的 initalize() 去根据@Bean 声明 Queue Exchange 和 Binding

    image-20201101132041530

    所以容器中 一定要有 SimpleMessageListenerContainer 使用它的 Lifecycle 的 start -> checkMismatchedQueues ->

    getConnectionFactory().createConnection() -> getConnectionListener().onCreate(this.connection) -> initialize();

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.addQueues(queue001());
        return simpleMessageListenerContainer;
    }
    

    总结

    本篇主要讲解了 1.spring-amqp 和 rabbitmq 的整合它们的关系和配置 2. RabbitAdmin的 Api 和 基本使用

    3.RabbitAdmin的声明式配置 方式 4.RabbitAdmin的声明式配置的源码 ,以及 依赖的SimpleMessageListenerContainer 的源码部分 ,有了这些基础 后续再学习SpringBoot整合RabbitMq,去看它的 AutoConfig 等等 源码会更加清晰。

    相关文章

      网友评论

          本文标题:Spring AMQP RabbitAdmin 声明式配置

          本文链接:https://www.haomeiwen.com/subject/pobkiktx.html