美文网首页
RabbitMQ(十)消费者应答和发送者确认

RabbitMQ(十)消费者应答和发送者确认

作者: 薛晨 | 来源:发表于2016-11-06 21:28 被阅读9632次

    文档:

    https://www.rabbitmq.com/confirms.html

    介绍:
    使用像RabbitMQ这样的消息代理的系统是分布式的,所以消息是否能到达对端或是被成功处理是无法保证的。

    所以,无论生产者还是消费者,都需要一种消息传递和处理的确认机制。

    消费者传递应答

    当RabbitMQ发送一个消息给消费者,他需要知道消息是什么时候被成功投递了。

    1)投递的标识:Delivery Tags

    当一个消费者向RabbitMQ注册后,RabbitMQ会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它在一个channel中唯一代表了一次投递。delivery tag的唯一标识范围限于channel.

    delivery tag是单调递增的正整数,客户端获取投递的方法用用dellivery tag作为一个参数。

    2)应答模式

    根据所使用的确认模式,RabbitMQ可以考虑在发送(写入TCP套接字)之后立即成功传送消息,或者接收到显式(“手动”)客户机确认时成功传送。 手动发送的确认可以是肯定的或否定的,并且使用以下协议方法之一:

    • basic.ack is used for positive acknowledgements
    • basic.nack is used for negative acknowledgements
    • basic.reject is used for negative acknowledgements but has one limitations compared to basic.nack

    积极的应答告诉RabbitMQ记录一个消息被投递了,像 basic.reject 这样的消极应答有着同样的作用。

    积极应答假定消息被成功处理,消极应答表示投递没被处理,还是要被删掉。

    3)一次应答多个投递

    为了减少网络流量,手动应答可以被批处理。

    ack unack 的mutiple参数设置为true,则可以一次性应答delivery_tag小于等于传入值的所有应答。

    4)通道预取设置(QoS)

    因为消息发送到客户端是异步的,在任何给定时刻在信道上通常存在多个消息竞争。此外,来自客户端的手动应答本质上也是异步的。所以总是存在一个消息未确认的滑动窗口。开发人员通常希望限制此窗口的大小,以避免消费者端的无界缓冲区问题。

    可以通过使用 basicQos 这个方法来设置预取的个数。这个数值定义了一个通道最多有多少个未确认的消息。

    值得重申的是,投递流程和手动客户端确认是完全异步的。 因此,如果在投递中已经有消息的情况下改变预取值,则会出现自然竞争条件,并且在信道上可能暂时存在多于预取未确认消息数量。

    5)客户端错误:双重应答和未知 tag

    如果客户端对同一个 delivery tag 应答超过一次,rabbitMQ会返回一个通道错误:

    PRECONDITION_FAILED - unknown delivery tag 100

    如果一个未知的 delivery tag 被使用的话,会返回同样的错误。

    发送者确认

    使用标准AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务 - 使通道事务,发布消息,提交。 在这种情况下,事务不必要地重量级,并且吞吐量减少了250倍。为了补救这一点,引入了确认机制。 它模仿协议中已经存在的消费者确认机制。

    为了启用确认机制,客户端发送 confirm.select 方法。 根据是否设置了 no-wait,代理可以用confirm.select-ok进行响应。 一旦在一个通道上使用 confirm.select 方法,它就被认为处于确认模式。 事务通道不能进入确认模式,一旦通道处于确认模式,它不能进行事务。

    一旦通道处于确认模式,代理和客户端计数消息(计数从第一个确认选择的1开始计数)。 然后,代理程序在通过在同一个通道上发送basic.ack来处理消息时确认消息。 delivery-tag字段包含确认消息的序列号。 代理还可以在basic.ack中设置多个字段,以指示已经处理了直到并且包括具有序列号的消息的所有消息。

    1)消极确认

    在特殊情况下,代理无法成功处理消息,代理将发送basic.nack而不是basic.ack。 在这个上下文中,basic.nack的字段与basic.ack中的相应字段具有相同的含义,并且应忽略requeue字段。 通过nack一个或多个消息,代理指示它不能处理消息并拒绝对它们负责; 在这一点上,客户端可以选择重新发布消息。

    在将通道置于确认模式后,所有后续发布的消息将被确认或nack一次。 不能保证消息被多久确认一次。 没有消息将被确认和nack。

    只有在负责队列的Erlang进程中发生内部错误时,才会传递basic.nack。

    2)消息多久被确认

    对于一个不可路由的消息,一旦交换器证实消息不可能路由到任何队列,代理会发布一个确认。

    如果消息发布设置为mandatory,basic.return在basic.ack之前发送到客户端。 对于否定确认(basic.nack)也是如此。

    对于可路由消息,当所有队列接受消息时,发送basic.ack。 对于路由到持久队列的持久消息,这意味着持久存储到磁盘。 对于镜像队列,这意味着所有镜像都接受了消息。

    3)持久化消息的ACK延迟

    路由到持久队列的持久消息的basic.ack是在将消息保存到磁盘后才发送的。 RabbitMQ消息存储器在间隔(几百毫秒)后将消息批量保存到磁盘,以最小化fsync(2)调用的数量,或者当队列空闲时。 这意味着在恒定负载下,basic.ack的延迟可以达到几百毫秒。 为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布批量消息,并等待未完成的确认。 具体的API在不同的客户端库之间有所不同。

    4)确认和保证投递

    如果代理在消息写入磁盘之前崩溃,将丢失持久消息。 在某些条件下,这导致代理行为诡异。

    例如,考虑这种情况:

    • 客户端向持久队列发布持久消息
    • 客户端消费了队列中的消息(注意消息是持久的,队列持久),但是还没有确认
    • 代理挂了并重启
    • 客户端重连并开始消费

    在这一点上,客户端可以合理地假设消息将被再次传送。

    在上述情景下,client 有理由认为消息需要被(broker)重新 deliver 。但这并非事实:重启(有可能)会令 broker 丢失消息。为了确保持久性,client 应该使用 confirm 机制。如果 publisher 使用的 channel 被设置为 confirm 模式,publisher 将不会收到已丢失消息的 ack(这是因为 consumer 没有对消息进行 ack ,同时该消息也未被写入磁盘)。

    限制

    1)最大 Delivery Tag

    传递标记是一个64位长的值,因此其最大值为9223372036854775807。由于Delivery Tag唯一标识每个通道的每次投递,所以,发送者或客户端在实践中不太可能超过此值。

    代码示例

    package com.xc.rabbitmq;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.SortedSet;
    import java.util.TreeSet;
    
    /**
     * Created by xc.
     */
    public class PublisherConfirms {
    
        private static final String QUEUE_NAME = "publisher-confirms";
    
        private static final int MSG_COUNT = 10;
    
        private static ConnectionFactory factory;
    
        static {
            factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbit");
            factory.setPassword("carrot");
        }
    
    
        public static void main(String[] args) throws Exception {
            // Publish MSG_COUNT messages and wait for confirms.
            (new Thread(new Consumer())).start();
            // Consume MSG_COUNT messages.
            (new Thread(new Publisher())).start();
    
        }
    
        static class Publisher implements Runnable {
    
            volatile SortedSet<Long> ackSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    
             public void run() {
                 try {
                     long startTime = System.currentTimeMillis();
    
                     // 创建一个新的连接
                     Connection connection = factory.newConnection();
                     // 创建一个频道
                     Channel channel = connection.createChannel();
    
                     channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
                     channel.confirmSelect();
    
                     channel.addConfirmListener(new ConfirmListener() {
                         public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                             if (multiple) {
                                 for (long i = ackSet.first(); i <= deliveryTag; ++i) {
                                     System.out.println("handle ack multiple, tag : " + deliveryTag);
                                     ackSet.remove(i);
                                 }
                             } else {
                                 System.out.println("handle ack, tag : " + deliveryTag);
                                 ackSet.remove(deliveryTag);
                             }
                         }
    
                         public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                             System.out.println("handle nack, tag : " + deliveryTag);
                         }
                     });
    
                     // Publish
                     for (long i = 0; i < MSG_COUNT; ++i) {
                         ackSet.add(i);
                         channel.basicPublish("", QUEUE_NAME,
                                 MessageProperties.PERSISTENT_TEXT_PLAIN,
                                 "nop".getBytes());
                         System.out.println("send msg : " + "nop");
                     }
    
                     // Wait
                     while (ackSet.size() > 0)
                         Thread.sleep(10);
    
                     // Cleanup
                     channel.close();
                     connection.close();
    
                     long endTime = System.currentTimeMillis();
                     System.out.printf("Test took %.3fs\n", (float) (endTime - startTime) / 1000);
    
                 } catch (Throwable e) {
                     System.out.println("foobar :(");
                     e.printStackTrace();
                 }
             }
    
        }
    
        static class Consumer implements Runnable {
    
            public void run() {
                try {
                    // Setup
                    Connection conn = factory.newConnection();
                    Channel ch = conn.createChannel();
                    ch.queueDeclare(QUEUE_NAME, true, false, false, null);
    
                    // Consume
                    QueueingConsumer qc = new QueueingConsumer(ch);
                    ch.basicConsume(QUEUE_NAME, true, qc);
                    for (int i = 0; i < MSG_COUNT; ++i) {
                        QueueingConsumer.Delivery delivery = qc.nextDelivery();
                        System.out.println("got msg : " + new String(delivery.getBody()));
                    }
    
                    // Consume
                    ch.close();
                    conn.close();
                } catch (Throwable e) {
                    System.out.println("Whoosh!");
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:RabbitMQ(十)消费者应答和发送者确认

          本文链接:https://www.haomeiwen.com/subject/sprnuttx.html