美文网首页
RabbitMQ学习(七)与Spring AMQP整合

RabbitMQ学习(七)与Spring AMQP整合

作者: kobe0429 | 来源:发表于2018-11-29 14:59 被阅读0次

    看到此篇的时候恭喜同学们已经对RabbitMQ的使用有了大致的掌握,前面6篇都是基于RabbitMQ的基础api实现的,从此篇开始我们将分别讨论与Spring AMQP、Spring Boot、Spring Cloud的整合操作,本篇主要讨论与Spring AMQP的整合流程。
    主要与以下6种组件的整合:
    RabbitAdmin
    SpringAMQP声明
    RabbitTemplate
    SimpleMessageListenerContainer
    MessageListenerAdapter
    MessageConverter

    1、RabbitAdmin

    (1)RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可
    (2)autoStartup必须设置true,否则Spring容器不会加载RabbitAdmin类
    (3)RabbitAdmin底层实现就是从Spring容器中获取Exchagge,Bingding,RoutingKey以及Queue的@Bean声明
    (4)然后使用RabbitTemplate的execute方法执行对应声明,修改,删除等操作

    2、SpringAMQP声明

    SpringAMQP声明即在rabbit基础API里面声明一个exchange、Bingding、queue。使用SpringAMQP去声明,就需要使用@bean的声明方式。

    3、RabbitTemplate

    Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作

    4、SimpleMessageListenerContailer

    (1)简单消息监听容器:这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
    (2)设置事务特性,事务管理器,事务属性,事务容量,事务开启等
    (3)设置消息确认和自动确认模式,是否重回队列,异常捕获handler函数
    (4)设置消费者标签生成策略,是否独占模式,消费者属性等
    (5)simpleMessageListenerContailer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小,接收消息的模式等

    5、MessageListenerAdapter

    1)可以把一个没有实现MessageListener和ChannelAwareMessageListener接口的类适配成一个可以处理消息的处理器
    2)默认的方法名称为:handleMessage,可以通过setDefaultListenerMethod设置新的消息处理方法
    3)MessageListenerAdapter支持不同的队列交给不同的方法去执行。使用setQueueOrTagToMethodName方法设置,当根据queue名称没有找到匹配的方法的时候,就会交给默认的方法去处理。

    6、MessageConverter

    (1)消息转换器
    (2)在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要MessageConverter
    (3)实现MessageConverter接口,重写toMessage(java对象转换为Message) fromMessage(Message对象转换为java对象)
    (4)json转换器,自定义二进制转换器(比如图片类型,pdf,ppt,流媒体)

    下面我们用代码来演示以下RMQ是怎么整合到Spring中去的,以RabbitAdmin为例:
    1、在pom.xml中添加引用

               <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>   
    

    2、创建配置类,即为类添加配置注解和扫描注解

    package com.bfxy.spring;
    
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @ComponentScan({"com.bfxy.spring.*"})
    public class RabbitMQConfig {
    //TODO
    }
    

    3、在配置类中通过Bean注入的形式把组件注入到spring容器中

    package com.bfxy.spring;
    
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    
    @Configuration
    @ComponentScan({"com.bfxy.spring.*"})
    public class RabbitMQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("10.136.197.244:5672");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
        
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    

    以上,准备工作完成,让我们写一个测试类来验证一下吧

    package com.bfxy.spring;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    public class ApplicationTests {
        @Test
        public void contextLoads() {
        }
            @Autowired
        private RabbitAdmin rabbitAdmin;    
        @Test
        public void testAdmin() throws Exception {
            rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));       
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));     
            rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));       
            rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));        
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));     
            rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
                    rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                    Binding.DestinationType.QUEUE,
                    "test.direct", "direct", new HashMap<>()));     
            rabbitAdmin.declareBinding(
                    BindingBuilder
                    .bind(new Queue("test.topic.queue", false))     //直接创建队列
                    .to(new TopicExchange("test.topic", false, false))  //直接创建交换机 建立关联关系
                    .with("user.#"));   //指定路由Key       
            rabbitAdmin.declareBinding(
                    BindingBuilder
                    .bind(new Queue("test.fanout.queue", false))        
                    .to(new FanoutExchange("test.fanout", false, false)));      
            //清空队列数据
            rabbitAdmin.purgeQueue("test.topic.queue", false);
        }
    }
    

    以上代码我们创建了一些交换机和队列,启动测试类,去RabbitMQ的管控台查看信息如下:


    RabbitMQ管控台信息.JPG

    相关文章

      网友评论

          本文标题:RabbitMQ学习(七)与Spring AMQP整合

          本文链接:https://www.haomeiwen.com/subject/svqfcqtx.html