美文网首页收藏
RocketMQ生产者

RocketMQ生产者

作者: DH大黄 | 来源:发表于2021-12-10 23:53 被阅读0次

书接上文,今天我们来说说RocketMQ生产者发送消息

生产者发送消息

生产者发送消息逻辑.png

生产者发送消息的主要流程图如上图所示。具体的代码由于比较多,我就不在这边贴出来的。

主要讲一下我认为比较重要的点

消息队列负载均衡

Producer会每隔30s从Namesrv获取最新的Topic路由信息,并缓存到本地

路由信息就是用来发送时选择具体的Broker和队列的

关于异步消息

异步消息需要我们单独加一个回调方法,添加在发送消息成功/失败的一些处理。

因为异步消息没有对Broker回来的结果进行额外的处理,那么自然我们就不能像同步消息一样,对Broker返回回来的结果单独针对SendResult进行单独的重试操作。所以需要我们在失败的回调方法上进行额外的处理

具体的原因,我们可以看下消息发送的主干逻辑

消息发送的主干逻辑

另外我在这里贴一下消息发送的主干代码(具体的代码和它们的注释可以看下我的github 我是一个超链接

// ... 省略部分代码
// 查找主题路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
    // ... 省略部分代码
    for (; times < timesTotal; times++) {
      // 上次发送的broker名称 
      String lastBrokerName = null == mq ? null : mq.getBrokerName();
      // 选择一条messagequeue(轮询+失败规避)
      MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
      if (mqSelected != null) {
        // ... 省略部分代码
        try {
          // ... 省略部分代码
          sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
          endTimestamp = System.currentTimeMillis();
          // 失败规避
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
          switch (communicationMode) {
            case ASYNC:
              return null;
            case ONEWAY:
              return null;
            case SYNC:
              if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                // 发送失败重试
                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                  continue;
                }
              }

              return sendResult;
            default:
              break;
          }
        } catch (/** 一系列异常处理 **/) {
          
        }
      }
    }
}

失败规避

我在上面反复说失败规避,到底什么是失败规避呢?

在我们一次消息发送过程中,消息有可能发送失败。在消息发送失败,重试时选择发送消息的队列时,就会规避上次MessageQueue所在的Broker。这样能够减少很多不必要的请求(因为Broker宕机后,很大情况下这个Broker短时间内依旧是无法使用的)

那么,为什么会有宕机的Broker在我们的内存中存在?

因为NameSrv是根据心跳检测来确定Broker是否可用的(有间隔 10s),且消息生产者更新路由信息也是有间隔的(30s)。且为了Namesrv设计的简单,Namesrv不会主动将Broker宕机的信息推给消息生产者,而是需要消息生产者定时更新的时候,才会感知到Broker宕机。

在这期间存在误差,所以我们是要一个机智(失败规避策略)来减少一些不必要的性能消耗

相关文章

网友评论

    本文标题:RocketMQ生产者

    本文链接:https://www.haomeiwen.com/subject/ndnhfrtx.html