美文网首页Rabbitmqspring bootJMS
RabbitMQ整合Spring AMQP(一)

RabbitMQ整合Spring AMQP(一)

作者: 若兮缘 | 来源:发表于2019-03-25 22:01 被阅读199次

    Spring AMQP项目是用于开发AMQP的解决方案。它对RabbitMQ底层的一些API进行了封装,使其更加的易用,并且提供了很多扩展功能。

    SpringAMQP核心内容

    • RabbitAdmin:管控组件
    • Spring AMQP声明
    • RabbitTemplate:消息模版组件
    • SimpleMessageListenerContainer:简单消息监听容器
    • MessageListenerAdapter:消息适配器
    • MessageConverter:消息转换器

    RabbitAdmin

    • RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可
    • 注意:autoStartup必须要设置为true,否则Spring容器不会加载RabbitAdmin类
    • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明
    • 然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作
    • 例如:添加一个交换机、删除一个绑定、 清空一个队列里的消息等等

    RabbitAdmin演示

    1. 首先创建一个Spring Boot工程,这里使用Spring Tool Suite工具,选择导航菜单File --> New -->Spring Starter Project

    然后一直下一步就可以了,这里使用SpringBoot工程主要是方便测试。

    1. 引入Maven依赖
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    1. 创建RabbitAdmin配置类
    @Configuration
    @ComponentScan({"com.rxy.spring.*"})
    public class RabbitMQConfig {
        
        @Bean
        public ConnectionFactory connectionFactory() {
            //注入并配置ConnectionFactory
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("192.168.43.157:5672");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
        
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            //注入RabbitAdmin
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    }
    

    RabbitAdmin简单使用

    1. 声明交换机、声明队列、建立交换机与队列绑定关系
    package com.rxy.spring;
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTests {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
        //在测试类中使用RabbitAdmin
        @Test
        public void testAdmin() throws Exception {
            //声明交换机(非持久化,服务重启,这些交换机就会消失)
            rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
            rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
            
            //声明队列
            rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
            rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
            
            //建立绑定关系
            rabbitAdmin.declareBinding(new Binding("test.direct.queue", 
                    Binding.DestinationType.QUEUE, 
                    "test.direct", "direct", new HashMap<>()));
        }
    }
    
    1. 换一种方式一步创建交换机、队列以及建立绑定关系
            //一步到位
            rabbitAdmin.declareBinding(
                    BindingBuilder
                    .bind(new Queue("test.topic.queue", false)) //直接创建队列
                    .to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系
                    .with("user.#") //指定路由Key
                    );
            
            //fanout交换机不需要routingKey
            rabbitAdmin.declareBinding(
                    BindingBuilder
                    .bind(new Queue("test.fanout.queue", false))        
                    .to(new FanoutExchange("test.fanout", false, false)));
    

    运行测试类,通过查看管控台:http://192.168.43.157:15672,可以看到我们刚声明的交换机和队列,以及它们之间的绑定关系

    其他方法的使用,同样可以通过管控台进行相关功能的验证。

            //删除交换机
            rabbitAdmin.deleteExchange("test002_exchange");
            //删除队列
            rabbitAdmin.deleteQueue("test002");
            //解除绑定关系
            rabbitAdmin.removeBinding(new Binding("test_topic_queue", 
                    Binding.DestinationType.QUEUE, 
                    "test_topic_exchange", "topic.#", new HashMap<>()));
            //清空队列消息(可以先通过管控台向该队列发送一条消息)
            rabbitAdmin.purgeQueue("test.topic.queue", false);
    

    RabbitAdmin源码分析

    前面我们提到RabbitAdmin会自动查找类型为Queue、Exchange和 Binding的bean,并将它们逐一进行声明创建。那么是否真的如此呢。

    可以进入RabbitAdmin的源码,首先它实现了InitializingBean接口,InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。

    进入该方法,找到如下代码,这里也证实了autoStartup一定要设置为true才行。

    继续往下它调用了initialize()方法,查看该方法的实现,这段代码就是获取所有的交换机、队列和绑定对象。

    继续往下,从spring容器中获取了相关的对象后,就调用了execute方法真正的在rabbitmq服务器上声明队列、交换机、绑定关系,当然这些操作肯定是在Bean初始化之后执行,等这些对象都加载好了之后再去做。

    SpringAMQP 声明

    在RabbitMQ基础API里面声明一个Exchange、声明一个绑定、一个队列

    使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式

    声明式配置使用

    1. 设置交换机类型
    2. 将队列绑定到交换机
    • FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
    • HeadersExchange:通过添加属性key-value匹配
    • DirectExchange:按照routingkey分发到指定队列
    • TopicExchange:多关键字匹配队列

    在RabbitMQConfig类中进行声明

        @Bean  
        public TopicExchange exchange001() {  
            return new TopicExchange("topic001", true, false);  
        }  
    
        @Bean  
        public Queue queue001() {  
            return new Queue("queue001", true); //队列持久化  
        }  
        
        @Bean  
        public Binding binding001() {  
            return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
        }  
        
        @Bean  
        public TopicExchange exchange002() {  
            return new TopicExchange("topic002", true, false);  
        }  
        
        @Bean  
        public Queue queue002() {  
            return new Queue("queue002", true); //队列持久化
        }
        
        @Bean  
        public Binding binding002() {  
            return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");  
        } 
        
        @Bean  
        public Queue queue003() {  
            return new Queue("queue003", true); //队列持久化
        }
        
        @Bean  
        public Binding binding003() {  
            return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");  
        } 
        
        @Bean  
        public Queue queue_image() {  
            return new Queue("image_queue", true); //队列持久化
        }
        
        @Bean  
        public Queue queue_pdf() {
            return new Queue("pdf_queue", true); //队列持久化
        }
    

    运行测试类,查看管控台是否生成了相关的队列和交换机。例如查看topic001的绑定关系

    相关文章

      网友评论

        本文标题:RabbitMQ整合Spring AMQP(一)

        本文链接:https://www.haomeiwen.com/subject/mohivqtx.html