美文网首页
RabbitMQ-消息确认机制(事物+confirm)

RabbitMQ-消息确认机制(事物+confirm)

作者: jiahzhon | 来源:发表于2020-07-20 19:08 被阅读0次

    在rabbitmq中,我们可以通过持久化数据,解决rabbitmq的服务器异常 的数据丢失问题。问题:生产者将消息发送出去后,是否到达rabbitmq服务器?默认的情况下是不知道的。

    • 两种方式解决:
    1. AMQP 实现了事物机制
    2. confirm 模式
    • AMQP 事物机制
      • txSelect 用户将当前channel设置成transation模式
      • txCommit 用于提交事物
      • txRollback 回滚事物
    image.png

    这种模式比较耗时,降低了rabbitmq的吞吐量

    • confirm模式
      • 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;
      • confirm模式最大的好处是 异步
      • rabbitmq如果服务器异常或者崩溃,就会发送一个nack消息
      • 开启confirm模式 `channel.confirmSelect();
      • 编程模式
        • 普通 发一条 waitForConfirm()
        • 批量 发一批 waitForConfirms()
        • 异步confirm模式 提供一个回调方法
        • confirm单条
        • image.png
        • confirm多条
    public class SendMany {
        private static final String QUEUE_NAME = "confirm_test_1";
    
        public static void main(String[] args) throws IOException, InterruptedException {
            Connection connections = ConnectionUtils.getConnections();
            Channel channel = connections.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 生产者调用confirmSelect 将channel设置为confirm模式
            channel.confirmSelect();
            String msg = "hello  confirm  msg";
            System.out.println("send--confirm---" + msg);
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            if (!channel.waitForConfirms()) {
                System.out.println("massage send failed");
            } else {
                System.out.println("massage send ok");
            }
            connections.close();
        }
    }
    
    • 异步confirm模式
      • Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。
    public class Send3 {
        //下面代码并没有处理handleAck和handleNack
        private static final String QUEUE_NAME = "confirm_test_1";
    
        public static void main(String[] args) throws IOException, InterruptedException {
            Connection connections = ConnectionUtils.getConnections();
            Channel channel = connections.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 生产者调用confirmSelect 将channel设置为confirm模式
            channel.confirmSelect();
            // 存放未确认的消息标识
            final SortedSet<Long> confirmSet = Collections.synchronizedNavigableSet(new TreeSet<Long>());
            // 添加通道监听
            channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long deliverTag, boolean mutiple) throws IOException {
                    if (mutiple) {
                        System.out.println("---handleAck-------mutiple----");
                        confirmSet.headSet(deliverTag + 1).clear();
                    } else {
                        System.out.println("---handleAck-------mutiple---false");
                        confirmSet.remove(deliverTag);
                    }
                }
    
                public void handleNack(long deliverTag, boolean mutiple) throws IOException {
                    if (mutiple) {
                        System.out.println("---handleNack-------mutiple----");
                        confirmSet.headSet(deliverTag + 1).clear();
                    } else {
                        System.out.println("---handleNack-------mutiple---false");
                        confirmSet.remove(deliverTag);
                    }
                }
    
            });
    
            String msg = "hello confirm msg";
            while (true) {
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                confirmSet.add(seqNo);
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:RabbitMQ-消息确认机制(事物+confirm)

          本文链接:https://www.haomeiwen.com/subject/okgykktx.html