消息模板组件 - RabbitTemplate
- 我们在与SpringAMQP整合的时候进行发送消息的关键类
- 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口 ReturnCallback等等。
- 同样我们需要进行注入到Spring容器中,然后直接使用
- 在与Spring整合时需要实例化,但是在与SrpingBoot整合时,在配置文件里添加配置即可
RabbitTemplate的使用
声明RabbitTemplate,在RabbitMQConfig类中声明
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
使用RabbitTemplate发送消息
MessagePostProcessor
类通常用来设置消息的Header以及消息的属性,它允许在消息被转换器处理后对其进行进一步的修改,可以通过它对发送的消息进行统一的设置。
@Test
public void testSendMessage() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
//设置消息属性
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定义消息类型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
//发送消息
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加额外的设置---------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
return message;
}
});
}
运行测试类,进入管控台的Queues菜单,点击queue001
,然后下面有个Get messages
,点击按钮就可以获取消息了。
使用RabbitTemplate发送消息2
可以调用send方法发送消息,也可以直接发送字符串消息。
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq发送消息".getBytes(), messageProperties);
//第三个参数必须是Message类型
rabbitTemplate.send("topic001", "spring.abc", message);
//直接发送字符串消息,会自动转换
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
运行测试类,依然是进入管控台的Queues菜单,点击相应的队列,点击Get messages
获取消息。
消息容器 - SimpleMessageListenerContainer
- 简单消息监听容器,这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
- 监听队列(多个队列)、自动启动、自动声明功能
- 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
- 设置消费者数量、最小最大数量、批量消费
- 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
- 设置消费者标签生成策略、是否独占模式、消费者属性等
- 设置具体的监听器、消息转换器等等
特性说明
-
SimpleMessageListenerContainer
可以进行动态设置, 比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等 - 很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特去实现的。所以可以看出SpringAMQP非常的强大
SimpleMessageListenerContainer使用
这里简单列举一些,还可以设置很多很多属性,包括后面要介绍的消息适配器和消息转换器。
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//监控队列,可以同时监控多个
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//设置当前消费者数量
container.setConcurrentConsumers(1);
//设置消费者并发数量的上限,不能小于ConcurrentConsumers,默认为ConcurrentConsumers,消费者会按需添加
//指的是消费者的最大并行度是多少
container.setMaxConcurrentConsumers(5);
//设置是否重回队列
container.setDefaultRequeueRejected(false);
//设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置消费端标签策略:就是在消费端生成自己的标签时可以指定一个生成策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//设置消息监听器
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消费者: " + msg);
}
});
return container;
}
运行Application类,选择Channel菜单,点击表格中的channel,可以看到队列对应的消费者情况,Consumer tag就是根据我们指定的规则生成的。
关闭Application,运行测试类进行消息发送
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq发送消息".getBytes(), messageProperties);
//第三个参数必须是Message类型
rabbitTemplate.send("topic001", "spring.abc", message);
//直接发送字符串消息,会自动转换
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
监听器监听到了消息并打印了如下内容
----------消费者: mq发送消息
----------消费者: hello object message send!
----------消费者: hello object message send!
网友评论