美文网首页
RabbitMQ的Return消息机制

RabbitMQ的Return消息机制

作者: 若兮缘 | 来源:发表于2019-03-18 08:14 被阅读0次
    Return机制
    • Return Listener 用于处理一些不可路由的消息!
    • 我们的消息生产者,通过指定一个Exchange 和Routingkey,把消息送达到某一个队列中去, 然后我们的消费者监听队列,进行消费处理操作!
    • 但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener
    Return机制实现
    1. 添加return监听:addReturnListener,生产端去监听这些不可达的消息,做一些后续处理,比如说,记录下消息日志,或者及时去跟踪记录,有可能重新设置一下就好了
    2. 发送消息时,设置Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
    Return机制演示
    生产端
    public class ReturnProducer {
    
        public static void main(String[] args) throws Exception {
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.43.157");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //2 获取Connection
            Connection connection = connectionFactory.newConnection();
            //3 通过Connection创建一个新的Channel
            Channel channel = connection.createChannel();
            
            String exchange = "test_return_exchange";
            //String routingKey = "return.save";
            String routingKeyError = "abc.save";
            
            String msg = "Hello RabbitMQ Return Message";
            //添加return监听
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange,
                        String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //replyCode:响应码    replyText:响应信息
                    System.err.println("---------handle  return----------");
                    System.err.println("replyCode: " + replyCode);
                    System.err.println("replyText: " + replyText);
                    System.err.println("exchange: " + exchange);
                    System.err.println("routingKey: " + routingKey);
                    //System.err.println("properties: " + properties);
                    System.err.println("body: " + new String(body));
                }
            });
            //5 发送一条消息,第三个参数mandatory:必须设置为true
            channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
        }
    }
    
    消费端
    public class ReturnConsumer {
        
        public static void main(String[] args) throws Exception {
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.43.157");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //2 获取Connection
            Connection connection = connectionFactory.newConnection();
            //3 通过Connection创建一个新的Channel
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_return_exchange";
            String routingKey = "return.#";
            String queueName = "test_return_queue";
            //4 声明交换机和队列,然后进行绑定设置路由Key
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //5 创建消费者 
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
            
            while(true){
                Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println("消费者: " + msg);
            }
        }
    }
    
    运行说明

    先启动消费端,访问管控台:http://192.168.43.157:15672,检查Exchange和Queue是否设置OK,然后启动生产端。
    由于生产端设置的是一个错误的路由key,所以消费端没有任何打印,而生产端打印了如下内容

    ---------handle  return----------
    replyCode: 312
    replyText: NO_ROUTE
    exchange: test_return_exchange
    routingKey: abc.save
    body: Hello RabbitMQ Return Message
    

    如果我们将 Mandatory 属性设置为false,对于不可达的消息会被Broker直接删除,那么生产端就不会进行任何打印了。如果我们的路由key设置为正确的,那么消费端能够正确消费,生产端也不会进行任何打印。

    好了,这就是RabbitMQ为我们提供的Return机制,非常的简单实用,在实际工作中也是必不可少的,对于一些不可达的消息都可以第一时间进行监听然后做一些后续处理。

    相关文章

      网友评论

          本文标题:RabbitMQ的Return消息机制

          本文链接:https://www.haomeiwen.com/subject/weummqtx.html