RabbitMQ整合Spring AMQP
1.1RabbitAdmin
代码实战
- maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后新建一个配置类:
@Configuration
@ComponentScan("com.example.streamsample.*")
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
接下来我们就可以进行测试这个功能了:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void contextLoads(){
}
@Test
public void testAdmin(){
rabbitAdmin.declareExchange(new DirectExchange("test.direct",false,false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic",false,false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout",false,false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue",false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue",false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue",false));
rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE,
"test.direct","direct",new HashMap<>()));
//链式编程
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue("test.direct.queue",false))
.to(new DirectExchange("test.direct",false,false))
.with("user.#")
);
//因为fanoutExchange 路由信息直接到队列,所以不需要路由键,所以也就没有with
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue("test.fanout.queue",false))
.to(new FanoutExchange("test.fanout",false,false))
);
rabbitAdmin.purgeQueue("test.direct.queue",false);
}
可以在代码中看到,我们使用rabbitAdmin就可以完成很多很多的操作了,创建exchange,创建队列,创建绑定关系,甚至清空队列的操作;
1.2 Spring AMQP声明
1.3 消息模板RabbitTemplate
代码实战
我们在上一个帖子的基础上在RabbitMQConfig中加入下面配置:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
然后在测试类中加入:
@Test
public void testSendMessage(){
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc","信息描述..");
messageProperties.getHeaders().put("type","自定义消息类型..");
Message message = new Message("Hello RabbitMQ".getBytes(),messageProperties);
rabbitTemplate.convertSendAndReceive("test.topic", "spring.ad", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("----添加额外的配置信息-----");
message.getMessageProperties().getHeaders().put("desc","修改后的配置信息");
message.getMessageProperties().getHeaders().put("attr","修改后的属性信息");
return message;
}
});
}
注意:发送消息的时候,肯定是首先要创建这个exchange以及queue以及绑定关系,因为在前面帖子,我已经创建过了,而且rabbitmq又没有重启过,虽然创建的exchange,queue不是持久化的,但是因为没有重启rabbitmq服务,所以就还存在着呢,所以我这边就直接可以运行这段代码,来使用rabbittemplate即可。
发送完之后我们就可以在rabbitmq的管理界面来查看消息是否已经发送,是否已经修改了部分参数了:
网友评论