Background
After new disks were added to an existing Kafka cluster, newly created topics carried heavy traffic and data began to back up. Some time later we received alerts from cluster nodes: usage on some disks had reached the 90% warning threshold. Investigation showed that disk usage on the affected nodes was unevenly distributed across their disks.
Troubleshooting
First we need to understand Kafka's disk balancing strategy, so we look at the getOrCreateLog method of LogManager. If the log does not exist, or isNew is true, it creates a new log for the given topic and partition. The code is shown below:
def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
  // take the lock that guards log creation and deletion
  logCreationOrDeletionLock synchronized {
    // look up the log for this topicPartition in futureLogs (when isFuture is true) or currentLogs
    val log = getLog(topicPartition, isFuture).getOrElse {
      // executed only when no existing log was found
      // this block can never be executed concurrently
      // create the log if it has not already been created in another thread
      if (!isNew && offlineLogDirs.nonEmpty)
        throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")
      val logDirs: List[File] = {
        val preferredLogDir = preferredLogDirs.get(topicPartition)
        if (isFuture) {
          if (preferredLogDir == null)
            throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
          else if (getLog(topicPartition).get.parentDir == preferredLogDir)
            throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
        }
        if (preferredLogDir != null)
          List(new File(preferredLogDir))
        else
          // this call decides which directories are candidates for the new log
          nextLogDirs()
      }
      val logDirName = {
        if (isFuture)
          UnifiedLog.logFutureDirName(topicPartition)
        else
          UnifiedLog.logDirName(topicPartition)
      }
      // try the candidate directories one by one until a log directory is created successfully
      val logDir = logDirs
        .iterator // to prevent actually mapping the whole list, lazy map
        .map(createLogDirectory(_, logDirName))
        .find(_.isSuccess)
        .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
        .get // If Failure, will throw
      // fetch the log configuration for this topic
      val config = fetchLogConfig(topicPartition.topic)
      // build the UnifiedLog object
      val log = UnifiedLog(
        dir = logDir,
        config = config,
        logStartOffset = 0L,
        recoveryPoint = 0L,
        maxProducerIdExpirationMs = maxPidExpirationMs,
        producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
        scheduler = scheduler,
        time = time,
        brokerTopicStats = brokerTopicStats,
        logDirFailureChannel = logDirFailureChannel,
        topicId = topicId,
        keepPartitionMetadataFile = keepPartitionMetadataFile)
      // register the new log
      if (isFuture)
        futureLogs.put(topicPartition, log)
      else
        currentLogs.put(topicPartition, log)
      info(s"Created log for partition $topicPartition in $logDir with properties ${config.overriddenConfigsAsLoggableString}")
      // Remove the preferred log dir since it has already been satisfied
      preferredLogDirs.remove(topicPartition)
      log
    }
    // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
    if (log.topicId.isEmpty) {
      topicId.foreach(log.assignTopicId)
    }
    // Ensure topic IDs are consistent
    topicId.foreach { topicId =>
      log.topicId.foreach { logTopicId =>
        if (topicId != logTopicId)
          throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
            s"but log already contained topic ID $logTopicId")
      }
    }
    log
  }
}
In the method above, the part we care about is the logic that decides which directory the new log goes to. It lives in the nextLogDirs method, which returns the suggested directories for the next partition, sorted in order of preference. Currently this is done by counting the number of partitions stored in each directory and sorting the directories by that count in ascending order.
private def nextLogDirs(): List[File] = {
  if (_liveLogDirs.size == 1) {
    // only one live log directory: return it directly
    List(_liveLogDirs.peek())
  } else {
    // count the number of logs in each parent directory (including 0 for empty directories)
    val logCounts = allLogs.groupBy(_.parentDir).map { case (parent, logs) => parent -> logs.size }
    val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
    val dirCounts = (zeros ++ logCounts).toBuffer
    // choose the directory with the least logs in it
    // sort directories by log count, ascending, and wrap each path in a File
    dirCounts.sortBy(_._2).map {
      case (path: String, _: Int) => new File(path)
    }.toList
  }
}
In summary, at the broker level Kafka's placement strategy is to prefer the log directory that currently holds the fewest logs. In other words, Kafka balances by number of logs (files), not by storage capacity. Applied to the case in the background: the newly added disk necessarily holds the fewest logs, so newly created topics are always placed there first. Once those topics accumulate a large amount of data, the newly added disk is the first to hit the usage alert threshold.
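To make this concrete, here is a minimal, self-contained Scala sketch (not Kafka code; the directory paths and counts are made up) that applies the same ordering rule as nextLogDirs: directories are sorted only by how many logs they hold, so a freshly added, empty disk wins every time, no matter how full the other disks are.

import java.io.File

object DirSelectionSketch extends App {
  // hypothetical per-directory partition counts after adding an empty disk
  val partitionCounts = Map(
    "/data/kafka-logs-1"   -> 120,
    "/data/kafka-logs-2"   -> 118,
    "/data/kafka-logs-new" -> 0    // the newly added, empty disk
  )

  // the same ordering rule as nextLogDirs: sort ascending by log count;
  // free disk space is never consulted
  val preferred: List[File] = partitionCounts.toList
    .sortBy { case (_, count) => count }
    .map { case (path, _) => new File(path) }

  // prints /data/kafka-logs-new: every new partition lands on the new disk
  // until its log count catches up with the older directories
  println(preferred.head)
}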
Solutions
Move topics
For topics that already reside on the problem broker, we recommend reassigning their partitions.
First check which brokers the topic is distributed across:
bin/kafka-topics.sh --zookeeper xxx.xxx.xxx.xxx:2181 --describe --topic topic_name
Decide which topics need to be moved. Suppose, for example, that we need to move topicA and topicB; write a JSON file in the following format:
{
  "topics": [
    {"topic": "topicA"},
    {"topic": "topicB"}
  ],
  "version": 1
}
Save it as topics-to-move.json.
Then run the command below. Kafka will automatically generate the target brokers each partition should be moved to, and it keeps the partition distribution balanced automatically.
bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --topics-to-move-json-file topics-to-move.json --broker-list "3,4,5" --generate
The output will look something like this:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topicA","partition":2,"replicas":[0,2]},
{"topic":"topicA","partition":1,"replicas":[1,0]},
{"topic":"topicB","partition":1,"replicas":[0,1]},
{"topic":"topicA","partition":0,"replicas":[2,1]},
{"topic":"topicB","partition":0,"replicas":[2,0]},
{"topic":"topicB","partition":2,"replicas":[1,2]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topicA","partition":2,"replicas":[3,4]},
{"topic":"topicA","partition":1,"replicas":[5,3]},
{"topic":"topicB","partition":1,"replicas":[3,4]},
{"topic":"topicA","partition":0,"replicas":[4,5]},
{"topic":"topicB","partition":0,"replicas":[5,3]},
{"topic":"topicB","partition":2,"replicas":[4,5]}]}
Next, create another JSON file, for example move-to-brokers.json, and paste into it the content listed under Proposed partition reassignment configuration. Finally, run:
bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file move-to-brokers.json --execute
This starts the topic reassignment task. You can then use:
bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file move-to-brokers.json --verify
to check the status of the reassignment.
Reduce log retention time
Review and adjust the following parameters (a sample configuration follows the list):
- log.retention.bytes: maximum log size to retain, in bytes (applied per partition)
- log.retention.hours: log retention time, in hours
- log.retention.minutes: log retention time, in minutes
- log.retention.ms: log retention time, in milliseconds
- log.retention.check.interval.ms: how often to check whether any log needs to be deleted
Note: if several of these limits are configured at the same time, they are all in effect; whichever condition is met first triggers deletion.
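For reference, a broker-level configuration in server.properties might look like the sketch below; the values are purely illustrative and should be sized to your own traffic and disk capacity. Retention can also be overridden per topic (for example via the topic-level retention.ms config using kafka-configs.sh) when only a few large topics need shorter retention.

# server.properties -- illustrative values only, size them for your own workload
# time-based retention: delete segments older than 72 hours
log.retention.hours=72
# size-based retention: cap each partition's log at roughly 100 GiB
log.retention.bytes=107374182400
# how often the broker checks for segments eligible for deletion (5 minutes)
log.retention.check.interval.ms=300000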
Delete unused topics
List the topics that exist in the cluster and delete those that are no longer used. This step amounts to manually rebalancing the partition distribution across the disks. The command to delete a topic is:
./bin/kafka-topics --delete --zookeeper zk:2181 --topic topic_name
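Note that the --delete command only takes effect if topic deletion is enabled on the brokers; older Kafka versions ship with it disabled by default, in which case the broker configuration needs something like:

# server.properties
# topic deletion must be enabled for the --delete command to actually remove data
delete.topic.enable=true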