美文网首页数客联盟
详解num.replica.fetchers

详解num.replica.fetchers

作者: Woople | 来源:发表于2018-12-01 20:41 被阅读34次

    follower从leader拉取消息进行同步数据,是由fetcher线程完成的,fetcher线程数是根据num.replica.fetchers(默认为1)以及要连接的broker共同决定的。一个broker上会有多个topic和partition的follower,而这些partition的leader在不同的broker上,那么根据Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers这个公式,计算出fetcher的id。然而起的线程数是由需要连接的broker的ip端口和这个fetcherId共同决定的。也就是默认当前这个broker要连接其他broker上的leader进行数据同步,不同的broker只会起一个线程进行同步,而这个线程会同步这个broker上的所有不同topic和partition的数据。

    源码分析

    注:本文源码基于kafka-0.10.1.1

    主要的代码来自于AbstractFetcherManager.addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset])方法,这个方法的参数是一个map,key是需要同步的topic的partition,value是这个partition对应的leader所在的broker以及同步的起始offset。

    def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
      mapLock synchronized {
        val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
          BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
        for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
          var fetcherThread: AbstractFetcherThread = null
          fetcherThreadMap.get(brokerAndFetcherId) match {
            case Some(f) => fetcherThread = f
            case None =>
              fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
              fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
              fetcherThread.start
          }
    
          fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
            tp -> brokerAndInitOffset.initOffset
          })
        }
      }
    
      info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
        "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
    }
    

    通过上面的代码可以发现,fetcher线程数就是fetcherThreadMap这个map的key的数量。这个key是由leader所在的broker的地址以及FetcherId组成的,那么问题的关键就在于FetcherId是如何得到的。

    private def getFetcherId(topic: String, partitionId: Int) : Int = {
      Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
    }
    

    上面这段代码生成了FetcherId,默认numFetchers(num.replica.fetchers)为1,那么这个FetcherId无论topic和partitionId的值是什么都是0,这种情况下fetcherThreadMap中key的数量就是需要同步的leader所在broker的数量。

    通过上面的分析,可以知道如果将num.replica.fetchers改成2,FetcherId就有可能是0或者1,那么fetcherThreadMap中key将是原来的2倍,以此类推。

    下面是kafka官方对num.replica.fetchers含义的描述,重点是这个fetcher线程数是针对某一个leader所在broker而言的。换句话说,如果这个follower broker需要连接的是10个broker,那么这个值配置为2,follower broke所需要启动的fetcher线程数就是2(每个需要连接的线程数为2)X10=20

    Number of fetcher threads used to replicate messages from a source broker. Increasing this value can increase the degree of I/O parallelism in the follower broker.

    相关文章

      网友评论

        本文标题:详解num.replica.fetchers

        本文链接:https://www.haomeiwen.com/subject/ernlcqtx.html