美文网首页
Kafka源码分析-Server-日志存储(5)-LogSegm

Kafka源码分析-Server-日志存储(5)-LogSegm

作者: 陈阳001 | 来源:发表于2019-03-08 18:44 被阅读0次

    为了防止Log文件过大,将Log切分成多个日志文件,每个日志文件对应一个LogSegment。在LogSegment中封装一个FileMessageSet和一个OffsetIndex对象,提供日志文件和索引文件的读写功能以及其他的辅助功能。
    看下核心字段:

    • log: 用于操作对应日志文件的FileMessageSet对象。
    • index:用于操作对应索引文件的OffsetIndex对象。
    • baseOffset: LogSegment中第一条消息的offset值。
    • indexIntervalBytes:索引项之间间隔的最小字节数。
    • bytesSinceLastIndexEntry:记录自从上次添加索引项之后,在日志文件中累计加入的Message集合的字节数,用于判断下次索引项添加的时机。
    • created:标识LogSegment对象创建时间,当调用truncateTo()方法将整个日志文件清空时,会将此字段重置为当前时间。参与创建新LogSegment的条件判断,在介绍Log类时会介绍。
      在LogSegment.append()方法中实现了追加消息的功能,可能有多个Handler线程并发写入同一个LogSegment,所以调用这个方法时必须要保证线程安全,后面的介绍Log类时会看到相应的同步代码。另外,注意append()方法的参数,其第二个参数messages表示的事待追加的消息集合,第一个参数offset表示messages中的第一个消息的offset,如果是压缩消息,则是第一条内层的offset。append()方法代码如下:
     def append(offset: Long, messages: ByteBufferMessageSet) {
        //检测是否在满足添加索引项的条件
        if (messages.sizeInBytes > 0) {
          trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
          // append an entry to the index (if needed)
          if(bytesSinceLastIndexEntry > indexIntervalBytes) {
            index.append(offset, log.sizeInBytes())
            //成功添加索引后,bytesSinceLastIndexEntry重置为0
            this.bytesSinceLastIndexEntry = 0
          }
          // append the messages 写日志文件
          log.append(messages)
          this.bytesSinceLastIndexEntry += messages.sizeInBytes
        }
      }
    
    LogSegment索引append.png

    读取消息的功能由LogSegment.read()方法实现,它有四个参数。

    • startOffset:指定读取的起始消息offset。
    • maxOffset:指定读取结束的offset,可以放空。
    • maxSize:指定读取的最大字节数。
    • maxPosition:指定读取的最大物理地址,可选参数,默认值是日志文件的大小。
      在读取日志文件之前,需要将startOffset和maxOffset转化为对应的物理地址才能使用。这个转换在translateOffset()方法中实现,我们先用一个例子来介绍这个功能。假设startOffset是1017,下图展示了将1017这个offset转换成对应的物理地址的过程。

    相关文章

      网友评论

          本文标题:Kafka源码分析-Server-日志存储(5)-LogSegm

          本文链接:https://www.haomeiwen.com/subject/tkddpqtx.html