看到此篇的时候恭喜同学们已经对RabbitMQ的使用有了大致的掌握,前面6篇都是基于RabbitMQ的基础api实现的,从此篇开始我们将分别讨论与Spring AMQP、Spring Boot、Spring Cloud的整合操作,本篇主要讨论与Spring AMQP的整合流程。
主要与以下6种组件的整合:
RabbitAdmin
SpringAMQP声明
RabbitTemplate
SimpleMessageListenerContainer
MessageListenerAdapter
MessageConverter
1、RabbitAdmin
(1)RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可
(2)autoStartup必须设置true,否则Spring容器不会加载RabbitAdmin类
(3)RabbitAdmin底层实现就是从Spring容器中获取Exchagge,Bingding,RoutingKey以及Queue的@Bean声明
(4)然后使用RabbitTemplate的execute方法执行对应声明,修改,删除等操作
2、SpringAMQP声明
SpringAMQP声明即在rabbit基础API里面声明一个exchange、Bingding、queue。使用SpringAMQP去声明,就需要使用@bean的声明方式。
3、RabbitTemplate
Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作
4、SimpleMessageListenerContailer
(1)简单消息监听容器:这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
(2)设置事务特性,事务管理器,事务属性,事务容量,事务开启等
(3)设置消息确认和自动确认模式,是否重回队列,异常捕获handler函数
(4)设置消费者标签生成策略,是否独占模式,消费者属性等
(5)simpleMessageListenerContailer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小,接收消息的模式等
5、MessageListenerAdapter
1)可以把一个没有实现MessageListener和ChannelAwareMessageListener接口的类适配成一个可以处理消息的处理器
2)默认的方法名称为:handleMessage,可以通过setDefaultListenerMethod设置新的消息处理方法
3)MessageListenerAdapter支持不同的队列交给不同的方法去执行。使用setQueueOrTagToMethodName方法设置,当根据queue名称没有找到匹配的方法的时候,就会交给默认的方法去处理。
6、MessageConverter
(1)消息转换器
(2)在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要MessageConverter
(3)实现MessageConverter接口,重写toMessage(java对象转换为Message) fromMessage(Message对象转换为java对象)
(4)json转换器,自定义二进制转换器(比如图片类型,pdf,ppt,流媒体)
下面我们用代码来演示以下RMQ是怎么整合到Spring中去的,以RabbitAdmin为例:
1、在pom.xml中添加引用
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、创建配置类,即为类添加配置注解和扫描注解
package com.bfxy.spring;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {
//TODO
}
3、在配置类中通过Bean注入的形式把组件注入到spring容器中
package com.bfxy.spring;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
@Configuration
@ComponentScan({"com.bfxy.spring.*"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("10.136.197.244:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
以上,准备工作完成,让我们写一个测试类来验证一下吧
package com.bfxy.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
public class ApplicationTests {
@Test
public void contextLoads() {
}
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin() throws Exception {
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.topic.queue", false)) //直接创建队列
.to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系
.with("user.#")); //指定路由Key
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.fanout.queue", false))
.to(new FanoutExchange("test.fanout", false, false)));
//清空队列数据
rabbitAdmin.purgeQueue("test.topic.queue", false);
}
}
以上代码我们创建了一些交换机和队列,启动测试类,去RabbitMQ的管控台查看信息如下:
RabbitMQ管控台信息.JPG
网友评论