美文网首页
概念--Return消息机制

概念--Return消息机制

作者: 爱吃豆包 | 来源:发表于2023-03-20 11:03 被阅读0次

Return Listener 用于处理一些不可路由的消息!

我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到一个队列中去,然后我们的消费者监听队列,进行消费处理操作!

但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener

在基础API中有一个关键的配置项:

Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!

image.png

Producer 生产者,MQ Broker 消息中心
Producer 发送一条消息,发现Exchange或者RoutingKey不存在,那么消息就会返回一个 Return Listener 事件到Producer生产者端,

消费者

 package com.example.rabbitmqapi.returnListener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 *
 *      return Listener 消息机制  在生产端
 *
 * @author weiximei on 2019-04-09
 */
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // IP地址或者域名
        connectionFactory.setHost("192.168.1.118");
        // 端口号默认是 5672
        connectionFactory.setPort(5672);
        // 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("weiximei");
        connectionFactory.setPassword("weiximei");

        // 2.创建一个连接
        Connection connection = connectionFactory.newConnection();
        // 3.创建一个管道
        Channel channel = connection.createChannel();

        // 声明
        String exchange = "test_return_exchange";
        // 路由key
        String routingKey = "return.save";
        // 路由key(用来测试,当这个路由key不存在的时候,return消息机制)
        // String routingKeyError = "abc.save";

        // 队列名称
        String queueName = "test_return_queue";


        // 设置交换机
        /**
         * exchange: 交换机名称
         * type: 消息类型(topic,direct等等)
         * durable: 是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息.
         * autoDelete: 是否自动删除,设置为TRUE则表是自动删除,自删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为false
         * internal: 是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
         * arguments: 其它一些结构化参数比如:alternate-exchange
         */
        channel.exchangeDeclare(exchange, "topic", true, false, null);

        // 设置队列
        /**
         * queueName: 队列名称
         * declare 表示是否持久化消息类型
         * exclusive 是否为当前连接的专用队列,在连接断开后(close),会自动删除该队列,生产环境中应该很少用到吧。
         *          表示是否独占,true也就是说我这个消息只能是这一个channel进行消费
         *          场景:比如这个10条消息,让一个人消费,要顺序执行,如果多个人消息不能保证消息是顺序执行的
         * autoDelete 表示是否自动删除,true当没有任何消费者使用时(也就是这个队列没有一个消费者后, 也可以说是这个队列的最后一个消费者close后,断开了连接),自动删除该队列
         * arguments: 其它一些结构化参数比如:alternate-exchange
         */
        channel.queueDeclare(queueName, true, false, false, null);

        // 绑定交换机和队列关系
        channel.queueBind(queueName, exchange, routingKey);


        // 消费消息
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 设置channel消费者
        // queue 表示要监听的队列
        // autoAck 是否自动发送签收消息(也就是如果我收到消息了,我就自动发送一个消息确认通知)
        channel.basicConsume(queueName, true,queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String bodyMsg = new String(delivery.getBody(), "UTF-8");
            System.out.println("消费消息:" + bodyMsg);
        }


    }

}


 package com.example.rabbitmqapi.returnListener;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 * 生产者
 *
 *      return Listener 消息机制
 *
 *      我们的消息生产者,通过指定一个Exchange和RoutingKey,
 *      把消息送达到一个队列中去,然后我们的消费者监听队列,
 *      进行消费处理操作!但是在某些情况下,如果我们在发送消息的时候,
 *      当前的exchange不存在或者指定的路由key路由不到,
 *      这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener
 *
 *       在基础API中有一个关键的配置项:
 *          Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
 *
 * @author weiximei on 2019-04-09
 */
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // IP地址或者域名
        connectionFactory.setHost("192.168.1.118");
        // 端口号默认是 5672
        connectionFactory.setPort(5672);
        // 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("weiximei");
        connectionFactory.setPassword("weiximei");

        // 2.创建一个连接
        Connection connection = connectionFactory.newConnection();
        // 3.创建一个管道
        Channel channel = connection.createChannel();

        // 声明
        String exchange = "test_return_exchange";
        // 路由key
        String routingKey = "return.save";
        // 路由key(用来测试,当这个路由key不存在的时候,return消息机制)
        String routingKeyError = "abc.save";

        String body = "Hello RabbitMQ! Return Message!";

        /**
         * mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
         */
        // exchange 表示交换机, 不设置交换机就输入空字符串, 就表示走第一个默认的交换机(AMQP default), 也就是说 routingKet 会和交换机绑定一起
        // routingKet 表示key键,发送到哪一个队列
        // mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
        // props 表示消息的其他属性 (BasicProperties)
        // body 表示消息内容
        channel.basicPublish(exchange, routingKey, true, null, body.getBytes());


        /**
         * Return消息机制
         */
        channel.addReturnListener(new ReturnListener() {

            /**
             *
             * @param replyCode 响应码
             * @param replyText 响应的文本
             * @param exchange 交换机
             * @param routingKey 路由key
             * @param properties 消息的一些属性
             * @param body 消息具体的内容
             * @throws IOException
             */
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("===== 接收到的 return 的消息");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("properties: " + properties.toString());
                System.out.println("body: " + new String(body, "UTF-8"));

            }
        });

    }

}


相关文章

网友评论

      本文标题:概念--Return消息机制

      本文链接:https://www.haomeiwen.com/subject/dmcokdtx.html