Return Listener 用于处理一些不可路由的消息!
我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener
在基础API中有一个关键的配置项:
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
![](https://img.haomeiwen.com/i4084822/adce07606c957072.png)
Producer 生产者,MQ Broker 消息中心
Producer 发送一条消息,发现Exchange或者RoutingKey不存在,那么消息就会返回一个 Return Listener 事件到Producer生产者端,
消费者
package com.example.rabbitmqapi.returnListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*
* return Listener 消息机制 在生产端
*
* @author weiximei on 2019-04-09
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// IP地址或者域名
connectionFactory.setHost("192.168.1.118");
// 端口号默认是 5672
connectionFactory.setPort(5672);
// 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("weiximei");
connectionFactory.setPassword("weiximei");
// 2.创建一个连接
Connection connection = connectionFactory.newConnection();
// 3.创建一个管道
Channel channel = connection.createChannel();
// 声明
String exchange = "test_return_exchange";
// 路由key
String routingKey = "return.save";
// 路由key(用来测试,当这个路由key不存在的时候,return消息机制)
// String routingKeyError = "abc.save";
// 队列名称
String queueName = "test_return_queue";
// 设置交换机
/**
* exchange: 交换机名称
* type: 消息类型(topic,direct等等)
* durable: 是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息.
* autoDelete: 是否自动删除,设置为TRUE则表是自动删除,自删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为false
* internal: 是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
* arguments: 其它一些结构化参数比如:alternate-exchange
*/
channel.exchangeDeclare(exchange, "topic", true, false, null);
// 设置队列
/**
* queueName: 队列名称
* declare 表示是否持久化消息类型
* exclusive 是否为当前连接的专用队列,在连接断开后(close),会自动删除该队列,生产环境中应该很少用到吧。
* 表示是否独占,true也就是说我这个消息只能是这一个channel进行消费
* 场景:比如这个10条消息,让一个人消费,要顺序执行,如果多个人消息不能保证消息是顺序执行的
* autoDelete 表示是否自动删除,true当没有任何消费者使用时(也就是这个队列没有一个消费者后, 也可以说是这个队列的最后一个消费者close后,断开了连接),自动删除该队列
* arguments: 其它一些结构化参数比如:alternate-exchange
*/
channel.queueDeclare(queueName, true, false, false, null);
// 绑定交换机和队列关系
channel.queueBind(queueName, exchange, routingKey);
// 消费消息
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 设置channel消费者
// queue 表示要监听的队列
// autoAck 是否自动发送签收消息(也就是如果我收到消息了,我就自动发送一个消息确认通知)
channel.basicConsume(queueName, true,queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String bodyMsg = new String(delivery.getBody(), "UTF-8");
System.out.println("消费消息:" + bodyMsg);
}
}
}
package com.example.rabbitmqapi.returnListener;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* 生产者
*
* return Listener 消息机制
*
* 我们的消息生产者,通过指定一个Exchange和RoutingKey,
* 把消息送达到一个队列中去,然后我们的消费者监听队列,
* 进行消费处理操作!但是在某些情况下,如果我们在发送消息的时候,
* 当前的exchange不存在或者指定的路由key路由不到,
* 这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener
*
* 在基础API中有一个关键的配置项:
* Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
*
* @author weiximei on 2019-04-09
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// IP地址或者域名
connectionFactory.setHost("192.168.1.118");
// 端口号默认是 5672
connectionFactory.setPort(5672);
// 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("weiximei");
connectionFactory.setPassword("weiximei");
// 2.创建一个连接
Connection connection = connectionFactory.newConnection();
// 3.创建一个管道
Channel channel = connection.createChannel();
// 声明
String exchange = "test_return_exchange";
// 路由key
String routingKey = "return.save";
// 路由key(用来测试,当这个路由key不存在的时候,return消息机制)
String routingKeyError = "abc.save";
String body = "Hello RabbitMQ! Return Message!";
/**
* mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
*/
// exchange 表示交换机, 不设置交换机就输入空字符串, 就表示走第一个默认的交换机(AMQP default), 也就是说 routingKet 会和交换机绑定一起
// routingKet 表示key键,发送到哪一个队列
// mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
// props 表示消息的其他属性 (BasicProperties)
// body 表示消息内容
channel.basicPublish(exchange, routingKey, true, null, body.getBytes());
/**
* Return消息机制
*/
channel.addReturnListener(new ReturnListener() {
/**
*
* @param replyCode 响应码
* @param replyText 响应的文本
* @param exchange 交换机
* @param routingKey 路由key
* @param properties 消息的一些属性
* @param body 消息具体的内容
* @throws IOException
*/
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("===== 接收到的 return 的消息");
System.out.println("replyCode: " + replyCode);
System.out.println("replyText: " + replyText);
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("properties: " + properties.toString());
System.out.println("body: " + new String(body, "UTF-8"));
}
});
}
}
网友评论