follower从leader拉取消息进行同步数据,是由fetcher线程完成的,fetcher线程数是根据num.replica.fetchers
(默认为1)以及要连接的broker共同决定的。一个broker上会有多个topic和partition的follower,而这些partition的leader在不同的broker上,那么根据Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
这个公式,计算出fetcher的id。然而起的线程数是由需要连接的broker的ip端口和这个fetcherId共同决定的。也就是默认当前这个broker要连接其他broker上的leader进行数据同步,不同的broker只会起一个线程进行同步,而这个线程会同步这个broker上的所有不同topic和partition的数据。
源码分析
注:本文源码基于kafka-0.10.1.1
主要的代码来自于AbstractFetcherManager.addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset])
方法,这个方法的参数是一个map,key是需要同步的topic的partition,value是这个partition对应的leader所在的broker以及同步的起始offset。
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
mapLock synchronized {
val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
var fetcherThread: AbstractFetcherThread = null
fetcherThreadMap.get(brokerAndFetcherId) match {
case Some(f) => fetcherThread = f
case None =>
fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
fetcherThread.start
}
fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
tp -> brokerAndInitOffset.initOffset
})
}
}
info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
"[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
}
通过上面的代码可以发现,fetcher线程数就是fetcherThreadMap
这个map的key的数量。这个key是由leader所在的broker的地址以及FetcherId组成的,那么问题的关键就在于FetcherId是如何得到的。
private def getFetcherId(topic: String, partitionId: Int) : Int = {
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}
上面这段代码生成了FetcherId,默认numFetchers(num.replica.fetchers
)为1,那么这个FetcherId无论topic和partitionId的值是什么都是0,这种情况下fetcherThreadMap
中key的数量就是需要同步的leader所在broker的数量。
通过上面的分析,可以知道如果将num.replica.fetchers
改成2,FetcherId就有可能是0或者1,那么fetcherThreadMap
中key将是原来的2倍,以此类推。
下面是kafka官方对num.replica.fetchers
含义的描述,重点是这个fetcher线程数是针对某一个leader所在broker而言的。换句话说,如果这个follower broker需要连接的是10个broker,那么这个值配置为2,follower broke所需要启动的fetcher线程数就是2(每个需要连接的线程数为2)X10=20。
Number of fetcher threads used to replicate messages from a source broker. Increasing this value can increase the degree of I/O parallelism in the follower broker.
网友评论