美文网首页Kafka技术专栏
Kafka解惑之Old Producer(4)——Case An

Kafka解惑之Old Producer(4)——Case An

作者: 朱小厮 | 来源:发表于2018-02-01 22:07 被阅读11次

    上接:

    1. Kafka解惑之Old Producer(1)—— Beginning
    2. Kafka解惑之Old Producer(2)——Sync Analysis
    3. Kafka解惑之Old Producer(3)——Async Analysis

    在前面三篇文章中详细了解了Old Producer的内容,本文主要通过一个实际应用案例来加深各位对Old Producer的理解。

    问题描述:线上由多台Kafka Broker组成的集群(Producer的metadata.broker.list参数设置的是所有broker的地址+端口号的列表),版本号为0.8.2.x,当一台Kafka Broker的硬盘发生故障导致系统崩溃,由于Kafka的HA的作用线上业务无明显异常,发送方和消费方的流量也与之前持稳,但是后面监测到每隔10分钟左右就有少量的消息发送的时延很大,而且有WARN告警报出。

    2018-01-30 00:53:20 -[ERROR] - [fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failed] - [kafka.utils.Logging$$anonfun$swallowError$1:106]
    kafka.common.KafkaException: fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
        at kafka.utils.Utils$.swallow(Utils.scala:172)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
        at kafka.producer.Producer.send(Producer.scala:77)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    

    通过上面的异常栈我们可以发现在获取元数据的时候(kafka.client.ClientUtils$.fetchTopicMetadata)发生了异常,其实如果你对Kafka的配置参数足够了解的话,看到10分钟这个数值就可以联想到600*1000的某个参数,也就是topic.metadata.refresh.interval.ms。在DefaultEventHandler的handle()方法中,在调用dispatchSerializedData()方法预处理并发送消息之前就会有下面的一个if判断语言用来判断当前是否需要更新元数据:

    while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
      if (topicMetadataRefreshInterval >= 0 &&
          Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
        CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
      }
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
    

    上面代码中的topicMetadataRefreshInterval指的就是topic.metadata.refresh.interval.ms参数。如果topic.metadata.refresh.interval.ms这个参数设置为0,那么就意味着每次发送消息之前都需要拉取并更新元数据信息,更新元数据信息之后还要更新ProducerPool的内容,包括重建SyncProducer,那么这一番操作必然会有一定的延迟,当然这个延迟没有本文中所述问题中的延迟那么大。如果topic.metadata.refresh.interval.ms这个参数设置为负数,那么这个if条件语句就不能成立,也就是不会有定时更新元数据的操作,只有在获取元数据信息失败是才会请求完整的元数据信息。

    本文中所述问题中采用的topic.metadata.refresh.interval.ms参数设置的是默认值大小,那么问题中还有一个细节:只有少量消息发送超时。为了进一步的一探究竟,我们来继续深究。当定时更新元数据信息条件满足时,就会调用brokerPartitionInfo.updateInfo的方法,更进取之后,实际上内部调用的是:ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)来请求元数据信息,并获得TopicMetadataResponse来做更新。

    TopicMetadataResponse中包含有broker的id、host、port,还有topic、topic中的partition、partition所对应的leader、AR和ISR等等,有兴趣的同学可以进一步翻阅kafka.api.TopicMetadataResponse这个类,主体代码只有30行左右,只要学一点Scala的构造函数相关的只是就能看懂。

    我们回过头来再来进一步分析ClientUtils.fetchTopicMetadata这个方法,详细代码如下:

    def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
      var fetchMetaDataSucceeded: Boolean = false
      var i: Int = 0
      val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
      var topicMetadataResponse: TopicMetadataResponse = null
      var t: Throwable = null
      // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the
      // same broker
      val shuffledBrokers = Random.shuffle(brokers)
      while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
        val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
        info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
        try {
          topicMetadataResponse = producer.send(topicMetadataRequest)
          fetchMetaDataSucceeded = true
        }
        catch {
          case e: Throwable =>
            warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
              .format(correlationId, topics, shuffledBrokers(i).toString), e)
            t = e
        } finally {
          i = i + 1
          producer.close()
        }
      }
      if(!fetchMetaDataSucceeded) {
        throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
      } else {
        debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
      }
      topicMetadataResponse
    }
    

    fetchTopicMetadata方法参数列表中brokers代表metadata.broker.list所配置的地址列表。可以看到方法中首先建立TopicMetadataRequest的请求,然后从brokers中随机挑选(做一个shuffle,然后从列表中的第一个开始取,也就是相当于随机)一个broker建立SyncProducer并发送TopicMetadataRequest请求,问题的关键就在这个随机挑选一个broker之上,如果正好随机到的是那台磁盘损毁而崩溃的机器,那么这个请求必定要等到设定的超时时间之后才能捕获异常:[ERROR] - [fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failed] ,进而再找到下一个broker重新发送TopicMetadataRequest请求。

    上面提到了超时时间,这个超时时间是通过request.timeout.ms参数设定的,默认值为10000,也就是10s。具体指的是kafka.network.BlockingChannel中的channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)这段代码,参数connectTimeoutMs就是指request.timeout.ms。如果元数据的请求时打到那台崩溃的broker上的话,那么元数据的请求就要耗时10s以上,待元数据刷新后才能发送消息。这个request.timeout.ms参数才是导致文中开头有少量消息发送时延很大的原因。为了进一步验证结论是否正确,笔者将相关的类SyncProducer、BlockingChannel用Java重写了一遍,并测试出请求的耗时,当访问一个不存在的ip地址时,从发送请求到异常报出的耗时在10194ms。不过如果访问一个存在的ip地址时,但是没有kafka服务的话,从发送请求到返回的耗时只有1248ms,基本上减少了一个数量级,如果读者在遇到同样的问题时,不妨上线一台(随意一台能建立TCP连接的机器就好)与崩溃宕机的那台broker一样的ip地址的机器,上面无需运行kafka的服务,就能大大的降低发送消息的时延。这样可以流出更多的时间去定位、修复、重新上线那台崩溃的broker。

    当然如果对时延过敏,还有一些其它的方法可以参考。比如metadata.broker.list配置的是一个类似虚IP(VIP)的话,可以在VIP的下游剔除掉这台broker,让VIP过来的请求不会落到崩溃的broker上。或者也可以通过topic.metadata.refresh.interval.ms参数来调节,比如设置为负数就可以免去定时刷新元数据的烦恼,不过在元数据变动的时候很难有效的感知其变化,通过定时重连之类的方法又显得有点奇葩。当然直接上线一台服务器,安装运行kafka服务,且设置这台kafka服务器的ip为崩溃的那台服务器的ip地址,不过这番操作比直接拉一台空机器的时效性要低一些,凡事在于抉择。

    Old Producer拉取元数据信息以及发送消息是在同一个线程中的,这必然会引起局部消息的时延增大。不过在新版的KafkaProducer中,这些问题都已经迎刃而解,具体怎么处理,且看后面的文章分析。


    PS:消息中间件(Kafka、RabbitMQ)交流可加微信:hiddenzzh
    欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。


    欢迎关注

    相关文章

      网友评论

        本文标题:Kafka解惑之Old Producer(4)——Case An

        本文链接:https://www.haomeiwen.com/subject/envjzxtx.html