基本概念
Kafka使用日志文件的方式保存生产者发送的消息。每条消息都有一个offset值来表示它在分区的偏移量,这个offset值是逻辑值,并不是消息实际的存放物理地址。offset类似数据库表的主键,主键唯一确定了数据库表中的一条记录,offset唯一确定了分区的一条消息。Kafka存储机制在逻辑上如下图:
日志存储结构.png
为了提高写入的性能,同一个分区中的消息是顺序写入的,这就避免了随机写入带来的性能问题。一个topic可以有n个分区,每个分区也有多个副本。当一个分区的副本(无论是Leader副本还是Follower副本)被划分到某个Broker上时,Kafka就要在此Broker上为此分区建立相应的Log,生产者发送的消息会存储在Log里,然后被消费者拉取消费。
Kafka中存储的数据都是海量的,为了避免日志文件太大,Log并不是直接对应磁盘上的一个日志文件,而是对应磁盘上的一个目录,目录的命名规则是<topic_name>_<partition_id>,Log和分区直接的关系是一一对应的,对应分区的全部消息都存储在这个目录中。
Kafka通过分段的方式将Log分为多个LogSegment,LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件,,日志文件用于记录消息,索引文件保存消息的索引。日志文档到一个阈值时,就会创建新的日志文件继续写入后续的消息和索引信息。日志文件的文件名的命名规则是[baseOffset].log,baseOffset是日志文件中的第一条消息的offset。下面是Log的结构:
Log结构.png
为了提高查询消息的效率,每个日志文件都对应一个索引文件,这个索引文件并没有为每条消息都建索引,而是使用稀疏索引方式为日志文件中的部分消息建立了索引。下面的图展示了所有文件和日志文件的关系:
log的索引.png
FileMessageSet
Kafka使用FileMessageSet管理日志文件,它对应磁盘上一个真正的日志文件。FileMessageSet继承了MessageSet抽象类,如下图:
FileMessageSet类图.png
MessageSet中保存的数据格式有三个部分:8个字节的offset值,4个字节的size表示message data的大小,这两个结合成为LogOverhead, message data保存了消息的数据,逻辑上对应一个Message对象:
FileMessageSet定义的消息结构.png
Kafka使用Message类表示消息,Message类使用ByteBuffer保存数据,格式和各个部分的含义如下:
Message类定义的保存数据.png
- CRC32:4个字节,消息的校验码。
- magic:1个字节,魔数标识,与消息的格式相关,取值为0或1。当magic为0时,消息的offset使用绝对offset且消息格式中没有timestamp部分;当magic为1时,消息的offset使用相对的offset且消息格式中有timestamp部分。所以,magic不同,消息的长度不同。
- attributes:1个字节,消息的属性。其中0~2位表示消息使用的压缩类型,0表示gzip压缩,2表示snappy压缩,3表示lz4压缩。第3位表示时间戳类型,0表示创建时间,1表示追加时间。
- timestamp:时间戳,含义有attributes的第3位确定。
- key length:消息key的长度。
- key:消息的key。
- value length:消息value的长度。
- value: 消息的value。
MessageSet抽象类中定义了两个比较关键的方法:
/** Write the messages in this set to the given channel starting at the given offset byte.
* Less than the complete amount may be written, but no more than maxSize can be. The number
* of bytes written is returned
* 将当前MessageSet中的消息写入到Channel中
* */
def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
/**
* Provides an iterator over the message/offset pairs in this set
* 提供迭代器,顺序读取MessageSet中的消息
*/
def iterator: Iterator[MessageAndOffset]
这两个方法说明MessageSet具有顺序写入消息和顺序读取的特性。后面介绍FileMessageSet和ByteBufferMessageSet时会说明这两个方法的实现。
分析FileMessageSet实现类
核心字段:
*file: java.io.File类型,指向磁盘上对应的日志文件。
*channel:FileChannel类型,用于读写对应的日志文件。
*start和end:MessageSet对象除了表示一个完整的日志文件,还可以表示日志文件分片(Slice),start和end分别表示分片的起始位置和结束位置。文件分配的相关概念可以找资料了解下。
- isSlice:Boolean类型,表示当前FileMessageSet是日志文件的分片。
- _size:FileMessageSet大小,单位是字节。如果FileMessageSet是日志文件的分片,就表示分片的大小(即end-start的值);如果不是分片,则表示整个日志文件的大小。因为会有多个Handler线程并发向一个分区写入消息,所有的_size是AtomicInteger类型。
FileMessageSet中有多个重载的构造方法,这里选择一个比较重要的构造方法来介绍。此构造方法会创建一个非分片的FileMessageSet对象。在Window NTFS文件系统或老版本的Linux文件系统上,进行文件的预分配会提高后续写操作的性能,为此FileMessageSet提供了preallocate的选项,决定是否开启预分配的功能。我们也可以通过FileMessageSet构造方法的mutable参数决定是否创建只读的FileMessageSet。
* Create a file message set with no slicing, and with initFileSize and preallocate.
* For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
* with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
* If it's new file and preallocate is true, end will be set to 0. Otherwise set to Int.MaxValue.
*/
def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
this(file,
//如果使用preallocate进行预分配,end会初始化为零
channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
start = 0,
end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
isSlice = false)
/**
* Open a channel for the given file
* For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
* with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
* @param file File path
* @param mutable mutable
* @param fileAlreadyExists File already exists or not
* @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
* @param preallocate Pre allocate file or not, gotten from configuration.
* FileMessageSet.openChannel()方法的具体实现。
*/
def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
if (mutable) {//根据mutable参数创建的FileChannel是否可写
if (fileAlreadyExists)
new RandomAccessFile(file, "rw").getChannel()
else {
if (preallocate) {//进行文件预分配
val randomAccessFile = new RandomAccessFile(file, "rw")
randomAccessFile.setLength(initFileSize)
randomAccessFile.getChannel()
}
else
new RandomAccessFile(file, "rw").getChannel()//创建可读可写的FileChannel
}
}
else
new FileInputStream(file).getChannel()//创建只读的FileChannel
}
在FileMessageSet对象初始化的过程中,会移动FileChannel的position指针,原因是为了每次写入的消息都在日志文件的尾部,避免重启服务后的写入操作覆盖之前的操作。对应新创建的且进行了预分配空间的日志文件,其end会初始化为0,所以也是从文件的起始写入数据的。
/* if this is not a slice, update the file pointer to the end of the file
* 将position移动到最后一个字节,之后从此position开始写消息,这样防止重启后覆盖之前的操作
*
* */
if (!isSlice)
/* set the file position to the last byte in the file */
channel.position(math.min(channel.size.toInt, end))
介绍完FileMessageSet的构造过程,下面介绍其读写过程。FileMessageSet.append()方法实现了写日志文件的功能,其参数必须是ByteBufferMessageSet对象,下面是FileMessageSet.append()方法的代码:
/**
* Append these messages to the message set
*/
def append(messages: ByteBufferMessageSet) {
val written = messages.writeFullyTo(channel)//写文件
_size.getAndAdd(written)//修改FileMessageSet的大小
}
/** Write the messages in this set to the given channel
* 下面是 ByteBufferMessageSet.writeFullyTo()方法
* */
def writeFullyTo(channel: GatheringByteChannel): Int = {
buffer.mark()
var written = 0
while (written < sizeInBytes)//将ByteBufferMessageSet中的数据全部写入文件
written += channel.write(buffer)
buffer.reset()
written
}
查找指定消息的功能在FileMessageSet.searchFor()方法中实现。searchFor()方法的逻辑是从指定的startPosition开始逐条遍历FileMessageSet中的消息,并将每个消息的offset和targetOffset,最后返回查找到的offset。在遍历过程中不会将消息的key和value读取到内存,只是只读取LogOverhead(即offset和size),并通过size定位到下一条消息的开始位置。FileMessageSet.searchFor()方法代码如下:
/**
* Search forward for the file position of the last offset that is greater than or equal to the target offset
* and return its physical position. If no such offsets are found, return null.
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
*/
def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
var position = startingPosition //起始位置
//创建用于读取 LogOverhead(即offset和size)的ByteBuffer(长度12)
val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
val size = sizeInBytes()//当前FileMessageSet的大小,单位是字节
//从position开始逐条消息遍历
while(position + MessageSet.LogOverhead < size) {
buffer.rewind()//重置ByteBuffer的position指针,准备读入数据
//读取LogOverhead。这里会确保startingPosition位于一个消息的开头,否则
//读取到的并不是 LogOverhead,这个条件的保证会在后面提到
channel.read(buffer, position)
if(buffer.hasRemaining)//未读取到12个字节的LogOverhead,抛出异常
throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
.format(targetOffset, startingPosition, file.getAbsolutePath))
buffer.rewind()//重置ByteBuffer的position指针,准备从ByteBuffer中读取数据
val offset = buffer.getLong()//读取消息的offset,8个字节
if(offset >= targetOffset)//判断是否符合退出条件
return OffsetPosition(offset, position)//得到消息的位置
val messageSize = buffer.getInt()//获取消息的size,4个字节
if(messageSize < Message.MinMessageOverhead)
throw new IllegalStateException("Invalid message size: " + messageSize)
//移动Position,准备读取下个消息
position += MessageSet.LogOverhead + messageSize
}
null//找不到offset大于等于targetOffset,则返回Null
}
FileMessageSet.writeTo()方法是将FileMessageSet中的数据写入指定的其他Channel中,这里先了解此方法的功能,具体实现会在后面介绍“零拷贝”的时候一起介绍。FileMessageSet.read*()方法是从FileMessageSet中读取数据,可以将FileMessageSet中的数据读入到别的ByteBuffer中返回,也可以按照指定位置和长度形成分片的FileMessageSet对象返回。FileMessageSet.delete()方法是将整个日志文件删除。
FileMessageSet还有一个truncateTo()方法,主要负责将日志文件截断到targetSize大小。此方法在后面介绍分区中Leader副本切换时还会提到。下面是truncateTo()方法的具体实现:
/**
* Truncate this file message set to the given size in bytes. Note that this API does no checking that the
* given size falls on a valid message boundary.
* In some versions of the JDK truncating to the same size as the file message set will cause an
* update of the files mtime, so truncate is only performed if the targetSize is smaller than the
* size of the underlying FileChannel.
* It is expected that no other threads will do writes to the log when this function is called.
* @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
* @return The number of bytes truncated off
*/
def truncateTo(targetSize: Int): Int = {
val originalSize = sizeInBytes
if(targetSize > originalSize || targetSize < 0)//检测targetSize的有效性
throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
" size of this log segment is " + originalSize + " bytes.")
if (targetSize < channel.size.toInt) {
channel.truncate(targetSize)//裁剪文件
channel.position(targetSize)//移动position
_size.set(targetSize)//修改_size
}
originalSize - targetSize//返回剪裁掉的字节数
}
FileMessageSet还实现了iterator()方法,返回一个迭代器。FileMessageSet迭代器读取消息的逻辑是:先读取消息的LogOverhead部分,然后按照size分配合适的ByteBuffer,再读取message data部分,最后将message data和offset封装成MessageOffset对象返回。迭代器的实现和searchFor()方法类似。
网友评论