1.概念
在上述的三种情况中,第一种在代码中就是我们之前帖子中在CustomConsumer类中,类似于channel.basicQos()的方法;
队列达到最大长度的意思就是 队列能塞100条,如果再路由过来一条消息,那么这个消息就会进入死信队列
2.设置
image.png3.实战
与之前的代码一样,不过我们在一个队列中是需要一个参数arguments,里面指定队列的死信队列,然后还需要声明一个死信队列的exchange还有queue才行。
consumer:
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "dlx_exchange";
String routingKey = "dlx.#";
String queueName = "dlx_queue";
//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
//死信队列的声明:
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","dlx.exchange");
channel.queueDeclare(queueName, true, false, false, arguments);
channel.queueBind(queueName, exchangeName, routingKey);
channel.exchangeDeclare("dlx.exchange", "topic", true);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
//5 创建消费者
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
producer:
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();
String exchangeName = "dlx_exchange";
String routingKey = "dlx.save";
//5 发送一条消息
String msg = "Hello RabbitMQ Send dlx message!";
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
}
我们首先启动consumer代码,让他创建好 exchange以及队列,然后因为我们要测试死信是否可以传输到死信队列,所以不能让这个消息被消费掉,我们通过过期时间来测试,所以把consumer关闭掉,然后启动producer来生产消息。
在下面的图中可以看到我们生产的消息已经正常的路由到了相应的队列,但是过了10秒钟之后,因为没有消费端消费这条消息,到了过期时间之后,这条消息就被路由到了指定的死信队列。
网友评论