Return消息机制
- Return Listener用于处理一些不可路由的消息
- 我们的消息生产者通过指定一个Exchange和Routing key,把消息送达到某一个队列中,然后我们的消费者监听该队列,进行消费处理操作。
- 但在某些情况下,如果发送消息时Exchange不存在,或者指定的Routing key路由不到任何队列,这时我们就需要监听这种不可达的消息,就要使用Return Listener。
image.png
Producer
/**
 * Publishes a single message with the {@code mandatory} flag set and prints any
 * message that the broker hands back through a {@link ReturnListener}.
 *
 * <p>The message is sent to exchange {@code exchange2} with routing key
 * {@code return.error}. The connection is not closed here; closing it right after
 * publishing could tear the channel down before the asynchronous return arrives.
 */
public class Producer {
    /**
     * Connects to the broker, registers the return listener and publishes one message.
     *
     * @param args unused
     * @throws IOException if the connection, channel or publish operation fails
     * @throws TimeoutException if connecting to the broker times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.6.99");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Register the listener BEFORE publishing: returns are delivered asynchronously,
        // so a listener added after basicPublish can miss the returned message.
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("handleReturn");
                System.out.println("replyCode " + replyCode);
                System.out.println("replyText " + replyText);
                // Decode with the same charset that was used to encode the payload below.
                System.out.println("body " + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        });

        /*
         * mandatory must be true for the listener to receive messages that cannot be routed.
         * With mandatory=false the broker discards such messages.
         */
        channel.basicPublish("exchange2", "return.error", true, null,
                "hello world".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}
Consumer
/**
 * Declares a durable topic exchange and a durable queue, binds them with the
 * routing key {@code return.key}, and then prints every message received from
 * the queue in an endless loop (auto-acknowledge mode).
 */
public class Consumer1 {
    /**
     * Sets up the exchange/queue topology and consumes messages until the process is stopped.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting to the broker times out
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        final String exchangeName = "exchange2";
        final String queueName = "queueName2";
        final String bindingKey = "return.key";

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.6.99");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();

        // Topology: durable topic exchange -> durable, non-exclusive, non-auto-delete queue.
        ch.exchangeDeclare(exchangeName, "topic", true);
        ch.queueDeclare(queueName, true, false, false, null);
        ch.queueBind(queueName, exchangeName, bindingKey);

        // Second argument 'true' enables automatic acknowledgement.
        QueueingConsumer consumer = new QueueingConsumer(ch);
        ch.basicConsume(queueName, true, consumer);

        for (;;) {
            // Blocks until the next delivery is available.
            QueueingConsumer.Delivery next = consumer.nextDelivery();
            byte[] payload = next.getBody();
            String text = new String(payload, "UTF-8");
            System.out.println(" Received '" + text + "'");
        }
    }
}
网友评论