Troubleshooting Kafka broker storage imbalance


Author: AlienPaul | Published 2022-02-07 18:17

    Background

    After the disks of an existing Kafka cluster were expanded, a newly created topic carried heavy traffic and its data began to back up. Some time later we received alerts from the cluster nodes: the usage of some disks had reached the 90% warning threshold. Inspection showed that disk usage on the affected node was uneven across its disks.

    Investigation

    First we need to confirm Kafka's disk balancing strategy. The entry point is LogManager's getOrCreateLog method: if the log does not exist, or isNew is true, it creates a new log for the given topic and partition. The code is shown below:

    def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
        // Take the lock when creating or deleting a log
        logCreationOrDeletionLock synchronized {
            // Look up the log for this topicPartition in futureLogs (when isFuture is true) or currentLogs
            val log = getLog(topicPartition, isFuture).getOrElse {
                // If no log was found, run this block.
                // This block cannot be executed concurrently.
                // create the log if it has not already been created in another thread
                if (!isNew && offlineLogDirs.nonEmpty)
                    throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")

                val logDirs: List[File] = {
                    val preferredLogDir = preferredLogDirs.get(topicPartition)

                    if (isFuture) {
                        if (preferredLogDir == null)
                            throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
                        else if (getLog(topicPartition).get.parentDir == preferredLogDir)
                            throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
                    }

                    if (preferredLogDir != null)
                        List(new File(preferredLogDir))
                    else
                        // Enter this method to find candidate directories for the log
                        nextLogDirs()
                }

                val logDirName = {
                    if (isFuture)
                        UnifiedLog.logFutureDirName(topicPartition)
                    else
                        UnifiedLog.logDirName(topicPartition)
                }

                // Try to create the log directory in each candidate dir, in order, until one succeeds
                val logDir = logDirs
                    .iterator // to prevent actually mapping the whole list, lazy map
                    .map(createLogDirectory(_, logDirName))
                    .find(_.isSuccess)
                    .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
                    .get // If Failure, will throw

                // Fetch the log config for this topic
                val config = fetchLogConfig(topicPartition.topic)
                // Create the UnifiedLog object
                val log = UnifiedLog(
                    dir = logDir,
                    config = config,
                    logStartOffset = 0L,
                    recoveryPoint = 0L,
                    maxProducerIdExpirationMs = maxPidExpirationMs,
                    producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
                    scheduler = scheduler,
                    time = time,
                    brokerTopicStats = brokerTopicStats,
                    logDirFailureChannel = logDirFailureChannel,
                    topicId = topicId,
                    keepPartitionMetadataFile = keepPartitionMetadataFile)

                // Add the new log to the log map
                if (isFuture)
                    futureLogs.put(topicPartition, log)
                else
                    currentLogs.put(topicPartition, log)

                info(s"Created log for partition $topicPartition in $logDir with properties ${config.overriddenConfigsAsLoggableString}")
                // Remove the preferred log dir since it has already been satisfied
                preferredLogDirs.remove(topicPartition)

                log
            }
            // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
            if (log.topicId.isEmpty) {
                topicId.foreach(log.assignTopicId)
            }

            // Ensure topic IDs are consistent
            topicId.foreach { topicId =>
                log.topicId.foreach { logTopicId =>
                    if (topicId != logTopicId)
                        throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
                                                               s"but log already contained topic ID $logTopicId")
                }
            }
            log
        }
    }
    

    In the method above, the part we care about is the logic that picks the directory where the log will be stored. It lives in the nextLogDirs method, which returns the suggested storage directories for the next partition, ordered by preference. The current approach is to count how many partitions each directory holds and sort the directories in ascending order:

    private def nextLogDirs(): List[File] = {
        if(_liveLogDirs.size == 1) {
            // If there is only one live log directory, return it
            List(_liveLogDirs.peek())
        } else {
            // count the number of logs in each parent directory (including 0 for empty directories)
            val logCounts = allLogs.groupBy(_.parentDir).map { case (parent, logs) => parent -> logs.size }
            val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
            val dirCounts = (zeros ++ logCounts).toBuffer

            // choose the directory with the least logs in it
            // Sort directories by log count in ascending order, wrap each path in a File and return
            dirCounts.sortBy(_._2).map {
                case (path: String, _: Int) => new File(path)
            }.toList
        }
    }
    

    In summary, Kafka's broker-level placement strategy is to prefer the directory that holds the fewest logs. In other words, Kafka balances by log count, not by storage capacity. In our case, the newly added disk necessarily held the fewest logs, so newly created topics were always placed on it first. Because those topics carried a lot of data, the expanded disk was the first to hit the usage alert threshold.
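
    To confirm this on an affected broker, you can compare the number of partition directories (Kafka counts one log per partition directory) against the disk usage of each configured log directory. Below is a minimal shell sketch; the paths /data1/kafka-logs and /data2/kafka-logs are placeholders for the entries in the broker's log.dirs setting:

    # For each configured log directory, print the number of partition
    # directories it holds and the usage of the underlying mount point.
    for d in /data1/kafka-logs /data2/kafka-logs; do
        echo "== $d =="
        find "$d" -mindepth 1 -maxdepth 1 -type d | wc -l   # log (partition dir) count
        df -h "$d" | tail -n 1                              # disk usage
    done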

    Solutions

    Reassign topics

    For topics already located on the problem broker, we recommend reassigning their partitions.

    First, check which brokers a topic's partitions are located on:

    bin/kafka-topics.sh --zookeeper xxx.xxx.xxx.xxx:2181 --describe --topic topic_name
    

    Decide which topics need to be moved. For example, if the topics to move are topicA and topicB, write a JSON file in the following format:

    {
        "topics": [
            {"topic": "topicA"},
            {"topic": "topicB"}
        ],
        "version":1
    }
    

    Save it as topics-to-move.json.

    Then run the following command. Kafka generates the target brokers that each partition should be moved to, and spreads the partitions evenly across them automatically:

    bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --topics-to-move-json-file topics-to-move.json --broker-list "3,4,5" --generate 
    

    The output may look like this:

    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"topicA","partition":2,"replicas":[0,2]},
    {"topic":"topicA","partition":1,"replicas":[1,0]},
    {"topic":"topicB","partition":1,"replicas":[0,1]},
    {"topic":"topicA","partition":0,"replicas":[2,1]},
    {"topic":"topicB","partition":0,"replicas":[2,0]},
    {"topic":"topicB","partition":2,"replicas":[1,2]}]}
    
    Proposed partition reassignment configuration
    
    {"version":1,"partitions":[{"topic":"topicA","partition":2,"replicas":[3,4]},
    {"topic":"topicA","partition":1,"replicas":[5,3]},
    {"topic":"topicB","partition":1,"replicas":[3,4]},
    {"topic":"topicA","partition":0,"replicas":[4,5]},
    {"topic":"topicB","partition":0,"replicas":[5,3]},
    {"topic":"topicB","partition":2,"replicas":[4,5]}]}
    

    Next, write another JSON file, e.g. move-to-brokers.json, and fill it with the content printed under "Proposed partition reassignment configuration". Finally execute:

    bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file move-to-brokers.json --execute
    

    This starts the topic reassignment task. You can use:

    bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file move-to-brokers.json --verify
    

    to check the status of the reassignment.

    Reduce the log retention time

    Review and adjust the following parameters (an example of applying them follows the list):

    • log.retention.bytes: maximum log size to retain, in bytes
    • log.retention.hours: log retention time, in hours
    • log.retention.minutes: log retention time, in minutes
    • log.retention.ms: log retention time, in milliseconds
    • log.retention.check.interval.ms: how often to check whether any log is eligible for deletion

    Note: if several of these limits are configured at the same time, they are all in effect, and deletion is triggered as soon as any one of them is met. Among the time-based settings, log.retention.ms takes precedence over log.retention.minutes, which takes precedence over log.retention.hours.
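
    Below is a minimal sketch of applying these settings, assuming the same ZooKeeper address zk:2181 and topic name topic_name used in the commands above, and a Kafka version whose kafka-configs.sh still accepts --zookeeper (newer versions use --bootstrap-server instead). Broker-level defaults are set in server.properties, while a per-topic override uses the topic-level key retention.ms rather than log.retention.ms:

    # Broker-level defaults in server.properties (take effect after a broker restart), e.g.:
    #   log.retention.hours=72
    #   log.retention.check.interval.ms=300000

    # Per-topic override, applied without restarting the broker.
    bin/kafka-configs.sh --zookeeper zk:2181 --alter \
        --entity-type topics --entity-name topic_name \
        --add-config retention.ms=86400000   # keep data for 24 hours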

    Delete unused topics

    List the topics that exist in the cluster and delete the ones that are no longer used. This step amounts to manually rebalancing the partition distribution across the disks. The delete command is as follows:

    bin/kafka-topics.sh --delete --zookeeper zk:2181 --topic topic_name
    
