美文网首页
概念--Confirm确认消息

概念--Confirm确认消息

作者: 爱吃豆包 | 来源:发表于2023-03-20 11:03 被阅读0次

    理解Confirm消息确认机制:

    消息的确认,是指生产者投递消息后,如果Broket(RabbitMQ消息中心)收到消息,则会给我生产者一个应答。

    生产者进行接收应答,用来确定这条消息是否正常的发送到Broket,这种方式也是消息的可靠性投递的核心保障!

    image.png

    producer 生产者,MQ Broker就是RabbitMQ消息中心

    生产者发送一条消息,到 Broker 里面,然后会有一个 Broker confirm 到生产者,生产者会有一个 Config Listener 监听!这个整个过程是异步的,也就是我发送了消息就可以不用管了,只需要监听就好了!

    如何实现Confirm 确认消息?

    第一步:在 channel 上开启确认模式:channel.confirmSelect();

    第二部:在 cahnnel 上添加监听:addConfirmListener ,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或记录日志等后续处理!

    什么情况下会 ack呢?

    比如 磁盘满了,rabbitMQ出现了异常,key的容量达到上限了

    如果我 Ack(确认) 和 Nack(没有确认) 都没有接受到消息呢?

    需要用到 可靠性投递!

    这个情况比如出现了,网络闪断,导致我没有收到,那么就需要用定时任务,去抓取状态,然后进行消息重发!(前提是基于 可靠性投递)

    生产者

     package com.example.rabbitmqapi.confirm;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 生产者
     *
     *      是在生产者处理
     *
     *      消息的确认,是指生产者投递消息后,如果Broket(RabbitMQ消息中心)收到消息,
     *      则会给我生产者一个应答。
     *      生产者进行接收应答,用来确定这条消息是否正常的发送到Broket,
     *      这种方式也是消息的可靠性投递的核心保障!
     *
     * @author weiximei on 2019-04-01
     */
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            // 1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // IP地址或者域名
            connectionFactory.setHost("192.168.1.118");
            // 端口号默认是 5672
            connectionFactory.setPort(5672);
            // 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("weiximei");
            connectionFactory.setPassword("weiximei");
    
            // 2.获取Connection
            Connection connection = connectionFactory.newConnection();
    
            // 3. 创建 channel,也就是连接信道
            Channel channel = connection.createChannel();
    
            /**
             * 4.指定我们的消息确认模式:消息的确认模式
             */
            channel.confirmSelect();
    
            // 5. 声明
            // 交换机名称
            String exchangeName = "test_confirm_exchange";
            String routingKey = "confirm_save";
    
            String msg = "Hello RabbitMQ,Save confirm";
    
            // 6.发送消息
    
            /**
             * channel.basicPublish 方法有个其他的重载方法
             * 其中有个参数
             *
             * mandatory的作用:
             *         当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
             *         那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,
             *         出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
             *         否则就将消息return给发送者;
             *
             */
    
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    
    
            // 7.添加一个确认监听
            //异步监听确认和未确认的消息
            channel.addConfirmListener(new ConfirmListener() {
    
                /**
                 * 异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,
                 * 此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,
                 * 如果为false的话表示单条确认。
                 */
    
                /**
                 *
                 * @param deliveryTag 消息的唯一标签,用来确认消息,标记消息
                 * @param multiple 如果true表示批量执行了deliveryTag这个值以前的所有消息,
                 *                  如果为false的话表示单条确认。
                 * @throws IOException
                 */
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                }
    
                /**
                 *
                 * @param deliveryTag 消息的唯一标签,用来确认消息,标记消息
                 * @param multiple 如果true表示批量执行了deliveryTag这个值以前的所有消息,
                 *                 如果为false的话表示单条确认。
                 * @throws IOException
                 */
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(" ------  no ack! ------  没有被ack确认,标识: " + deliveryTag);
                }
            });
    
    
    
    
    
        }
    
    
    
    
    
    }
    
    
    
    package com.example.rabbitmqapi.confirm;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 消费者
     *
     *      confirm 是在生产端
     *
     * @author weiximei on 2019-04-01
     */
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            // 1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // IP地址或者域名
            connectionFactory.setHost("192.168.1.118");
            // 端口号默认是 5672
            connectionFactory.setPort(5672);
            // 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("weiximei");
            connectionFactory.setPassword("weiximei");
    
            // 2.获取Connection
            Connection connection = connectionFactory.newConnection();
    
            // 3. 创建 channel,也就是连接信道
            Channel channel = connection.createChannel();
    
            /**
             * 4.指定我们的消息确认模式:消息的确认模式
             */
            channel.confirmSelect();
    
            // 5. 声明
            // 交换机名称
            String exchangeName = "test_confirm_exchange";
            String routingKey = "confirm_save";
            String queueName = "test_confirm_queue";
    
            // 6.声明 exchange
            // 交换机名称,交换机类型,是否持久化
            channel.exchangeDeclare(exchangeName, "topic",true);
    
            // 7. 声明 队列
            /**
             * declare 表示是否持久化消息类型
             * exclusive 是否为当前连接的专用队列,在连接断开后(close),会自动删除该队列,生产环境中应该很少用到吧。
             *           表示是否独占,true也就是说我这个消息只能是这一个channel进行消费
             *           场景:比如这个10条消息,让一个人消费,要顺序执行,如果多个人消息不能保证消息是顺序执行的
             * autoDelete 表示是否自动删除,true当没有任何消费者使用时(也就是这个队列没有一个消费者后, 也可以说是这个队列的最后一个消费者close后,断开了连接),自动删除该队列
             * arguments 其他的一些参数
             */
            channel.queueDeclare(queueName, true, false, false,null);
    
    
            // 8.绑定队列 (也就是把队列绑定到某交换机)
            channel.queueBind(queueName, exchangeName, routingKey);
    
            // 9.创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            // 创建消费者
            // 消费的队列,是否自动ack,消费者是谁
            channel.basicConsume(queueName, true, queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("接收到的消息:" + msg);
            }
    
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:概念--Confirm确认消息

          本文链接:https://www.haomeiwen.com/subject/avcokdtx.html