什么是死信队列
在RabbitMQ中一条消息出现下面三种情况就会成为死信:
- 消息被nack或者reject且requeue参数为false
- 消息因TTL过期
- 队列超出长度限制
死信会被RabbitMQ特殊处理,如果配置了死信队列则会进入到死信队列中。如果没有配置,该消息则会被丢弃。
如何配置死信队列
配置死信队列非常简单,只需要完成下面几步即可。下面是我们需要用到的相关配置:
名称 | 值 | 备注 |
---|---|---|
业务Exchange | order.direct | direct类型Exchange |
业务Queue | order.queue | |
业务Routing Key | order | |
死信Exchange | dead.direct | direct类型Exchange |
死信Queue | dead.queue | |
死信Routing Key | dead |
正常业务配置
表格中前三行是我们正常的业务配置,与平常配置不一样的地方在于Queue的配置,在Queue中我们在其Arguments中增加两个配置:x-dead-letter-exchange和x-dead-letter-routing-key。其中x-dead-letter-exchange中的值代表了消息称为死信之后将会被发送到哪个Exchange,而x-dead-letter-routing-key表示变成死信之后它的RoutingKey。所以我们增加的配置如下:
x-dead-letter-exchange = dead.direct
x-dead-letter-routing-key = dead
死信队列相关配置
我们在上面正常的业务Queue中增加了x-dead-letter-exchange和x-dead-letter-routing-key的配置,接下来就是创建配置中对应的Exchange和Queue。这里的Exchange和Queue与我们平常使用的并没有什么区别。
配置代码如下
下面是定义Exchange和Queue相关的主要代码:
/**
* 正常的业务Exchange和Queue
*/
channel.exchangeDeclare(Config.BUSINESS_EXCHANGE, BuiltinExchangeType.DIRECT, true);
Map<String, Object> arg = new HashMap<>();
arg.put("x-dead-letter-exchange", Config.DEAD_EXCHANGE);
arg.put("x-dead-letter-routing-key", Config.DEAD_ROUTING_KEY);
channel.queueDeclare(Config.BUSINESS_QUEUE, true, false, false, arg);
channel.queueBind(Config.BUSINESS_QUEUE,Config.BUSINESS_EXCHANGE,Config.BUSINESS_ROUTING_KEY);
/**
* 死信Exchange和Queue
*/
channel.exchangeDeclare(Config.DEAD_EXCHANGE, BuiltinExchangeType.DIRECT,true);
channel.queueDeclare(Config.DEAD_QUEUE,true,false,false,new HashMap<>());
channel.queueBind(Config.DEAD_QUEUE,Config.DEAD_EXCHANGE,Config.DEAD_ROUTING_KEY);
使用测试
生产者发送消息
byte[] msg1 = "ok".getBytes(StandardCharsets.UTF_8);
channel.basicPublish(Config.BUSINESS_EXCHANGE,Config.BUSINESS_ROUTING_KEY,new AMQP.BasicProperties(),msg1);
for (int i = 0; i < 5; i++) {
byte[] msg = String.format("死信消息[%d]",i).getBytes(StandardCharsets.UTF_8);
channel.basicPublish(Config.BUSINESS_EXCHANGE,Config.BUSINESS_ROUTING_KEY,new AMQP.BasicProperties(),msg);
}
我们向正常的业务Exchange发送了6条消息,消息的内容如下:
ok
死信消息[0]
死信消息[1]
死信消息[2]
死信消息[3]
死信消息[4]
消费者消费消息
//业务队列消息消费
channel.basicConsume(Config.BUSINESS_QUEUE,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
long deliveryTag = envelope.getDeliveryTag();
if (Objects.equals("ok",message)){
channel.basicAck(deliveryTag,false);
log.info("消息正常签收");
}else {
//requeue参数需要设置成false,表示不再进入原队列。如果为false则不会进入死信队列
channel.basicNack(deliveryTag,false,false);
log.info("消息未签收,进入到死信队列,消息内容:{}",message);
}
}
});
//死信队列消息消费
channel.basicConsume(Config.DEAD_QUEUE,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("死信队列收到消息内容:{}",message);
}
});
上面是业务队列和死信队列消费的主要代码。对于业务队列收到的消息,如果消息内容是ok,我们则认为消费成功。而对于其他内容的消息,我们都认为消费失败,在手动ACK时我们都选择了NACK让其进入到死信队列。这里面的一个关键点在于消息被NACK了且它的requeue参数为false。如果requeue设置为true,它并不会进入到死信队列的,这一点需要知道。
最后运行的打印结果如下:
14:47:34.425 [pool-1-thread-3] INFO com.buydeem.share.rabbit.ConsumerApp - 消息正常签收
14:47:34.428 [pool-1-thread-3] INFO com.buydeem.share.rabbit.ConsumerApp - 消息未签收,进入到死信队列,消息内容:死信消息[0]
14:47:34.429 [pool-1-thread-3] INFO com.buydeem.share.rabbit.ConsumerApp - 消息未签收,进入到死信队列,消息内容:死信消息[1]
14:47:34.430 [pool-1-thread-3] INFO com.buydeem.share.rabbit.ConsumerApp - 消息未签收,进入到死信队列,消息内容:死信消息[2]
14:47:34.430 [pool-1-thread-3] INFO com.buydeem.share.rabbit.ConsumerApp - 消息未签收,进入到死信队列,消息内容:死信消息[3]
14:47:34.430 [pool-1-thread-3] INFO com.buydeem.share.rabbit.ConsumerApp - 消息未签收,进入到死信队列,消息内容:死信消息[4]
14:47:34.431 [pool-1-thread-4] INFO com.buydeem.share.rabbit.ConsumerApp - 死信队列收到消息内容:死信消息[0]
14:47:34.431 [pool-1-thread-4] INFO com.buydeem.share.rabbit.ConsumerApp - 死信队列收到消息内容:死信消息[1]
14:47:34.431 [pool-1-thread-4] INFO com.buydeem.share.rabbit.ConsumerApp - 死信队列收到消息内容:死信消息[2]
14:47:34.431 [pool-1-thread-4] INFO com.buydeem.share.rabbit.ConsumerApp - 死信队列收到消息内容:死信消息[3]
14:47:34.431 [pool-1-thread-4] INFO com.buydeem.share.rabbit.ConsumerApp - 死信队列收到消息内容:死信消息[4]
从结果可以看出生产者发送的6条消息中,我们的消费者只成功的处理了一条,而其他5条最后都进入了死信队列,被死信队列中的消费者消费了。
消息进入死信队列的变化
原本正常的业务消息进入到死信队列之后它的消息体内容并不会发生变化,但是消息的头部会发生变化。下面我直接通过RabbitMQ的后台管理端来进行下面的测试:
-
首先我们先Exchange中发送消息
exchange发送消息.png -
接下来在业务队列中获取消息
不同的策略最后的处理逻辑.png -
消息进入到死信队列再次获取消息
对比消息在业务队列和死信队列中的数据我们可以发现在死信队列中的消息头部多出来需要内容,这部分内容主要记载了该消息原始的一些Exchange、Queue、Routing Key等相关信息,同时还有因为什么原因进入到死信对了等等。
总结
简单的概括来说死信队列有点像一个兜底机制,对于最初我们说的那三种情况的一个保底。当消息遇见上面三种情况,我们可以通过业务队列中设置的x-dead-letter相关参数让消息进入到别的队列中。如果不使用死信队列我们自己也能实现死信队列同样的效果,只不过需要我们在遇见上面的情况时手动的将消息推送到其他队列,这样做会比较麻烦。而死信队列正式解决了这个麻烦,让我们可以通过简单配置完成上面业务。
网友评论