美文网首页
RabbitMQ发布确认

RabbitMQ发布确认

作者: 小成都人 | 来源:发表于2022-08-12 18:35 被阅读0次

    消息持久化的两个前提:
    1、设置要求队列必须在持久化,保证RabbitMQ宕机后,信道不会消失
    2、设置要求队列中的消息必须持久话,保证RabbitMQ宕机后,消息不会消失

    设置两个值后,并不能完全保证消息不丢失,因为将消息保存到磁盘的过程中,服务器可能宕机,无法将消息完整地保留到磁盘上。所以,就需要发布确认

    发布确认:发布者将消息发送给消息队列,消息队列将消息保存到磁盘上,然后告诉给发布者,确认已经保存到磁盘中。这个过程就叫做发布确认

    发布确认的策略

    开启发布确认的方法

    // 发布确认 channel.confirmSelect();
    

    单个确认发布

    这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它
    被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认
    的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
    这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会
    阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某
    些应用程序来说这可能已经足够了。

    ConfirmMessage.class

    // 批量发消息的个数
        public static final int MESSAGE_COUNT=1000;
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
           // 单个确认
            publishMessageIndividually();
        }
    
        public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
            Channel channel = RabbitMqUitls.rabbitMqConnection();
            // 队列声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,true,false, false, null);
            // 开启发布确认
            channel.confirmSelect();
            // 开始时间
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("",queueName,null,message.getBytes());
                // 单个消息马上进行发布确认, true 为成功, false为失败
                boolean flag = channel.waitForConfirms();
                if (flag) {
                    System.out.println("消息发送成功!");
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("发布"+MESSAGE_COUNT+"条,单个发送确认所耗时间:" + (endTime-startTime) + "ms");
        }
    

    耗时:


    单个发布确认.png

    RabbitMQ控制台:


    单个发布确认RabbitMQ控制台.png

    批量确认发布

    与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

    在代码中,批量发布相对于单个发布,发布确认channel.waitForConfirms()的位置有所不同,需要在生产者发送批量确认数量时,进行发布确认。当然,这是最简单的示例。代码如下:

    public static void publishMessageBatch() throws Exception {
            Channel channel = RabbitMqUitls.rabbitMqConnection();
            // 队列声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,true,false, false, null);
            // 开启发布确认
            channel.confirmSelect();
            // 开始时间
            long startTime = System.currentTimeMillis();
    
            // 定义批量确认数量100, 发送100条确认一次
            int batchCount = 100;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("",queueName,null,message.getBytes());
                if (i%batchCount==0) {
                    channel.waitForConfirms();
                    System.out.println(i + "条消息发送确认成功");
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("发布"+MESSAGE_COUNT+"条,单个发送确认所耗时间:" + (endTime-startTime) + "ms");
        }
    

    批量确认发布所耗时间:


    批量发布确认.png

    异步确认发布

    是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,

     public static void publishMessageAsync() throws Exception {
            Channel channel = RabbitMqUitls.rabbitMqConnection();
            // 队列声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,true,false, false, null);
            // 开启发布确认
            channel.confirmSelect();
            /**
             * 线程安全有序的一个哈希表,适用于高并发的情况
             * 1.轻松的将序号与消息进行关联
             * 2.轻松批量删除条目 只要给到序列号
             * 3.支持并发访问
             */
            ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new  ConcurrentSkipListMap<>();
    
            /**
             * 确认收到消息的一个回调
             * 1.消息序列号
             * 2.true 可以确认小于等于当前序列号的消息
             * false 确认当前序列号消息
             */
            // 消息成功
            ConfirmCallback ackCallBack = (deliveryTag, multiple) -> {
                System.out.println("确认的消息内容"+concurrentSkipListMap.get(deliveryTag) + "========确认的消息序号:"+ deliveryTag);
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> concurrentNavigableMap =
                            concurrentSkipListMap.headMap(deliveryTag);
                    concurrentNavigableMap.clear();
                } else {
                    concurrentSkipListMap.clear();
                }
            };
            // 消息失败
            ConfirmCallback nackCallBack = (deliveryTag, multiple) -> {
                String s = concurrentSkipListMap.get(deliveryTag);
                System.out.println("未确认的消息内容"+s + "========未确认的消息序号:"+ deliveryTag);
            };
    
            channel.addConfirmListener(ackCallBack, nackCallBack);
            // 开始时间
            long startTime = System.currentTimeMillis();
    
            // 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息" + i ;
                channel.basicPublish("",queueName,null,message.getBytes());
                // 记录发送成功的消息
                concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);
            }
            long endTime = System.currentTimeMillis();
            System.out.println("发布"+MESSAGE_COUNT+"条,异步发送确认所耗时间:" + (endTime-startTime) + "ms");
        }
    

    异步处理耗时:


    异步发布确认.png

    注意点: 异步发布确认需要创建一个ConcurrentLinkedQueue(异步链接队列)队列--ConcurrentSkipListMap,用于记录发送成功的消息,然后将发送成功的消息清空,剩下的就是未成功的消息,返回给生产者。

    ps:观点只作为个人理解,如有错误,请指正。

    相关文章

      网友评论

          本文标题:RabbitMQ发布确认

          本文链接:https://www.haomeiwen.com/subject/gnbqgrtx.html