什么是死信队列
当发生以下任何事件,那么消息将成为死信
-
消费者使用
basic.reject
或basic.nack
将requeue
参数设置为false
来否定该消息 -
消息在队列中存活时间到达设置的TTL
-
消息队列的消息数量超过了最大限制
如果配置了死信队列消息,那么该消息将会丢入死信队列,如果没有配置,消息将被丢弃。
过程
为每个需要使用死信的业务队列配置死信交换机,当消息成为死信的时候,将由死信交换机转发到死信队列,然后由死信队列的监听者去处理死信。
业务使用场景
延时任务
例如:下订单超过15分钟未支付自动取消订单,生成的未支付订单放到业务队列中并设置过期时间TTL,当业务队 列中的消息成为死信,交由死信交换机转发到死信队列,死信队列监听者再去判断订单是否完成支付,没 有完成支付的就关闭订单。
队列的消息设置了过期时间TTL,但RabbitMQ不会监听所有消息,之会监听队列第一个入队的消息,只有该消息过期了,才会监听下一个,因此有可能后进入但是TTL短的消息一直待在队列中,直到第一个入队的出队。
死信队列demo
准备
-
两个springboot项目,rabbitmq-provider,rabbitmq-comsumer
-
版本号:2.1.7.RELEASE
-
依赖:
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
application.yml
server: port: 9001 spring: application: name: rabbitmq-provider rabbitmq: host: 192.168.1.45 port: 5672 username: admin password: admin #virtual-host: xxxx listener: type: simple simple: # 信息被拒绝后是否重回队列 default-requeue-rejected: false # 消息确认: 手动 acknowledge-mode: manual
rabbitmq-provider
1、创建RabbitmqConfig.java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
public static final String BUSINESS_EXCHANGE = "dead.letter.business.exchange";
public static final String BUSINESS_QUEUE_A = "dead.letter.business.queue.a";
public static final String BUSINESS_QUEUE_B = "dead.letter.business.queue.b";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
public static final String DEAD_LETTER_QUEUE_A = "dead.letter.queue.a";
public static final String DEAD_LETTER_QUEUE_B = "dead.letter.queue.b";
public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "dead.letter.queue.a.routing.key";
public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "dead.letter.queue.b.routing.key";
/**
* 业务交换机
*/
@Bean
public FanoutExchange businessExchange() {
return new FanoutExchange(BUSINESS_EXCHANGE);
}
/**
* 死信交换机
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
/**
* 业务队列A
*/
@Bean
public Queue businessQueueA() {
Map<String, Object> args = new HashMap<>(4);
// 当前队列绑定到死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUE_A).withArguments(args).build();
}
/**
* 业务队列B
*/
@Bean
public Queue businessQueueB() {
Map<String, Object> args = new HashMap<>(4);
// 当前队列绑定到死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUE_B).withArguments(args).build();
}
/**
* 死信队列A
*/
@Bean
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_QUEUE_A);
}
/**
* 死信队列B
*/
@Bean
public Queue deadLetterQueueB() {
return new Queue(DEAD_LETTER_QUEUE_B);
}
/**
* 绑定业务队列A到业务交换机
*/
@Bean
public Binding bindingBusinessQueueA() {
return BindingBuilder.bind(businessQueueA()).to(businessExchange());
}
/**
* 绑定业务队列B到业务交换机
*/
@Bean
public Binding bindingBusinessQueueB() {
return BindingBuilder.bind(businessQueueB()).to(businessExchange());
}
/**
* 绑定死信队列A到死信交换机
*/
@Bean Binding bindingDeadLetterQueueA() {
return BindingBuilder.bind(deadLetterQueueA())
.to(deadLetterExchange())
.with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
}
/**
* 绑定死信队列B到死信交换机
*/
@Bean Binding bindingDeadLetterQueueB() {
return BindingBuilder.bind(deadLetterQueueB())
.to(deadLetterExchange())
.with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
}
}
2、控制器提供一个发送消息的方法
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg")
public String sendMsg() {
String msg = "hello dead letter queue";
rabbitTemplate.convertAndSend("dead.letter.business.exchange", null, msg);
return "success";
}
3、启动项目,调用接口发送消息
rabbitmq-consumer
1、创建业务队列消费者BusinessReceiver.java
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class BusinessReceiver {
@RabbitListener(queues = "dead.letter.business.queue.a")
public void receiverA(Message message, Channel channel) throws Exception {
// 这里由业务队列A的消费者来产生死信
String msg = new String(message.getBody());
System.out.println("receiverA拒绝消费消息A: " + msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
@RabbitListener(queues = "dead.letter.business.queue.b")
public void receiverB(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("receiverB接收消息B: " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
2、创建死信队列消费者DeadLetterReceiver.java
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DeadLetterReceiver {
@RabbitListener(queues = "dead.letter.queue.a")
public void deadReceiverA(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("deadReceiverA:死信信息A:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = "dead.letter.queue.b")
public void deadReceiverB(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("deadReceiverB:死信信息B:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
3、启动项目,观察控制台输出
receiverB接收消息B: hello dead letter queue
receiverA拒绝消费消息A: hello dead letter queue
deadReceiverA:死信信息A:hello dead letter queue
网友评论