为了防止Log文件过大,将Log切分成多个日志文件,每个日志文件对应一个LogSegment。在LogSegment中封装一个FileMessageSet和一个OffsetIndex对象,提供日志文件和索引文件的读写功能以及其他的辅助功能。
看下核心字段:
- log: 用于操作对应日志文件的FileMessageSet对象。
- index:用于操作对应索引文件的OffsetIndex对象。
- baseOffset: LogSegment中第一条消息的offset值。
- indexIntervalBytes:索引项之间间隔的最小字节数。
- bytesSinceLastIndexEntry:记录自从上次添加索引项之后,在日志文件中累计加入的Message集合的字节数,用于判断下次索引项添加的时机。
- created:标识LogSegment对象创建时间,当调用truncateTo()方法将整个日志文件清空时,会将此字段重置为当前时间。参与创建新LogSegment的条件判断,在介绍Log类时会介绍。
在LogSegment.append()方法中实现了追加消息的功能,可能有多个Handler线程并发写入同一个LogSegment,所以调用这个方法时必须要保证线程安全,后面的介绍Log类时会看到相应的同步代码。另外,注意append()方法的参数,其第二个参数messages表示的事待追加的消息集合,第一个参数offset表示messages中的第一个消息的offset,如果是压缩消息,则是第一条内层的offset。append()方法代码如下:
def append(offset: Long, messages: ByteBufferMessageSet) {
//检测是否在满足添加索引项的条件
if (messages.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
// append an entry to the index (if needed)
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
index.append(offset, log.sizeInBytes())
//成功添加索引后,bytesSinceLastIndexEntry重置为0
this.bytesSinceLastIndexEntry = 0
}
// append the messages 写日志文件
log.append(messages)
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
LogSegment索引append.png
读取消息的功能由LogSegment.read()方法实现,它有四个参数。
- startOffset:指定读取的起始消息offset。
- maxOffset:指定读取结束的offset,可以放空。
- maxSize:指定读取的最大字节数。
- maxPosition:指定读取的最大物理地址,可选参数,默认值是日志文件的大小。
在读取日志文件之前,需要将startOffset和maxOffset转化为对应的物理地址才能使用。这个转换在translateOffset()方法中实现,我们先用一个例子来介绍这个功能。假设startOffset是1017,下图展示了将1017这个offset转换成对应的物理地址的过程。
网友评论