消息的存储,消息的持久化
消息发送端发送消息到broker上以后,消息是如何持久化的呢?那么接下来去分析下消息的存储首先我们需要了解的是,kafka是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个offset值来表示它在分区中的偏移量。Kafka中存储的一般都是海量的消息数据,为了避免日志文件过大,Log并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>-<partition_id>
消息的文件存储机制
一个topic的多个partition在物理磁盘上的保存路径,路径保存在 /tmp/kafka-logs/topic-partition,包含日志文件、索引文件和时间索引文件
imagekafka是通过分段的方式将Log分为多个LogSegment,LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件,其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。
segment的常用配置有:
#server.properties
#segment文件的大小,默认为 1G
log.segment.bytes=1024*1024*1024
#滚动生成新的segment文件的最大时长
log.roll.hours=24*7
#segment文件保留的最大时长,超时将被删除
log.retention.hours=24*7
那么这个LogSegment是什么呢?
LogSegment
假设kafka以partition为最小存储单位,那么我们可以想象当kafka producer不断发送消息,必然会引起partition文件的无线扩张,这样对于消息文件的维护以及被消费的消息的清理带来非常大的挑战,所以kafka 以segment为单位又把partition进行细分。每个partition相当于一个巨型文件被平均分配到多个大小相等的segment数据文件中(每个segment文件中的消息不一定相等),这种特性方便已经被消费的消息的清理,提高磁盘的利用率。
-
log.segment.bytes=107370 (设置分段大小),默认是1gb,我们把这个值调小以后,可以看到日志分段的效果
-
抽取其中3个分段来进行分析
imagesegment file由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值进行递增。数值最大为64位long大小,20位数字字符长度,没有数字用0填充
查看segment文件命名规则
通过下面这条命令可以看到kafka消息日志的内容,注意grep必须加-a参数
grep -a 'logId' 00000000000000000000.log
假如第一个log文件的最后一个offset为:5376,所以下一个segment的文件命名为:
00000000000000005376.log。对应的index为00000000000000005376.index
segment中index和log的对应关系
从所有分段中,找一个分段进行分析
为了提高查找消息的性能,为每一个日志文件添加2个索引索引文件:OffsetIndex 和 TimeIndex,分别对应.index以及.timeindex, TimeIndex索引文件格式:它是映射时间戳和相对offset
查看索引内容:
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
展示如下
offset: 4561 position: 683858
offset: 4566 position: 688769
offset: 4573 position: 693871
offset: 4578 position: 700261
offset: 4583 position: 704568
offset: 4586 position: 714114
offset: 4591 position: 720123
offset: 4594 position: 727926
offset: 4601 position: 733000
offset: 4603 position: 742220
offset: 4605 position: 753894
offset: 4607 position: 764212
offset: 4609 position: 771133
offset: 4614 position: 776029
offset: 4617 position: 780633
offset: 4622 position: 785519
offset: 4628 position: 796098
offset: 4633 position: 1198989
offset: 4637 position: 1204712
index采用稀疏存储
的方式,它不会为每一条message都建立索引,而是每隔一定的字节数建立一条索引,避免索引文件占用过多的空间。缺点是没有建立索引的offset不能一次定位到message的位置,需要做一次顺序扫描,但是扫描的范围很小。
如图所示,.index文件中存储了索引以及物理偏移量(position),.log文件存储了消息的内容。
索引包含两个部分(均为4个字节的数字),分别为相对offset和position。相对offset表示segment文件中的offset,其实就是消息的唯一标识,同一个partition内的消息offset是唯一的,position表示在消息在.log文件中在数据文件中的位置,其实是消息真实的物理偏移地址。
Kafka采用整数值position记录单个分区的消费状态
,当消费成功broker收到确认,position指向下一个offset。 由于消息一定时间内不清除,那么可以重置offset来重新消费消息。
在partition中如何通过offset查找message
查找的算法是
- 根据offset的值,查找segment段中的index索引文件。由于索引文件命名是以上一个文件的最后一个offset+1进行命名的,所以,使用二分查找算法能够根据offset快速定位到指定的索引文件。
- 找到索引文件后,根据offset进行定位,找到索引文件中的符合范围的索引。(kafka采用稀疏索引的方式来提高查找性能)
- 得到position以后,再到对应的log文件中,从position出开始查找offset对应的消息,将每条消息的offset与目标offset进行比较,直到找到消息
比如说,我们要查找offset=2490这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到log文件中,根据49111这个position开始查找,比较每条消息的offset是否大于等于2490。最后查找到对应的消息以后返回
Log文件的消息内容分析
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log | grep position
前面我们通过kafka提供的命令,可以查看二进制的日志文件信息,一条消息,会包含很多的字段。
offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize:
-1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1
sequence: -1 isTransactional: false headerKeys: [] payload: message_5371
offset和position这两个前面已经讲过了、 createTime表示创建时间、keysize和valuesize表示key和value的大小、 compresscodec表示压缩编码、payload:表示消息的具体内容
kafka提供的命令的参数
sh kafka-run-class.sh kafka.tools.DumpLogSegments
Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.
Option Description
------ -----------
--deep-iteration if set, uses deep instead of shallow
iteration.
--files <String: file1, file2, ...> REQUIRED: The comma separated list of data
and index log files to be dumped.
--help Print usage information.
--index-sanity-check if set, just checks the index sanity
without printing its content. This is
the same check that is executed on
broker startup to determine if an index
needs rebuilding or not.
--key-decoder-class [String] if set, used to deserialize the keys. This
class should implement kafka.serializer.
Decoder trait. Custom jar should be
available in kafka/libs directory.
(default: kafka.serializer.StringDecoder)
--max-message-size <Integer: size> Size of largest message. (default: 5242880)
--offsets-decoder if set, log data will be parsed as offset
data from the __consumer_offsets topic.
--print-data-log if set, printing the messages content when
dumping data logs. Automatically set if
any decoder option is specified.
--transaction-log-decoder if set, log data will be parsed as
transaction metadata from the
__transaction_state topic.
--value-decoder-class [String] if set, used to deserialize the messages.
This class should implement kafka.
serializer.Decoder trait. Custom jar
should be available in kafka/libs
directory. (default: kafka.serializer.
StringDecoder)
--verify-index-only if set, just verify the index log without
printing its content.
日志的清除策略以及压缩策略
日志清除策略
前面提到过,日志的分段存储,一方面能够减少单个文件内容的大小,另一方面,方便kafka进行日志清理。日志的清理策略有两个:
- 根据消息的保留时间,当消息在kafka中保存的时间超过了指定的时间,就会触发清理过程
- 根据topic存储的数据大小,当topic所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。kafka会启动一个后台线程,定期检查是否存在可以删除的消息
通过log.retention.bytes和log.retention.hours这两个参数来设置,当其中任意一个达到要求,都会执行删除。
默认的保留时间是:7天
日志压缩策略
Kafka还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的key和value的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心key对应的最新的value。因此,我们可以开启kafka的日志压缩功能,服务端会在后台启动启动Cleaner线程池,定期将相同的key进行合并,只保留最新的value值。日志的压缩原理是
image磁盘存储的性能问题
磁盘存储的性能优化
我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka采用顺序写的方式存储数据。即使是这样,但是频繁的I/O操作仍然会造成磁盘的性能瓶颈
零拷贝
消息从发送到落地保存,broker维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过socket发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。
操作系统将数据从磁盘读入到内核空间的页缓存:
▪ 应用程序将数据从内核空间读入到用户空间缓存中
▪ 应用程序将数据写回到内核空间到socket缓存中
▪ 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出
通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的unix操作系统提供一个优化的代码路径,用于将数据从页缓存传输到socket;在Linux中,是通过sendfile系统调用来完成的。Java提供了访问这个系统调用的方法:FileChannel.transferTo API
使用sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
页缓存
页缓存是操作系统实现的一种主要的磁盘缓存,但凡设计到缓存的,基本都是为了提升i/o性能,所以页缓存是用来减少磁盘I/O操作的。
磁盘高速缓存有两个重要因素:
第一,访问磁盘的速度要远低于访问内存的速度,若从处理器L1和L2高速缓存访问则速度更快。
第二,数据一旦被访问,就很有可能短时间内再次访问。正是由于基于访问内存比磁盘快的多,所以磁盘的内存缓存将给系统存储性能带来质的飞越。
当 一 个进程准备读取磁盘上的文件内容时, 操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据, 从而避免了对物理磁盘的I/0操作;如果没有命中, 则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存, 之后再将数据返回给进程。
同样,如果 一 个进程需要将数据写入磁盘, 那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在, 则会先在页缓存中添加相应的页, 最后将数据写入对应的页。 被修改过后的页也就变成了脏页, 操作系统会在合适的时间把脏页中的数据写入磁盘, 以保持数据的 一 致性
Kafka中大量使用了页缓存, 这是Kafka实现高吞吐的重要因素之 一 。 虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的, 但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync),可以通过 log.flush.interval.messages 和 log.flush.interval.ms 参数来控制。
同步刷盘能够保证消息的可靠性,避免因为宕机导致页缓存数据还未完成同步时造成的数据丢失。但是实际使用上,我们没必要去考虑这样的因素以及这种问题带来的损失,消息可靠性可以由多副本来解决,同步刷盘会带来性能的影响。 刷盘的操作由操作系统去完成即可
网友评论