死信队列:(英文名)DLX, Dead-Letter-Exchange
利用DLX, 当消息在一个队列中变成死信(dead message) 之后,它能被重新publish到另一个Exchange, 这个Exchange就是DLX
什么是死信队列?
就是你的消息发布后,没有消费者去消费!就变成了死信了!在任何MQ产品中都有这个情况!
在RabbitMQ中,变成死信队列后,它会把这个消息重新publish到另一个Exchange中,这个Exchange就是DLX
消息变成死信有以下几种情况:
1.消息被拒绝(basic.reject / basic.nack) 并且requeue=fasle (队列重发设为了false)
2.消息TTL过期,(超过任何消息有效限制之后,就变成了死信)
3.队列达到最大长度。(如果消息的最大大小满了,后面的消息就被送进死信队里中)
步骤:
1.DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性(这个Exchange只需要正常的去定义就好, 和平常没却别)
2.当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列
3.可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0以前支持的immediate参数的功能!
死信队列的设置:
首先需要设置死信队列的Exchange和Queue,然后进行绑定:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey:#
然后正常的声明交换机、队列、绑定,只不过我们需要在队列加上一个参数:
argument.put("x-dead-letter-exchange", “dlx.exchange”)
这个参数表示,当消息正常的路由到队列的时候,但是没有一个消费者去消费的话,就会被重新路由到 dlx.exchange
交换机上
这样消息在过期,requeue、队列在达到最大长度时,消息就可以直接路由到死信队列!
延时队列实现
可以通过死信队列来实现!给消息设置 TTL 有效期时间,比如设置 10 秒,那么在 10 秒后,这个消息成为了死信,然后会转发到另外一个 交换机上进行消息处理!
生产者
package com.example.rabbitmqapi.dlx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
/**
* 死信队列 DLX
*
* 消息变成死信有以下几种情况:
* 1.消息被拒绝(basic.reject / basic.nack) 并且requeue=fasle (队列重发设为了false)
* 2.消息TTL过期,(超过任何消息有效限制之后,就变成了死信)
* 3.队列达到最大长度。(如果消息的最大大小满了,后面的消息就被送进死信队里中)
*
* 消费者
*
* 消费端 ACK 和 重回队列
*
* @author weiximei
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂并设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 创建连接
Connection connection = factory.newConnection();
// 3. 创建channel
Channel channel = connection.createChannel();
// 4. 声明Exchange
String exchangeName = "test_dlx_exchange";
String exchangeType = "topic";
String routingKey = "dlx.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
// 5. 声明消息队列
String queueName = "test_dlx_queue";
/**
* 声明死信队里的属性,出现死信后,会重新发送到这个 dlx.exchange 上
*/
Map<String,Object> agruments = new HashMap<>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
// 需要把 agruments 属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
// 6. 绑定队列和Exchange
channel.queueBind(queueName, exchangeName, routingKey);
/**
* 声明死信队列
*
* 接下来,就可以更具接收到的死信消息,做相应的处理, 也就做一个消息的接收,处理消息(消费者)
* 在这里只是声明了,并没对消息做处理
*
*/
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#"); // # 表示所有队里都匹配接收
// 7. 设置消费者为自定义的消费者, 手工签收ACK,则必须 autoAck=false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
消费者
package com.example.rabbitmqapi.dlx;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
/**
* 死信队列 DLX
*
*
* 生产者
*
* 在消费端的 ACK 和 重回队列
*
* @author weiximei
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 创建ConnectionFactory, 并设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 创建连接
Connection connection = factory.newConnection();
// 3. 创建channel
Channel channel = connection.createChannel();
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.saye";
// 发送消息
String msg = "自定义消费者, 消息发送 : Hello, DLX ";
for (int i = 0; i < 5; i++) {
// 携带额外的参数
Map<String,Object> headerMap = new HashMap<>();
headerMap.put("num", i);
// 添加额外的属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 是否持久化 1 不持久化,2 持久化
.contentEncoding("UTF-8") // 设置字符集
.headers(headerMap).build();
/**
* mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
*/
// exchange 表示交换机, 不设置交换机就输入空字符串, 就表示走第一个默认的交换机(AMQP default), 也就是说 routingKet 会和交换机绑定一起
// routingKet 表示key键,发送到哪一个队列
// mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
// props 表示消息的其他属性 (BasicProperties)
// body 表示消息内容
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
}
// 关闭连接
channel.close();
connection.close();
}
}
自定义消费者
package com.example.rabbitmqapi.dlx;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* 自定义消费者
*
* @author weiximei
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
// 第一参数:consumerTag 同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。
// 因此 envelope.deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。
// 第二个参数: envelope 表示消息类型信息
// 第三个参数:properties 表示消息路由头的其他属性等
// 第三个参数: body 消息内容
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-------------自定义消费者------------");
System.out.println("consumerTag : " + consumerTag);
System.out.println("envelope : " + envelope);
System.out.println("properties : " + properties);
System.out.println("body : " + new String(body));
// 获取消息的额外参数
Integer num = (Integer) properties.getHeaders().get("num");
// 比如我让这个的额外参数等于 0 的时候,就进行重回队列
if (num == 0) {
// deliveryTag 参数是消息唯一标识
// multiple 是否是批量的,一般为false
// requeue 是否重回队列, true 是,false 不是
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
/**
* 手工签收, 第二个参数表示是否批量签收
*/
// envelope.getDeliveryTag() 是消息的唯一标识
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
网友评论