一、认识死信队列
首先了解一下什么是死信,官方将其翻译为单词Dead Letter
。死信,其实这是 RabbitMQ 中一种消息类型,和普通的消息在本质上没有什么区别,更多的是一种业务上的划分。如果队列中的消息出现以下情况之一,就会变成死信:
- 消息接收时被拒绝会变成死信,例如调用
channel.basicNack
或channel.basicReject
,并设置requeue
为false
。 - 如果给消息队列设置了消息的过期时间(
x-message-ttl
),或者发送消息时设置了当前消息的过期时间,当消息在队列中的存活时间大于过期时间时,就会变成死信。 - 如果给消息队列设置了最大容量(
x-max-length
),队列已经满了,后续再进来的消息会溢出,无法被队列接收就会变成死信。
如果不对死信做任何处理,则消息会被直接丢弃。一般死信都是那些在业务上未被正常处理的消息,我们可以考虑用一个队列来接收这些死信消息,接收死信消息的队列就是死信队列
,它就是一个普通的消息队列,没有什么特殊的,只是我们在业务上赋予了它特殊的职责罢了,后期再根据实际情况处理死信队列中的消息即可。
二、准备工作
创建一个 SpringBoot 项目,添加 RabbitMQ 依赖,并添加需要的配置:
# rabbitmq 相关配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
# 设置消费者需要手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
接下来创建一个死信队列、交换机,并完成绑定,这里的交换机也可以称作死信交换机
,交换机的类型没有特殊的要求根据实际需求选择即可:
@Configuration
public class DeadLetterRabbitMQConfig {
// 创建交换机
@Bean
DirectExchange deadLetterExchange() {
return new DirectExchange("dead.letter.exchange", true, false);
}
// 创建死信队列
@Bean
Queue deadLetterQueue() {
return new Queue("dead.letter.queue", true);
}
// 绑定队列和交换机
@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
}
}
三、死信队列用法
这里我们根据文章开头描述的,正常消息变成死信的几种场景分别来看死信队列的用法。
1、消息被拒绝
首先创建处理业务消息的交换机、队列:
@Configuration
public class BusinessRabbitMQConfig {
// 创建交换机
@Bean
DirectExchange businessExchange() {
return new DirectExchange("business.exchange", true, false);
}
// 创建业务消息队列
@Bean
Queue businessQueue1() {
HashMap<String, Object> args = new HashMap<>();
// 设置死信交换机
args.put("x-dead-letter-exchange", "dead.letter.exchange");
// 设置死信交换机绑定队列的routingKey
args.put("x-dead-letter-routing-key", "dead.letter");
return new Queue("business.queue1", true, false, false, args);
}
@Bean
Binding businessBinding1() {
return BindingBuilder.bind(businessQueue1()).to(businessExchange()).with("business1");
}
}
创建business.queue1
时,我们给它配置了前边创建死信交换机、以及 routingKey,这样就完成了业务消息队列和死信队列的绑定,业务消息被拒绝后,就会进入死信队列。
注意,如果队列已经创建,之后再修改队列的配置参数,则不会生效,需要删除掉队列重新创建
接下来,创建消费者来消费business.queue1
中的业务消息,为了突出效果,直接让消费者拒绝掉消息,不了解消息确认机制的可以翻阅之前的文章:
@Service
public class BusinessReceiveService {
@RabbitListener(queues = "business.queue1")
public void receive(String msg, Channel channel, Message message) {
try {
// 拒绝消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
System.out.println("拒绝的业务消息:" + msg);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
发送消息的服务很简单:
@Service
public class BusinessSendService {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(String routingKey, String message) {
rabbitTemplate.convertAndSend("business.exchange", routingKey, message);
System.out.println("发送的业务消息:" + message);
}
}
启动项目后,通过测试类发送一条消息:
@SpringBootTest
class DeadLetterApplicationTests {
@Autowired
BusinessSendService businessSendService;
@Test
void contextLoads() {
String routingKey = "business1";
String message = routingKey + "-data-" + System.currentTimeMillis();
businessSendService.send(routingKey, message);
}
}
按照预期,消息最终会流入死信队列。可以通过 RabbitMQ 的后台管理界面查看具体的效果:
2、消息过期
在BusinessRabbitMQConfig
中再添加一个business.queue2
业务消息队列,设置队列中消息的过期时间为10秒,同样设置好死信队列:
@Bean
Queue businessQueue2() {
HashMap<String, Object> args = new HashMap<>();
// 设置队列中消息的过期时间,单位毫秒
args.put("x-message-ttl", 10000);
args.put("x-dead-letter-exchange", "dead.letter.exchange");
args.put("x-dead-letter-routing-key", "dead.letter");
return new Queue("business.queue2", true, false, false, args);
}
@Bean
Binding businessBinding2() {
return BindingBuilder.bind(businessQueue2()).to(businessExchange()).with("business2");
}
不用给business.queue2
配置消费者,重启项目,直接发送一条消息,让它自动过期即可:
String routingKey = "business2";
String message = routingKey + "-data-" + System.currentTimeMillis();
businessSendService.send(routingKey, message);
等待10秒后,消息会自动流入死信队列:
除了给队列设置消息的超时时间,也可以在发送消息时配置,有兴趣的可以自己尝试:
public void send2(String routingKey, String message) {
MessagePostProcessor processor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
};
rabbitTemplate.convertAndSend("business.exchange", routingKey, message);
System.out.println("发送的业务消息:" + message);
}
3、消息溢出
由于消息队列满了,导致消息溢出而进入死信队列的场景也比较简单。
在BusinessRabbitMQConfig
中再添加一个business.queue3
业务消息队列,设置队列的大小为10,同样设置好死信队列:
@Bean
Queue businessQueue3() {
HashMap<String, Object> args = new HashMap<>();
// 设置消息队列的大小
args.put("x-max-length", 10);
args.put("x-dead-letter-exchange", "dead.letter.exchange");
args.put("x-dead-letter-routing-key", "dead.letter");
return new Queue("business.queue3", true, false, false, args);
}
@Bean
Binding businessBinding3() {
return BindingBuilder.bind(businessQueue3()).to(businessExchange()).with("business3");
}
business.queue3
也不设置消费者,重启项目,发送15条消息:
for (int i = 0; i < 15; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
String routingKey = "business3";
String message = routingKey + "-data-" + System.currentTimeMillis();
businessSendService.send(routingKey, message);
}
按照预期business.queue3
最终会有10条消息,剩下的5条进入死信队列:
四、小结
关于死信队列的用法就介绍到这里了,还是很简单的。在一些重要的业务场景中,为了防止有些消息由于各种原因未被正常消费而丢失掉,可以考虑使用死信队列来保存这些消息,以方便后期排查问题使用,这样总比后期再去复现错误要简单的多。其实,延时队列也可以结合死信队列来实现,本文消息过期例子就是它的雏形,后边的文章我们再详细探讨。
本文完!
网友评论