美文网首页程序猿之路分布式各组件
rabbitmq 两种模式下(事务/发布确认)性能对比

rabbitmq 两种模式下(事务/发布确认)性能对比

作者: 0爱上1 | 来源:发表于2018-09-05 17:43 被阅读568次

如何保证消息成功发送?

在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?

  • 事务方式

  • 发布确认

何为事务方式发送

事务方式:amqp协议提供的一种保证消息成功投递的方式
通过将信道开启 transactional 模式
并利用信道 Channel 的三个方式来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递

  • channel.txSelect(): 开启事务
  • channel.txCommit() :提交事务
  • channel.txRollback() :回滚事务

事务方式流程分析

  1. 代码实现

     @Test
     public void testTransactionalMode() {
     // 开启事务模式,模拟发送一万条消息,记录总耗时
     // 获取连接工厂
     ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
    
     // 开启连接 - tcp连接
     Connection connection = connectionFactory.createConnection();
    
     // 建立信道 构造参数 true代表该信道开启 Transactional 事务模式
     // 此处传入true,就不需要再显示编码 channel.txSelect()了
     // 内部已经调用了channle.txSelect();
     Channel channel = connection.createChannel(true);
    
     // 准备发送一万条测试消息, 每条消息都会开启一个新的事务。
     long start = System.currentTimeMillis();
     for (int i = 0; i <= 10000; i++) {
         try {
             // channel.txSelect();
             channel.basicPublish("x-hello", "test", true, MessageProperties.PERSISTENT_BASIC, ("第" + (i + 1) + "条消息").getBytes());
    
             channel.txCommit();
         } catch (Exception e) {
             // 发生异常,说明消息没有到达broker的queue中,回滚。
             try {
                 log.error("提交事务失败,事务回滚 i = " + i);
                 channel.txRollback();
             } catch (IOException e1) {
                 log.error("mq broker error...");
             }
             log.error("mq broker error...");
         }
     }
     System.out.println("事务方式单消息单事务提交下,10000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
    

    }

  2. 抓包分析


    消息投递-事务方式
  3. 流程分析

  • 客户端(生产者)打开连接(tcp)
  • 客户端(生产者)打开信道,模式选择为事务模式
  • mq服务器返回信道开启成功确认(对应上图70)
  • 客户端(生产者)发送给mq服务器Tx.Select消息,告诉服务器开启事务(对应上图66)
  • mq服务器返回Tx.Select-Ok 告知客户端事务模式已开启
  • 客户端(生产者)开始推送消息
  • 客户端(生产者)发送Tx.Commit 消息提交事务
  • mq服务器返回Tx.Commit-Ok 告知事务提交成功

以上步骤中有关事务的地方发生错误,都会抛出IOException,只要在代码中捕获异常进行事务回滚即可,或者自行决定是否需要重发消息。

何为发布确认方式发送

既然有了事务方式保证消息的投递,那么为何还需要发布确认方式呢?答案是:性能

发布确认模仿了协议中已经存在的消费者ACK确认机制,生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出

后续会放出事务模式和发送确认模式的实测性能对比

rabbitmq的发送确认方式,也是通过信道开启的

// 开启信道为确认模式
channel.confirmSelect();

而确认模式又分为同步等待mq服务器确认和异步等待确认两种。

  • 同步等待 channel.waitForConfirms()

          @Test
          public void testAsynMode() {
              // 获取连接工厂
              ConnectionFactory connectionFactory =         
              rabbitTemplate.getConnectionFactory();
    
              // 开启连接 - tcp连接
              Connection connection = connectionFactory.createConnection();
    
              // 建立信道 构造参数 true代表该信道开启 Transactional 事务模式, false 代表为非事务模式
              Channel channel = connection.createChannel(false);
    
              long start = System.currentTimeMillis();
              for (int i = 0; i <= 10000; i++) {
                  try {
                      // 开启发布确认模式
                      channel.confirmSelect();
    
                      channel.basicPublish("x-hello", "test", true, MessageProperties.PERSISTENT_BASIC, ("第" + (i + 1) + "条消息").getBytes());
    
                      // 阻塞方法,直到mq服务器确认消息
                      if (channel.waitForConfirms()) {
                          log.info("消息发送成功");
                      }
                  } catch (Exception e) {
                      // 发生异常,说明消息没有到达broker的queue中,回滚。
                      log.error("mq broker error...");
                  }
              }
              System.out.println("发送确认 - 同步确认提交下,10000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
    

    }

  • 异步监听等待 channel.addConfirmListener(listener)

      // 开启confirm模式, 模拟发送一千条消息,记录总耗时
      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
          String correlationId = message.getMessageProperties().getCorrelationIdString();
          log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
      });
    
      // 获取连接工厂
      rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
          System.out.println("=====消息确认回调了======");
          if (ack) {
              System.out.println("消息id为: " + correlationData + "的消息,已经被ack成功");
          } else {
              System.out.println("消息id为: " + correlationData + "的消息,消息nack,失败原因是:" + cause);
          }
      });
    
      // 开启连接 - tcp连接
      // 准备发送一万条测试消息
      long start = System.currentTimeMillis();
      for (int i = 0; i < 1000; i++) {
          rabbitTemplate.convertAndSend("x-hello", "test", ("第" + (i + 1) + "条消息").getBytes(), new CorrelationData(String.valueOf(i)));
      }
      System.out.println("消息确认 - 异步确认,1000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
    

配置文件

application.properties

当mandatory标志设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃

  • confirm 主要是用来判断消息是否有正确到达交换机,如果有,那么就 ack 就返回 true;如果没有,则是 false。

  • return 则表示如果你的消息已经正确到达交换机,但是后续处理出错了,那么就会回调 return,并且把信息送回给你(前提是需要设置了 Mandatory,不设置那么就丢弃);如果消息没有到达交换机,那么不会调用 return 的东西。

性能对比

  • 事务模式

1000条消息发送耗时:18733ms

事务模式下
  • 异步发送确认方式下

1000条信息耗时:387ms

异步发送确认方式

消息流转图

消息流转图

如何保证消息被成功消费

为了保证消息被可靠消费,即消息从队列中可靠的发送给消费者,需要有一定的机制保证。

RabbitMQ 提供了消息消费确认机制,即当消息到达消费者,消费者执行消费代码后,需要告知mq,该消息已经被成功消费。这就是消费确认机制

消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它

手动确认消息消费配置

spring.rabbitmq.listener.simple.acknowledge-mode=manual

总结

事务方式发布消息,性能太差,往往不采用事务的方式发布消息,建议采用异步发送确认的方式

遗留问题

  • 如果在mq服务器异步通知过程中,由于网络原因或者mq正好准备回调就挂了,导致发布者没有收到确认发送的消息怎么办?
  • mq 迟迟未收到consumer的ack怎么处理?

相关文章

网友评论

    本文标题:rabbitmq 两种模式下(事务/发布确认)性能对比

    本文链接:https://www.haomeiwen.com/subject/funowftx.html