RabbitAdmin
RabbitAdmin类是针对RabbitMQ管理端进行封装操作,比如:Exchange操作、Queue操作,Binding绑定操作等,操作起来简单便捷!
基本方法
- declareExchange 创建Exchange操作
- deleteExchange 删除Exchange操作
- declareQueue 创建Queue操作
- deleteQueue 删除Queue操作
- purgeQueue 清空队列
- declareBinding 绑定操作,绑定Queue,Exchange
- removeBinding 删除绑定操作
使用
声明 ConnectionFactory 连接工厂
/**
* 创建 RabbitMQ 连接工厂
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setAddresses("192.168.72.138:5672");
// 用户名
connectionFactory.setUsername("weiximei");
// 密码
connectionFactory.setPassword("weiximei");
// 虚拟机路径
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
声明 RabbitAdmin 类
注意: 在声明 RabbitAdmin 类的时候,一定要把 setAutoStartup 设置为true,才能在spring启动的时候,进行加载
/**
* 创建 RabbitAdmin 类,这个类封装了对 RabbitMQ 管理端的操作!
*
* 比如:Exchange 操作,Queue 操作,Binding 绑定 等
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
测试类中测试
/**** RabbitAdmin 操作 ****/
@Autowired
private RabbitAdmin rabbitAdmin;
/**
* 交换机操作
*/
@Test
public void testAdminExchange() {
// 创建交换机, 类型为 direct
// durable 参数表示是否持久化
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
// 创建交换机,类型为 topic
// durable 参数表示是否持久化
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
// 创建交换机,类型为 fanout
// durable 参数表示是否持久化
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
}
/**
* 队列操作
*/
@Test
public void testAdminQueue() {
// 创建队列
// durable 参数表示是否持久化
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
// 创建队列
// durable 参数表示是否持久化
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
// 创建队列
// durable 参数表示是否持久化
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
}
/**
* 绑定操作
*/
@Test
public void testAdminBinding() {
/**
* 两种写法都可以,都选择绑定 队列 或者 交换机
*/
/**
* destination 需要绑定队列的名字
* DestinationType 绑定类型,
* Binding.DestinationType.QUEUE 表示是队列绑定
* Binding.DestinationType.EXCHANGE 表示交换机绑定
*
* exchange 交换机名称
* routingKey 路由key
* arguments 额外参数(比如绑定队列,可以设置 死信队列的参数)
*/
rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE, "test.direct", "routing_direct", new HashMap<>()));
/**
* 链式写法
*
*/
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接创建队列
.to(new TopicExchange("test.topic", false, false)) // 直接创建交换机,并建立关联关系
.with("routing_topic") // 指定路由 key
);
/**
* 链式写法
*
* FanoutExchange 交换机,和路由key没有绑定关系,因为他是给交换机内所有的 queue 都发送消息!
*
*/
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接创建队列
.to(new FanoutExchange("test.fanout", false, false)) // 直接创建交换机,并建立关联关系
);
}
/**
* 其他操作
*/
@Test
public void testAdminOther() {
// 清空队列
// noWait 参数是否需要等待: true 表示需要,false 表示不需要
// 也就是需要清空的时候,我需要等待一下,在清空(会自动等待几秒钟)
rabbitAdmin.purgeQueue("test.fanout.queue", false);
}
还可在启动的时候,就执行Exchange、Queue、Binding的操作
/**
* 创建 一个 topic 的交换机
* @return
*/
@Bean
public TopicExchange topicExchange() {
// 创建交换机,类型为 topic
// durable 参数表示是否持久化
return new TopicExchange("test.topic", true, false);
}
/**
* 创建一个 队列
* @return
*/
@Bean
public Queue queue() {
// 创建队列
// durable 参数表示是否持久化
return new Queue("test.topic.queue", true);
}
/**
* 创建一个 绑定操作 ,绑定 test.topic 交换机 和 test.topic.queue 队列
* @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()) // 直接创建队列
.to(topicExchange()) // 直接创建交换机,并建立关联关系
.with("routing_topic"); // 指定路由 key
}
完整的配置类
package com.example.rabbitmq.spring.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
*
* @author weiximei
*/
@Configuration
@ComponentScan({"com.example.rabbitmq.spring.*"})
public class RabbitMQConfig {
/**
* 创建 RabbitMQ 连接工厂
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setAddresses("192.168.72.138:5672");
// 用户名
connectionFactory.setUsername("admin");
// 密码
connectionFactory.setPassword("admin");
// 虚拟机路径
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
/**
* 创建 RabbitAdmin 类,这个类封装了对 RabbitMQ 管理端的操作!
*
* 比如:Exchange 操作,Queue 操作,Binding 绑定 等
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
* 除了 在测试类,里面是直接操作之外
*
* 还可以直接在 用 Bean 声明,这样会直接执行
*/
/**
* 交换机类型:
*
* FanoutExchange 将消息分发到所有的绑定队列,无 routingKey 的概念。
* HeadersExchange 通过添加属性 key-value 匹配
* DirectExchange 按照 routingKey 分发到知道队列
* TopicExchange 多关键字匹配
*
*
*/
/**
* 创建 一个 topic 的交换机
* @return
*/
@Bean
public TopicExchange topicExchange() {
// 创建交换机,类型为 topic
// durable 参数表示是否持久化
return new TopicExchange("test.topic", true, false);
}
/**
* 创建一个 队列
* @return
*/
@Bean
public Queue queue() {
// 创建队列
// durable 参数表示是否持久化
return new Queue("test.topic.queue", true);
}
/**
* 创建一个 绑定操作 ,绑定 test.topic 交换机 和 test.topic.queue 队列
* @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()) // 直接创建队列
.to(topicExchange()) // 直接创建交换机,并建立关联关系
.with("routing_topic"); // 指定路由 key
}
}
简单分析下 RabbitAdmin 这个类
进入 RabbitAdmin 这个类
CD4CFE82-E19B-4F6A-AB16-71B3A965F2A6.png发现实现 InitializingBean 这个接口,这个接口里面有个 afterPropertiesSet 方法,这个方法是 Bean初始化完成后,可以做一些操作!
RabbitAdmin 在这个 afterPropertiesSet 方法中做了哪些操作?
- 首先有一个 synchronized 同步代码块的锁,锁的是一个 Object 类,这个Object 是一个生命周期监控,意思就是 锁定的是生命周期,防止被 二次声明 RabbitAdmin 这个类。
- 接下来有个 if 判断,判断 RabbitAdmin 是否初始化了运行,或者 autoStartup 这个值是否为false ,false 表示spring 初始化的时候,不加载RabbitAdmin这个类。如果 RabbitAdmin 运行了,或者 autoStartup 为 false 就直接 return,进行结束这个 afterPropertiesSet 方法的执行!
-
接着调用 this.connectionFactory.addConnectionListener 添加连接监听
1.png
发现在调用 this.connectionFactory.addConnectionListener 的时候,有个原子操作,会先执行 CAS 操作,如果值成功替换为了 true,就继续往下执行,进行 initialize() 方法,这个 initialize() 里面会创建三个Collection 集合,
分别是 : Collection<Exchange>, Collection<Queue>, Collection<Binding>
这三个集合分别存放的是,我们之前声明 Exchange,Queue,Binding 的操作! 然后会这三个集合进行转换,转换后最终调用 rabbitTemplate 进行操作,操作的实际上就是 RabbitMQ 提供的 客户端操作的API!
这也是在 Exchange,Queue,Binding 进行 @Bean 声明后,可以直接执行这些创建,绑定的操作的原因!
如有错误地方,欢迎指出!大家共同进步!
网友评论