日志段及其相关代码是kafka服务器源码中最为重要的组件代码之一。
kafka日志结构概览
kafka日志在磁盘上的组织架构如下图所示:

日志是kafka服务器端代码的重要组件之一,很多其他的核心组件都是以日志为基础的,比如状态管理机和副本管理器等。
kakfa日志对象由多个日志端对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)。当然,如果没有使用kafka事务,已中止事务的索引文件是不会被创建出来的。图中的一串数字0是该日志段的起始位移值(Base Offset),也就是该日志所存的第一条消息的位移值。
一般情况下,一个kafka主题有很多分区,每个分区就对应一个Log对象,在物理磁盘上则对应一个子目录。比如你创建一个两个分区的test-topic,那么,kafka在磁盘上会创建两个子目录:test-topic-0和test-topic-1。而在服务器端,这就是两个Log对象。每个子目录下存在多组日志段,也就是多组.log、.index、.timeindex文件组合,只不过文件名不同,因为每个日志段的起始位移不同。
日志段代码解析
日志段源码位于kafka的core工程下,具体文件位置是core/src/main/scala/kafka/log/LogSegment.scala。实际上,所有日志结构部分的源码都在core的kafka.log包下。
该文件下定义了三个scala对象:
- LogSegment class
- LogSegment object
- LogFlushStats object。LogFlushStats结尾有个Stats,它是做统计用的,主要负责为日志落盘进行计时。
首先看一下这段源码的注释:
A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in any previous segment.
这段英文清楚地说明了每个日志段由两个核心的组件构成:日志和索引。当然,这里的索引泛指广义的索引文件。另外,这段注释还给出了一个重要的事实:每个日志段都有一个起始位移值(Base Offset),而该位移值是此日志段所有消息中最小的位移值,同时,该值却又比前面任何日志段中消息的位移值都大。
日志段类声明
首先看一下LogSegment的定义:
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyIndex[OffsetIndex],
val lazyTimeIndex: LazyIndex[TimeIndex],
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging { ... }
就像前面说的,一个日志段包含消息日志文件、位移索引文件、时间戳索引文件、已中止事务索引文件等。这里的FileRecords就是实际保存kafka消息的对象。
下面是lazyOffsetIndex、LazyTimeIndex和txnIndex分别对应刚才的三个索引文件。
每个日志段对象保存自己的起始位移baseOffset---这是非常重要的属性!事实上,你在磁盘看到的文件名就是baseOffset的值。每个LogSegment对象实例一旦被创建,它的起始位移就是固定的了,不能再被更改。
indexIntervalBytes值其实就是Broker段参数log.index.interval.bytes值,它控制了日志段对象新增索引项的频率。默认情况下,日志段至少新写入4kb的消息数据才会新增一条索引项。而rollJitterMs是日志段对象新增倒计时的“扰动值”。因为目前Broker端日志段新增倒计时是全局设置,这就是说,在未来的某个时刻可能同时创建多个日志段对象,这将极大地增加物理磁盘I/O压力。有了rollJitterMs值的干扰,每个新增日志段在创建时会彼此岔开一小段时间,这样就可以缓解物理磁盘的I/O负载瓶颈。
下面,看一下重要的方法。
对于一个日志段而言,最重要的方法就是写入消息和读取消息了,他们分别对应着源码的append方法和read方法。另外,recover方法同样很关键,它是Broker重启后恢复日志段的操作逻辑。
append方法
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
if (records.sizeInBytes > 0) {
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
ensureOffsetInRange(largestOffset)
// append the messages
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
// Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
}
// append an entry to the index (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
append方法接收4个参数,分别表示写入消息批次中消息的最大位移值、最大时间戳、最大时间戳对应消息的位移以及真正要写入的消息集合。下面这张图展示了append方法的完整执行流程:

第一步:
在源码中,首先调用log.sizeInBytes方法判断该日志是否为空,如果是空的话,kafka需要记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据。
第二步:
代码调用ensureOffsetInRange方法确保输入参数最大位移值是合法的。那怎么判断是不是合法呢?标准就是看它与日志段起始位移的差值是否在整数范围内,即largestOffset-baseOffset的值是不是介于[0, Int.MAXVALUE]之间。在极个别的情况下,这个差值可能会越界,这时,append方法就会抛出异常,阻止后续的消息写入。一旦碰到这个问题,你需要做的是升级你的kafka版本,因为这是由已知的bug导致的。
第三步:
待这些做完之后,append方法调用FileRecords的append方法执行真正的写入。
第四步:
再下一步,就是更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。每个日志段都要保存当前最大时间戳信息和所属消息的位移信息。
比如Broker端提供定期删除日志的功能,当前最大时间戳这个值就是判断的依据:而最大时间戳对应的消息的位移值则用于时间戳索引项。
第五步:
append方法的最后一步就是更新索引项和写入的字节数了。日志段每写入4kb数据就要写入一个索引项。当已写入字节数超过了4kb之后,append方法会调用索引对象的append方法新增索引项,同时清空已写入字节数,以备下次重新累积计算。
read方法
再看一下read方法,了解下读取日志段的具体操作。
def read(startOffset: Long,
maxSize: Int,
maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize < 0)
throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if (startOffsetAndSize == null)
return null
val startPosition = startOffsetAndSize.position
val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
read方法接收4个输入参数。
- startOffset:要读取的第一条消息的位移
- maxSize:能读取的最大字节数
- maxPosition:能读到的最大文件位置
- minOneMessage:是否允许在消息体过大时至少返回第一条消息。
前3个参数的含义很好理解,第四个参数为true时,即使出现消息体字节数超过了maxSize的情形,read方法依然能返回至少一条消息。引入这个参数是为了确保不出现消费饿死的情况。
下入展示了read方法的完整执行逻辑:

第一步是调用了translateOffset方法定位要读取的起始文件位置(startPosition)。输入参数startOffset仅仅是位移值,kafka需要根据索引信息找到对应的物理文件位置才能开始读取消息。
待确定了读取起始位置,日志段代码需要根据这部分信息以及maxSize和maxPosition参数共同计算要读取的总字节数。举个例子,假设maxSize=100,maxposition=300,startPosition=250,那么read方法方法只能读取50字节,因为maxPosition - startPosition = 50。我们把它和maxSize参数相比较,其中的最小值就是最终能够读取的总字节数。
最后一步是调用FileRecords的slice方法,从指定的位置读取指定大小的消息集合。
recover方法:
除了append和read方法,LogSegment还有一个重要的方法,他就是recover方法,用于恢复日志段。
下面的代码是recover方法源码。什么是恢复日志段?其实就是说,Broker在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的LogSegment对象实例。在这个过程中,它需要执行一系列的操作。
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
offsetIndex.reset()
timeIndex.reset()
txnIndex.reset()
var validBytes = 0
var lastIndexEntry = 0
maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
try {
for (batch <- log.batches.asScala) {
batch.ensureValid()
ensureOffsetInRange(batch.lastOffset)
// The max timestamp is exposed at the batch level, so no need to iterate the records
if (batch.maxTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = batch.maxTimestamp
offsetOfMaxTimestampSoFar = batch.lastOffset
}
// Build offset index
if (validBytes - lastIndexEntry > indexIntervalBytes) {
offsetIndex.append(batch.lastOffset, validBytes)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
lastIndexEntry = validBytes
}
validBytes += batch.sizeInBytes()
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
}
}
} catch {
case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
.format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
}
val truncated = log.sizeInBytes - validBytes
if (truncated > 0)
debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
log.truncateTo(validBytes)
offsetIndex.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
timeIndex.trimToValidSize()
truncated
}
下面展示recover的完整执行逻辑:

recover开始时,代码一次调用索引对象的reset方法清空所有的索引文件,之后会开始遍历日志段的所有消息集合或消息批次(RecordBatch)。对于读取到的每个消息集合,日志段必须要确保它们是合法的,这主要体现在两个方面:
1.该集合中的消息必须要符合kafka定义的二进制格式;
2.该集合中最后一条消息的位移值不能越界,即它与日志段起始位移的差值必须是一个正整数值。
校验完消息集合之后,代码会更新遍历过程中观测到的最大时间戳以及所属的位移值。同样,这两个数据用于后续构建索引项。最后是更新事务型Producer的状态以及Leader Epoch缓存。
遍历执行完成后,kakfa会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段写入了一些非法消息,需要执行截断操作,将日志段调回合法的数值。同时,kafka还必须相应地调整索引文件的大小。把这些都做完之后,日志段恢复的操作也就宣告结束了。

接下来看一下Log源码结构
Log源码位于kafka core工程的log源码包下,文件名是Log.scala。总体上,该文件定义了10个类和对象,如下如所示:

1.LogAppendInfo
- LogAppendInfo(C):保存了一组待写入消息的各种元数据信息。比如,这组消息中第一条消息的位移值是多少、最后一条消息的位移值是多少;再比如,这组消息中最大的消息时间戳又是多少。
- LogAppendInfo(O):可以理解为其对应伴生类的工厂方法类,里面定义了一些工厂方法,用于创建特定的LogAppendInfo实例。
2.Log
- Log(C):Log源码中最核心的代码。
- Log(O):同理,Log半生类的工厂方法,定义了很多常量以及一些辅助方法。
3.RollParams
- RollParams(C):定义用于控制日志是否切分(Roll)的数据结构。
- RollParams(O):同理,RollParams半生类的工厂方法。
除了这3组伴生对象之外,还有4类源码。
- LogMetricNames:定义了Log对象的监控指标。
- LogOffsetSnapshot:封装分区所有位移元数据的容器类。
- LogReadInfo:封装读取日志返回的数据及其元数据。
- CompletedTxn:记录已完成事务的元数据,主要用于构建事务索引。
Log Class & Object
先看伴生类对象(即Log Object)的实现。
object Log {
val LogFileSuffix = ".log"
val IndexFileSuffix = ".index"
val TimeIndexFileSuffix = ".timeindex"
val ProducerSnapshotFileSuffix = ".snapshot"
val TxnIndexFileSuffix = ".txnindex"
val DeletedFileSuffix = ".deleted"
val CleanedFileSuffix = ".cleaned"
val SwapFileSuffix = ".swap"
val CleanShutdownFile = ".kafka_cleanshutdown"
val DeleteDirSuffix = "-delete"
val FutureDirSuffix = "-future"
……
}
这是Log Object定义的所有常量。
- .snapshot是kafka为幂等型或事务型Producer所作的快照文件。
- .deleted是删除日志段操作创建的文件。目前删除日志段文件是异步操作,Broker端把日志段文件从.log后缀修改为.deleted后缀。如果你看到一大堆.deleted后缀的文件名,这是kafka在执行日志段文件删除。
- .cleaned和.swap都是Compaction操作的产物。
- delete则是应用于文件夹的。当你删除一个主题的时候,主题的分区文件夹会被加上这个后缀。
- future是用于变更主题分区文件夹地址的,属于比较高阶的用法。
Log Object还定义了超多的工具类方法。
def filenamePrefixFromOffset(offset: Long): String = {
val nf = NumberFormat.getInstance()
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
nf.format(offset)
}
这个方法的作用是通过给定的位移值计算出对应的日志段文件名。kafka日志文件固定是20为的长度,filenamePrefixFromOffset方法就是用前面补0的方式,把给定位移值扩充成一个固定20位长度的字符串。
举个例子,我们给定一个位移值是12345,那么Broker端磁盘上对应的日志段文件名就应该是00000000000000012345.log。
下面来看Log源码部分的Log类。
class Log(@volatile var dir: File,
@volatile var config: LogConfig,
@volatile var logStartOffset: Long,
@volatile var recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
val time: Time,
val maxProducerIdExpirationMs: Int,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
……
}
看着好像有很多属性,但其实,只需要记住两个属性就够了:dir和logStartOffset。dir就是这个日志所在的文件夹路径,也就是主题分区的路径。而logStartOffset,表示日志的当前最早位移。dir和logStartOffset都是bolatile var类型,表示它们的值是变动的,而且可能被多个线程更新。
你可能听过日志的当前末端位移,也就是Log End Offset(LED),它是表示日志下一条待插入消息的位移值,而这个Log Start Offset是跟它相反的,它表示日志当前对外可见的最早一条消息的位移值。我用一张图来标识它们的区别:

图中绿色的位移值3是日志的Log Start Offset,而位移值15标识LEO。另外,位移值8是高水位值,它是区分已提交消息和未提交消息的分水岭。
有意思的是,Log End Offset可以简称为LEO,但Log Start Offset却不能简称为LSO。因为在kafka中,LSO特指Log Stable Offset,属于kafka事务的概念。
Log类的其他属性暂时不用理会,因为它们要么是很明显的工具类属性,比如timer和scheduler,要么是高阶用法才会用到的高级属性,比如producerStrateManager和logDirFailtureChannel。工具类的代码大多是做辅助用的。
其实,除了Log类签名定义的这些属性之外,Log类还定义了一些很重要的属性,比如下面这段代码:
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
第一个属性nextOffsetMetadata,它封装了下一条待插入消息的位移值。
第二个属性highWatermarkMetadata,是分区日志高水位值。
第三个属性segments,我认为是Log类中最重要的属性。它保存了分区日志下所有的日志段信息,只不过是用的Map的数据结构来保存的。Map的key值是日志段的起始位移值,balue则是日志段对象本身。Kafka源码使用ConcurrentNavigableMap数据结构来保存日志段对象,就可以很轻松地利用该类提供的线程安全和各种支持排序的方法,来管理所有日志段对象。
第四个属性是Leader Epoch Cache对象。Leader Epoch是社区于0.11.0.0版本引入源码中的,主要是用来判断出现Failure时是否执行日志截断操作(Truncation)。之前靠高水位来判断的机制,可能会造成副本间数据不一致的情形。这里的Leader Epoch Cache是一个缓存类数据,里面保存了分区Leader的Epoch值与对应位移值的映射关系。
掌握了这些属性之后,我们看下Log类的初始化逻辑:
locally {
val startMs = time.milliseconds
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
initializeLeaderEpochCache()
val nextOffset = loadSegments()
/* Calculate the offset of the next message */
nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
if (!producerStateManager.isEmpty)
throw new IllegalStateException("Producer state must be empty during log initialization")
loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
s"log end offset $logEndOffset in ${time.milliseconds() - startMs}
在详细解释这段代码初始化代码之前,我是用一张图来说明它到底做了什么:

这里我们重点说说第三步,即加载日志段的实现逻辑,以下是loadSegments的实现代码:
private def loadSegments(): Long = {
// first do a pass through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
val swapFiles = removeTempFilesAndCollectSwapFiles()
// Now do a second pass and load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
retryOnOffsetOverflow {
// In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
// loading of segments. In that case, we also need to close all segments that could have been left open in previous
// call to loadSegmentFiles().
logSegments.foreach(_.close())
segments.clear()
loadSegmentFiles()
}
// Finally, complete any interrupted swap operations. To be crash-safe,
// log files that are replaced by the swap segment should be renamed to .deleted
// before the swap file is restored as the new segment file.
completeSwapOperations(swapFiles)
if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
val nextOffset = retryOnOffsetOverflow {
recoverLog()
}
// reset the index size of the currently active log segment to allow more entries
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
} else {
if (logSegments.isEmpty) {
addSegment(LogSegment.open(dir = dir,
baseOffset = 0,
config,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize,
preallocate = false))
}
0
}
这段代码会对分区日志路径遍历两次。
首先,它会移除上次Failure遗留下来的各种临时文件(包括.cleaned、.swap、.deleted文件等),removeTempFilesAndCollectSwapFiles方法实现了这个逻辑。
之后,它会清空所有日志段对象,并且再次遍历分区路径,重建日志段segments Map以及索引文件。
待执行完这两次遍历之后,它会完成未完成的swap操作,即调用completeSwapOperations方法。等这些都做完之后,再调用recoverLog方法恢复日志段对象,然后返回恢复之后的分区日志LEO值。
我们首先来看第一步,removeTempFilesAndCollectSwapFiles方法的实现。
private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
// 在方法内部定义一个名为deleteIndicesIfExist的方法,用于删除日志文件对应的索引文件
def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
val offset = offsetFromFile(baseFile)
Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
}
var swapFiles = Set[File]()
var cleanFiles = Set[File]()
var minCleanedFileOffset = Long.MaxValue
// 遍历分区日志路径下的所有文件
for (file <- dir.listFiles if file.isFile) {
if (!file.canRead) // 如果不可读,直接抛出IOException
throw new IOException(s"Could not read file $file")
val filename = file.getName
if (filename.endsWith(DeletedFileSuffix)) { // 如果以.deleted结尾
debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath) // 说明是上次Failure遗留下来的文件,直接删除
} else if (filename.endsWith(CleanedFileSuffix)) { // 如果以.cleaned结尾
minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) // 选取文件名中位移值最小的.cleaned文件,获取其位移值,并将该文件加入待删除文件集合中
cleanFiles += file
} else if (filename.endsWith(SwapFileSuffix)) { // 如果以.swap结尾
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")
if (isIndexFile(baseFile)) { // 如果该.swap文件原来是索引文件
deleteIndicesIfExist(baseFile) // 删除原来的索引文件
} else if (isLogFile(baseFile)) { // 如果该.swap文件原来是日志文件
deleteIndicesIfExist(baseFile) // 删除掉原来的索引文件
swapFiles += file // 加入待恢复的.swap文件集合中
}
}
}
// 从待恢复swap集合中找出那些起始位移值大于minCleanedFileOffset值的文件,直接删掉这些无效的.swap文件
val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
invalidSwapFiles.foreach { file =>
debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
deleteIndicesIfExist(baseFile, SwapFileSuffix)
Files.deleteIfExists(file.toPath)
}
// Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
// 清除所有待删除文件集合中的文件
cleanFiles.foreach { file =>
debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath)
}
// 最后返回当前有效的.swap文件集合
validSwapFiles
}
执行完了removeTempFilesAndCollectSwapFiles逻辑之后,源码开始清空已有日志段集合,并重新加载日志段文件。这就是第二步。这里调用的主要方法是loadSegmentFiles。
private def loadSegmentFiles(): Unit = {
// 按照日志段文件名中的位移值正序排列,然后遍历每个文件
for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
if (isIndexFile(file)) { // 如果是索引文件
val offset = offsetFromFile(file)
val logFile = Log.logFile(dir, offset)
if (!logFile.exists) { // 确保存在对应的日志文件,否则记录一个警告,并删除该索引文件
warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
Files.deleteIfExists(file.toPath)
}
} else if (isLogFile(file)) { // 如果是日志文件
val baseOffset = offsetFromFile(file)
val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
// 创建对应的LogSegment对象实例,并加入segments中
val segment = LogSegment.open(dir = dir,
baseOffset = baseOffset,
config,
time = time,
fileAlreadyExists = true)
try segment.sanityCheck(timeIndexFileNewlyCreated)
catch {
case _: NoSuchFileException =>
error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
"recovering segment and rebuilding index files...")
recoverSegment(segment)
case e: CorruptIndexException =>
warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
recoverSegment(segment)
}
addSegment(segment)
}
}
}
第三步是处理第一步返回的有效.swap文件集合。completeSwapOperations方法就是做这件事的:
private def completeSwapOperations(swapFiles: Set[File]): Unit = {
// 遍历所有有效.swap文件
for (swapFile <- swapFiles) {
val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) // 获取对应的日志文件
val baseOffset = offsetFromFile(logFile) // 拿到日志文件的起始位移值
// 创建对应的LogSegment实例
val swapSegment = LogSegment.open(swapFile.getParentFile,
baseOffset = baseOffset,
config,
time = time,
fileSuffix = SwapFileSuffix)
info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
// 执行日志段恢复操作
recoverSegment(swapSegment)
// We create swap files for two cases:
// (1) Log cleaning where multiple segments are merged into one, and
// (2) Log splitting where one segment is split into multiple.
//
// Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
// must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
// of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
// do a replace with an existing segment.
// 确认之前删除日志段是否成功,是否还存在老的日志段文件
val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
segment.readNextOffset > swapSegment.baseOffset
}
// 如果存在,直接把.swap文件重命名成.log
replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
}
}
最后一步是recoverLog操作:
private def recoverLog(): Long = {
// if we have the clean shutdown marker, skip recovery
// 如果不存在以.kafka_cleanshutdown结尾的文件。通常都不存在
if (!hasCleanShutdownFile) {
// 获取到上次恢复点以外的所有unflushed日志段对象
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
var truncated = false
// 遍历这些unflushed日志段
while (unflushed.hasNext && !truncated) {
val segment = unflushed.next
info(s"Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
try {
// 执行恢复日志段操作
recoverSegment(segment, leaderEpochCache)
} catch {
case _: InvalidOffsetException =>
val startOffset = segment.baseOffset
warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
s"creating an empty one with starting offset $startOffset")
segment.truncateTo(startOffset)
}
if (truncatedBytes > 0) { // 如果有无效的消息导致被截断的字节数不为0,直接删除剩余的日志段对象
warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
truncated = true
}
}
}
// 这些都做完之后,如果日志段集合不为空
if (logSegments.nonEmpty) {
val logEndOffset = activeSegment.readNextOffset
if (logEndOffset < logStartOffset) { // 验证分区日志的LEO值不能小于Log Start Offset值,否则删除这些日志段对象
warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
"This could happen if segment files were deleted from the file system.")
removeAndDeleteSegments(logSegments, asyncDelete = true)
}
}
// 这些都做完之后,如果日志段集合为空了
if (logSegments.isEmpty) {
// 至少创建一个新的日志段,以logStartOffset为日志段的起始位移,并加入日志段集合中
addSegment(LogSegment.open(dir = dir,
baseOffset = logStartOffset,
config,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize,
preallocate = config.preallocate))
}
// 更新上次恢复点属性,并返回
recoveryPoint = activeSegment.readNextOffset
re
coveryPoint
一般把Log的常见操作分为4个大部分。
- 高水位管理操作:高水位的概念在kafka中举足轻重,对它的管理,是Log最重要的功能之一。
- 日志段管理:Log是日志段的容器。高效组织与管理其下辖的所有日志段对象,是源码要解决的核心问题。
- 关键位移值管理:日志定义了很多重要的位移值,比如Log Start Offset和LEO等。确保这些位移值的正确性,是构建消息引擎一致性的基础。
- 读写操作:所谓的操作日志,大体上就是指读写日志。读写操作的作用之大,不言而喻。
高水位管理操作
在介绍高水位管理操作之前,我们先来了解一下高水位的定义。
定义
源码中日志对象定义高水位的语句只有一行:
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
这行语句传达了两个重要的事实:
- 高水位值是volatile的。因为多个线程可能同时读取它,因此需要设置成volatile,保证内存可见性。另外,由于高水位值可能被多个线程同时修改,因此源码使用java Monitor锁来确保并发修改的线程安全。
- 高水位值初始值是Log Start Offset值。每个Log对象都会维护一个Log Start Offset值。当首次构建高水位时,它会被赋值成Log Start Offset值。
你可能会关心LogOffsetMetadata是什么对象。因为它比较重要,我们一起来看下这个类定义:
case class LogOffsetMetadata(messageOffset: Long,
segmentBaseOffset: Long = Log.UnknownOffset,
relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)
显然,他就是一个pojo类,里面保存了三个重要的变量。
- messageOffset:消息位移值,这是最重要的信息。我们总说高水位值,其实指得就是这个变量的值。
- segementBaseOffset:保存该位移值所在日志段的起始位移。日志段起始位移值辅助计算两条消息在物理磁盘文件中位置的差值,即两条消息彼此隔离多少字节。这个计算有个前提条件,即两条消息必须处在同一个日志段对象上,不能跨日志段对象。斗则它们就位于不同的物理文件上,计算这个值就没有意义了。这里的segmentBaseOffset,就是用来判断两条消息是否处于同一个日志段的。
- relativePositionSegment:保存该位移值所在日志段的物理磁盘位置。这个字段在计算两个位移值之间的物理磁盘位置差值时非常有用。你可以想一想,kafka什么时候需要计算位置之间的字节数呢?答案就是读取日志的时候,假设每次读取时只能读取1MB的数据,那么,源码肯定需要关心两个位移之间所有消息的总字节数是否超过了1MB。
LogOffsetMetadata类的所有方法,都是围绕这3个变量展开的辅助类方法,非常容易理解。比如:
def onSameSegment(that: LogOffsetMetadata): Boolean = {
if (messageOffsetOnly)
throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
this.segmentBaseOffset == that.segmentBaseOffset
}
看名字我们就知道了,这个方法是用来判断给定的两个LogOffsetMetadata对象是否处于同一个日志段的。判断的方法很简单,就是比较两个LogOffsetMetadata对象的segmentBaseOffset值是否相等。
获取和设置高水位值
关于获取高水位值的方法,起始很好理解。设置高水位值的方法,也就是setter方法更复杂一些:
// getter method:读取高水位的位移值
def highWatermark: Long = highWatermarkMetadata.messageOffset
// setter method:设置高水位值
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
if (newHighWatermark.messageOffset < 0) // 高水位值不能是负数
throw new IllegalArgumentException("High watermark offset should be non-negative")
lock synchronized { // 保护Log对象修改的Monitor锁
highWatermarkMetadata = newHighWatermark // 赋值新的高水位值
producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // 处理事务状态管理器的高水位值更新逻辑,忽略它……
maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事务机制的一部分,忽略它……
}
trace(s"Setting high watermark $newHighWatermark")
}
网友评论