    Log.scala defines 10 classes and objects. In the diagram, a C in parentheses marks a class and an O marks an object.


    class Log(@volatile var dir: File,
              @volatile var config: LogConfig,
              @volatile var logStartOffset: Long,
              @volatile var recoveryPoint: Long,
              scheduler: Scheduler,
              brokerTopicStats: BrokerTopicStats,
              val time: Time,
              val maxProducerIdExpirationMs: Int,
              val producerIdExpirationCheckIntervalMs: Int,
              val topicPartition: TopicPartition,
              val producerStateManager: ProducerStateManager,
              logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {


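    In Kafka, the Log End Offset (LEO) is the offset that the next message appended to the log will receive, i.e. the offset at the end of the log.

    The Log Start Offset is the offset of the earliest message in the log that is currently visible to clients.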
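    To make the two offsets concrete, here is a toy model in which a log holds the consecutive offsets from the log start offset up to, but not including, the LEO. ToyLog and ToyLogDemo are hypothetical names, not Kafka classes. Appending moves the LEO forward. Retention can only move the log start offset forward, and never past the LEO.

```scala
// Toy model (hypothetical, not a Kafka class): a log that holds the
// consecutive offsets [logStartOffset, logEndOffset).
final case class ToyLog(logStartOffset: Long, logEndOffset: Long) {
  require(logStartOffset <= logEndOffset, "log start offset must not exceed the LEO")

  // Appending n messages assigns offsets LEO, LEO + 1, ..., LEO + n - 1,
  // so the new LEO is the old LEO plus n.
  def append(n: Int): ToyLog = copy(logEndOffset = logEndOffset + n)

  // Retention or deletion can only move the start offset forward, and never past the LEO.
  def advanceStart(newStart: Long): ToyLog =
    copy(logStartOffset = math.min(math.max(logStartOffset, newStart), logEndOffset))

  // Number of messages currently visible to clients.
  def visibleCount: Long = logEndOffset - logStartOffset
}

object ToyLogDemo {
  val written = ToyLog(0L, 0L).append(10) // offsets 0..9 written, LEO = 10
  val trimmed = written.advanceStart(4L)  // offsets 0..3 are no longer visible
}
```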


        @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
        @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
        private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
        @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
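    • nextOffsetMetadata: wraps the offset of the next message to be appended (the LEO), together with its position in the active segment.
    • highWatermarkMetadata: holds the log's high watermark, which separates committed messages from uncommitted ones.
    • segments: holds every log segment of the partition in a map. The key is the segment's base offset, and the value is the segment object itself.
    • leaderEpochCache: maps each partition leader epoch to its corresponding start offset. It is used to decide whether the log must be truncated after a failure.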
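    Because segments is keyed by base offset, the segment holding a given offset is the entry with the greatest key that is not larger than that offset. The sketch below uses a String file name as a stand-in for LogSegment. It does use the same java.util.concurrent.ConcurrentSkipListMap type and its floorEntry/lastEntry methods. Treating the last entry as the active segment is assumed to mirror how Log picks its activeSegment.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookup {
  // key: base offset of the segment; value: a stand-in for the LogSegment object
  // (just the .log file name here; the real map stores LogSegment instances).
  val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
  segments.put(0L, "00000000000000000000.log")
  segments.put(100L, "00000000000000000100.log")
  segments.put(250L, "00000000000000000250.log")

  // The segment containing `offset` is the one with the greatest base offset <= offset.
  // floorEntry returns null when no such key exists, hence the Option wrapper.
  def segmentFor(offset: Long): Option[String] =
    Option(segments.floorEntry(offset)).map(_.getValue)

  // The active segment (where appends go) is the one with the largest base offset.
  def activeSegment: String = segments.lastEntry.getValue
}
```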


      locally {
        val startMs = time.milliseconds
        // create the log directory if it doesn't exist
        // ...
        // Initialize the Leader Epoch Cache
        // ...
        val nextOffset = loadSegments()
        /* Calculate the offset of the next message */
        nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
        logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
        // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
        // Update the Leader Epoch Cache and discard invalid entries
        // ...
        // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
        // from scratch.
        if (!producerStateManager.isEmpty)
          throw new IllegalStateException("Producer state must be empty during log initialization")
        loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
        info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
          s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
      }

    Leaving Leader Epoch aside for now, let's look at how loadSegments loads the log segments.


      private def loadSegments(): Long = {
        // first do a pass through the files in the log directory and remove any temporary files
        // and find any interrupted swap operations
        // Remove the temporary files left over from the previous failure (.cleaned, .swap, .deleted files, etc.)
        val swapFiles = removeTempFilesAndCollectSwapFiles()
        // Now do a second pass and load all the log and index files.
        // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
        // this happens, restart loading segment files from scratch.
        // Clear all segment objects, walk the partition directory again, rebuild the segments map,
        // and delete orphaned index files that have no corresponding log segment file.
        retryOnOffsetOverflow {
          // In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
          // loading of segments. In that case, we also need to close all segments that could have been left open in previous
          // call to loadSegmentFiles().
          logSegments.foreach(_.close())
          segments.clear()
          loadSegmentFiles()
        }
        // Finally, complete any interrupted swap operations. To be crash-safe,
        // log files that are replaced by the swap segment should be renamed to .deleted
        // before the swap file is restored as the new segment file.
        // Complete the unfinished swap operations
        completeSwapOperations(swapFiles)
        if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
          // Recover the segments and obtain the LEO
          val nextOffset = retryOnOffsetOverflow {
            recoverLog()
          }
          // reset the index size of the currently active log segment to allow more entries
          // ...
          nextOffset
        } else {
          // The directory is marked for deletion: only make sure one empty segment exists
          if (logSegments.isEmpty) {
            addSegment(LogSegment.open(dir = dir,
              baseOffset = 0,
              time = time,
              fileAlreadyExists = false,
              initFileSize = this.initFileSize,
              preallocate = false))
          }
          0
        }
      }

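    The method first calls removeTempFilesAndCollectSwapFiles, which removes the temporary files left over from the previous failure (.deleted and .cleaned files, plus invalid .swap files) and collects the .swap files that still need to be completed.

    It then clears all segment objects and walks the partition directory a second time. During this pass it rebuilds the segments map and deletes orphaned index files that have no corresponding log segment file.

    After these two passes, it completes any interrupted swap operations by calling completeSwapOperations. Finally, it calls recoverLog to recover the segment objects and returns the partition log's LEO after recovery.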
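    The comments in loadSegments say that loading restarts whenever a legacy segment with offset overflow (KAFKA-6264) is found. A minimal sketch of that retry pattern follows. OffsetOverflowException, RetryOnOverflow and RetryDemo are hypothetical, and the split callback stands in for Log#splitOverflowedSegment. This is an illustration of the pattern, not Kafka's actual retryOnOffsetOverflow.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for Kafka's segment-overflow exception; it only records
// which segment (by base offset) needs to be split.
final class OffsetOverflowException(val segmentBaseOffset: Long)
    extends RuntimeException(s"segment $segmentBaseOffset contains offsets that overflow")

object RetryOnOverflow {
  // Evaluate `body`; if it reports an overflowed segment, call `split` on that segment
  // (the role splitOverflowedSegment plays in Kafka) and evaluate `body` again from scratch.
  @tailrec
  def retryOnOffsetOverflow[T](split: Long => Unit)(body: => T): T = {
    val attempt: Either[Long, T] =
      try Right(body)
      catch { case e: OffsetOverflowException => Left(e.segmentBaseOffset) }
    attempt match {
      case Right(result) => result
      case Left(baseOffset) =>
        split(baseOffset)
        retryOnOffsetOverflow(split)(body)
    }
  }
}

object RetryDemo {
  // Hypothetical directory state: the segment with base offset 100 must be split once.
  var needsSplit: Set[Long] = Set(100L)
  var loadAttempts = 0

  // Hypothetical stand-in for loadSegmentFiles(): fails while any segment still needs splitting.
  def loadSegmentFiles(): Int = {
    loadAttempts += 1
    needsSplit.headOption.foreach(b => throw new OffsetOverflowException(b))
    3 // number of segments loaded
  }

  val loaded: Int = RetryOnOverflow.retryOnOffsetOverflow(b => needsSplit -= b)(loadSegmentFiles())
}
```

    The first attempt fails on segment 100. After the split callback removes it from needsSplit, the second attempt succeeds, so the body runs twice.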


      private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
        // Local helper: delete the index files that belong to a given log file
        def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
          info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
          val offset = offsetFromFile(baseFile)
          Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
          Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
          Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
        }
        var swapFiles = Set[File]()
        var cleanFiles = Set[File]()
        var minCleanedFileOffset = Long.MaxValue
        for (file <- dir.listFiles if file.isFile) {
          if (!file.canRead)
            throw new IOException(s"Could not read file $file")
          val filename = file.getName
          if (filename.endsWith(DeletedFileSuffix)) {
            // A .deleted file left over from the previous failure: delete it right away
            debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
            Files.deleteIfExists(file.toPath)
          } else if (filename.endsWith(CleanedFileSuffix)) {
            // A file ending in .cleaned
            minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset)
            cleanFiles += file
          } else if (filename.endsWith(SwapFileSuffix)) {
            // A file ending in .swap
            // we crashed in the middle of a swap operation, to recover:
            // if a log, delete the index files, complete the swap operation later
            // if an index just delete the index files, they will be rebuilt
            val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
            info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")
            if (isIndexFile(baseFile)) {
              // The .swap file was an index: delete the original index files
              deleteIndicesIfExist(baseFile)
            } else if (isLogFile(baseFile)) {
              // The .swap file was a log file: delete its original index files
              deleteIndicesIfExist(baseFile)
              // and add it to the set of .swap files to be recovered
              swapFiles += file
            }
          }
        }
        // KAFKA-6264: Delete all .swap files whose base offset is greater than the minimum .cleaned segment offset. Such .swap
        // files could be part of an incomplete split operation that could not complete. See Log#splitOverflowedSegment
        // for more details about the split operation.
        // Among the .swap files to be recovered, delete the invalid ones whose base offset is >= minCleanedFileOffset
        val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
        invalidSwapFiles.foreach { file =>
          debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
          val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
          deleteIndicesIfExist(baseFile, SwapFileSuffix)
          Files.deleteIfExists(file.toPath)
        }
        // Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
        // Delete every collected .cleaned file
        cleanFiles.foreach { file =>
          debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
          Files.deleteIfExists(file.toPath)
        }
        // Finally, return the set of .swap files that are still valid
        validSwapFiles
      }
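    1. Defines a local helper, deleteIndicesIfExist, that deletes the index files belonging to a log file.
    2. Walks the file list, deletes leftover .deleted files, and collects the files that end in .cleaned and .swap.
    3. Deletes the invalid .swap files, namely those whose base offset is greater than or equal to minCleanedFileOffset, and then deletes all .cleaned files.
    4. Finally, returns the set of .swap files that are still valid.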
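    The same classification can be written as a pure function over file names, which makes the minCleanedFileOffset rule easy to check. This is a simplified model under stated assumptions. TempFileScan is a hypothetical name, and the suffix strings are the ones listed earlier (.deleted, .cleaned, .swap). offsetFromFileName is re-implemented here on the assumption that a file name starts with the base offset followed by a dot. Deleting the index files is left out.

```scala
object TempFileScan {
  // Suffixes of the temporary files handled by removeTempFilesAndCollectSwapFiles.
  val DeletedFileSuffix = ".deleted"
  val CleanedFileSuffix = ".cleaned"
  val SwapFileSuffix = ".swap"

  // Base offset = the digits before the first '.', e.g. "00000000000000000100.log.swap" -> 100.
  def offsetFromFileName(name: String): Long = name.substring(0, name.indexOf('.')).toLong

  final case class ScanResult(toDelete: Set[String], validSwapFiles: Set[String])

  // File-system-free version of the classification: which names would be deleted,
  // and which .swap files survive to be completed later. Index-file cleanup is omitted.
  def scan(names: Seq[String]): ScanResult = {
    val deleted = names.filter(_.endsWith(DeletedFileSuffix)).toSet
    val cleaned = names.filter(_.endsWith(CleanedFileSuffix)).toSet
    // Only swap files whose base file is a .log are kept for completeSwapOperations.
    val logSwaps = names.filter(_.endsWith(".log" + SwapFileSuffix)).toSet
    // Smallest base offset among .cleaned files, Long.MaxValue when there are none.
    val minCleanedFileOffset =
      if (cleaned.isEmpty) Long.MaxValue else cleaned.map(offsetFromFileName).min
    // KAFKA-6264: swap files at or beyond that offset belong to an unfinished split.
    val (invalidSwaps, validSwaps) =
      logSwaps.partition(name => offsetFromFileName(name) >= minCleanedFileOffset)
    ScanResult(toDelete = deleted ++ cleaned ++ invalidSwaps, validSwapFiles = validSwaps)
  }

  // Hypothetical directory listing used as an example input.
  val sampleDir = Seq(
    "00000000000000000000.log",
    "00000000000000000000.index",
    "00000000000000000050.log.deleted",
    "00000000000000000100.log.swap",
    "00000000000000000200.log.cleaned",
    "00000000000000000300.log.swap")
}
```

    For sampleDir, the smallest .cleaned offset is 200. The swap file at 300 is therefore treated as part of an unfinished split and deleted, while the one at 100 is kept for completeSwapOperations.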



      private def loadSegmentFiles(): Unit = {
        // load segments in ascending order because transactional data from one segment may depend on the
        // segments that come before it
        for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
          if (isIndexFile(file)) {
            // if it is an index file, make sure it has a corresponding .log file
            val offset = offsetFromFile(file)
            val logFile = Log.logFile(dir, offset)
            // If the matching log file is missing, log a warning and delete this orphaned index file
            if (!logFile.exists) {
              warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
              Files.deleteIfExists(file.toPath)
            }
          } else if (isLogFile(file)) {
            // if it's a log file, load the corresponding log segment
            val baseOffset = offsetFromFile(file)
            val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
            // Create the corresponding LogSegment instance
            val segment = LogSegment.open(dir = dir,
              baseOffset = baseOffset,
              time = time,
              fileAlreadyExists = true)
            try segment.sanityCheck(timeIndexFileNewlyCreated)
            catch {
              case _: NoSuchFileException =>
                error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
                  "recovering segment and rebuilding index files...")
                recoverSegment(segment)
              case e: CorruptIndexException =>
                warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
                  s"to ${e.getMessage}, recovering segment and rebuilding index files...")
                recoverSegment(segment)
            }
            // Add the segment to segments
            addSegment(segment)
          }
        }
      }
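    1. Walk the files in the directory in ascending name order.
    2. If a file is an index file, check that the matching log file exists; if it does not, delete the orphaned index file.
    3. If it is a log file, create the corresponding LogSegment instance (recovering it if its index is missing or corrupt) and add it to segments.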
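    The bookkeeping part of loadSegmentFiles can likewise be modelled on file names alone. SegmentFileScan is hypothetical. It assumes Kafka's 20-digit zero-padded base-offset file names, so lexical order equals offset order, and the index suffixes .index, .timeindex and .txnindex. LogSegment creation, sanityCheck and recovery are left out.

```scala
object SegmentFileScan {
  // Index suffixes assumed here: offset index, time index and transaction index.
  val IndexFileSuffixes = Seq(".index", ".timeindex", ".txnindex")

  def offsetFromFileName(name: String): Long = name.substring(0, name.indexOf('.')).toLong
  def isLogFile(name: String): Boolean = name.endsWith(".log")
  def isIndexFile(name: String): Boolean = IndexFileSuffixes.exists(suffix => name.endsWith(suffix))

  final case class LoadResult(segmentBaseOffsets: Seq[Long], orphanedIndexFiles: Seq[String])

  def load(names: Seq[String]): LoadResult = {
    // Zero-padded names sort lexically in offset order, which dir.listFiles.sortBy(_.getName) relies on.
    val sorted = names.sorted
    val baseOffsets = sorted.filter(isLogFile).map(offsetFromFileName)
    val logOffsets = baseOffsets.toSet
    // An index file is orphaned when no .log file has the same base offset.
    val orphans = sorted.filter(name => isIndexFile(name) && !logOffsets.contains(offsetFromFileName(name)))
    LoadResult(baseOffsets, orphans)
  }

  // Hypothetical directory listing used as an example input.
  val sampleDir = Seq(
    "00000000000000000100.log",
    "00000000000000000000.log",
    "00000000000000000000.index",
    "00000000000000000100.timeindex",
    "00000000000000000200.index")
}
```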

    Next, completeSwapOperations is called to process the set of valid .swap files.


      private def completeSwapOperations(swapFiles: Set[File]): Unit = {
        // Iterate over all valid .swap files
        for (swapFile <- swapFiles) {
          val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
          val baseOffset = offsetFromFile(logFile) // base offset of the log file
          // Create the corresponding LogSegment instance
          val swapSegment = LogSegment.open(swapFile.getParentFile,
            baseOffset = baseOffset,
            time = time,
            fileSuffix = SwapFileSuffix)
          info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
          // Recover the segment
          recoverSegment(swapSegment)
          // We create swap files for two cases:
          // (1) Log cleaning where multiple segments are merged into one, and
          // (2) Log splitting where one segment is split into multiple.
          // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
          // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
          // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
          // do a replace with an existing segment.
          // Check whether the earlier deletion succeeded, i.e. whether any old segment files still exist
          val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
            segment.readNextOffset > swapSegment.baseOffset
          }
          // Replace the old segments (if any) with the swap segment and rename the .swap file to .log
          replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
        }
      }
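    1. Iterate over all valid .swap files;
    2. create the corresponding LogSegment instance;
    3. recover the segment (the recovery code was already covered in the discussion of LogSegment);
    4. replace any old segments the swap segment overlaps and rename the .swap file to .log.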
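    The oldSegments filter is the subtle step. In the sketch below, SwapOverlap and ToySegment are hypothetical, and logSegments is a re-implementation assumed to follow the same range rule as Log.logSegments(from, to). It shows both outcomes. A swap segment produced by cleaning replaces the segments it covers, while a swap whose old segment was already deleted matches nothing and only needs renaming.

```scala
object SwapOverlap {
  // Toy segment covering offsets [baseOffset, nextOffset); hypothetical, not Kafka's LogSegment.
  final case class ToySegment(baseOffset: Long, nextOffset: Long)

  // Assumed range rule of Log.logSegments(from, to): the segment whose base offset is the
  // floor of `from` (if any) plus every later segment whose base offset is below `to`.
  def logSegments(segments: Seq[ToySegment], from: Long, to: Long): Seq[ToySegment] = {
    val sorted = segments.sortBy(_.baseOffset)
    val floorBase = sorted.map(_.baseOffset).filter(_ <= from).lastOption
    sorted.filter(s => floorBase.forall(_ <= s.baseOffset) && s.baseOffset < to)
  }

  // The old segments a recovered swap segment replaces: those in range that actually
  // end after the swap segment begins.
  def oldSegmentsFor(segments: Seq[ToySegment], swap: ToySegment): Seq[ToySegment] =
    logSegments(segments, swap.baseOffset, swap.nextOffset).filter(_.nextOffset > swap.baseOffset)
}
```

    In the second case, the only candidate is the segment [0, 100), which ends exactly where the swap segment starts, so the readNextOffset > baseOffset filter discards it.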



      private def recoverLog(): Long = {
        // if we have the clean shutdown marker, skip recovery
        // Recovery runs only when the .kafka_cleanshutdown marker file is absent,
        // i.e. the broker did not shut down cleanly last time
        if (!hasCleanShutdownFile) {
          // okay we need to actually recover this log
          // Collect every unflushed segment from the last recovery point onwards
          val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
          var truncated = false
          // Walk through the unflushed segments
          while (unflushed.hasNext && !truncated) {
            val segment = unflushed.next
            info(s"Recovering unflushed segment ${segment.baseOffset}")
            val truncatedBytes =
              try {
                // Recover the segment
                recoverSegment(segment, leaderEpochCache)
              } catch {
                case _: InvalidOffsetException =>
                  val startOffset = segment.baseOffset
                  warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
                    s"creating an empty one with starting offset $startOffset")
                  segment.truncateTo(startOffset)
              }
            // If invalid messages caused bytes to be truncated, delete all remaining segments
            if (truncatedBytes > 0) {
              // we had an invalid message, delete all remaining log
              warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
              removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
              truncated = true
            }
          }
        }
        // After that, if the segment set is not empty
        if (logSegments.nonEmpty) {
          val logEndOffset = activeSegment.readNextOffset
          if (logEndOffset < logStartOffset) {
            warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
              "This could happen if segment files were deleted from the file system.")
            removeAndDeleteSegments(logSegments, asyncDelete = true)
          }
        }
        // If the segment set is now empty
        if (logSegments.isEmpty) {
          // no existing segments, create a new mutable segment beginning at logStartOffset
          // Create at least one new segment whose base offset is logStartOffset and add it to the segment set
          addSegment(LogSegment.open(dir = dir,
            baseOffset = logStartOffset,
            time = time,
            fileAlreadyExists = false,
            initFileSize = this.initFileSize,
            preallocate = config.preallocate))
        }
        // Update the recovery point and return it
        recoveryPoint = activeSegment.readNextOffset
        recoveryPoint
      }
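    The control flow of recoverLog can be summarised with a toy model. RecoverSketch and its ToySegment are hypothetical. A corrupt segment is represented by the offset where invalid data starts, standing in for the truncatedBytes > 0 result of recoverSegment. As in the code above, only segments from the one containing recoveryPoint onwards are checked. The first corrupt one is truncated and everything after it is dropped, and an empty segment at logStartOffset is created if nothing survives.

```scala
object RecoverSketch {
  // Toy segment covering [baseOffset, nextOffset); `corruptAt` marks the first invalid offset, if any.
  // Hypothetical model: real recovery re-reads and validates batches and rebuilds the indexes.
  final case class ToySegment(baseOffset: Long, nextOffset: Long, corruptAt: Option[Long] = None)

  // Returns the surviving segments and the new recovery point (the LEO after recovery).
  def recover(segments: Vector[ToySegment], recoveryPoint: Long, logStartOffset: Long,
              cleanShutdown: Boolean): (Vector[ToySegment], Long) = {
    var kept = segments.sortBy(_.baseOffset)
    if (!cleanShutdown) {
      // Unflushed segments: the one containing recoveryPoint and every segment after it.
      val firstUnflushed = math.max(kept.lastIndexWhere(_.baseOffset <= recoveryPoint), 0)
      kept.indexWhere(_.corruptAt.isDefined, firstUnflushed) match {
        case -1 => () // nothing to truncate
        case i =>
          // Truncate the first corrupt segment at its first invalid offset and drop all later ones.
          val bad = kept(i)
          kept = kept.take(i) :+ bad.copy(nextOffset = bad.corruptAt.getOrElse(bad.nextOffset), corruptAt = None)
      }
    }
    // If the LEO fell below the log start offset (e.g. files deleted by hand), drop everything.
    if (kept.nonEmpty && kept.last.nextOffset < logStartOffset) kept = Vector.empty
    // Always leave at least one (empty) segment that starts at logStartOffset.
    if (kept.isEmpty) kept = Vector(ToySegment(logStartOffset, logStartOffset))
    // The new recovery point is the next offset of the active (last) segment.
    (kept, kept.last.nextOffset)
  }

  // Hypothetical input: the segment starting at 100 holds invalid data from offset 170.
  val sample = Vector(ToySegment(0L, 100L), ToySegment(100L, 200L, Some(170L)), ToySegment(200L, 300L))
}
```

    With recoveryPoint = 150, the segment starting at 100 is truncated to end at 170 and the segment starting at 200 is dropped. A clean shutdown skips the check entirely and keeps all three segments.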




