美文网首页数客联盟
详解num.recovery.threads.per.data.

详解num.recovery.threads.per.data.

作者: Woople | 来源:发表于2018-12-01 11:10 被阅读23次

kafka broker启动的过程中会加载此节点上所有topic的log文件,如果数据量非常大会导致加载时间过长,通过修改num.recovery.threads.per.data.dir可以加快log的恢复速度。默认num.recovery.threads.per.data.dir是1。

源码分析

注:本文源码基于kafka-0.10.1.1

加载log调用的是LogManager.loadLogs()方法,部分代码如下

val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
    val jobs = mutable.Map.empty[File, Seq[Future[_]]]

    for (dir <- this.logDirs) {
      val pool = Executors.newFixedThreadPool(ioThreads)

可以看到会为每个log文件夹创建一个线程池,线程数由变量ioThreads指定,而这个值就是num.recovery.threads.per.data.dir。参考下面KafkaServer.createLogManager方法片段。

new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
               topicConfigs = configs,
               defaultConfig = defaultLogConfig,
               cleanerConfig = cleanerConfig,
               ioThreads = config.numRecoveryThreadsPerDataDir,
               flushCheckMs = config.logFlushSchedulerIntervalMs,
               flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
               retentionCheckMs = config.logCleanupIntervalMs,
               scheduler = kafkaScheduler,
               brokerState = brokerState,
               time = time)

相关文章

网友评论

    本文标题:详解num.recovery.threads.per.data.

    本文链接:https://www.haomeiwen.com/subject/pcmlcqtx.html