前面我们已经对Kafka的日志结构做了基本的讲解,相信大家也都有了一定的了解了。今天我们接着来讲kafka日志管理的部分,Kafka日志加载与恢复。
kafka在实例化Log对象时,Log会完成该分区目录下所有日志段的恢复操作,并将日志段加载到ConcurrentSkipListMap类型的segments集合中。ConcurrentSkipListMap具有跳跃表的功能,适用于高并发访问,多个线程可以安全地并发进行插入、删除、更新和访问操作。
Log恢复和加载日志段由Log.loadSegments()方法实现,具体逻辑如下:
1.检查分区目录
检查分区目录是否存在,若不存在则创建。
2.遍历分区目录下的文件
根据文件后缀名分别进行处理,若文件后缀为.delete或.cleaned,则直接删除该文件。后缀名是.delete表示该文件是需要被清除而还未执行删除操作,后缀名为.cleaned的文件表示是在日志清理操作第一阶段生成的临时文件。因此这两种类型的文件,在对日志段进行恢复操作时均直接删除。
若文件名后缀为.swap,则先去掉.swap后缀,然后再判断文件是偏移量索引文件还是日志文件。若是偏移量索引文件则直接删除该文件,若是日志文件则删除该日志文件对应的索引文件,同时将该文件添加到Set<File>类型的swapFiles集合中。
3.第二次遍历分区目录下的文件
根据文件后缀名分别进行处理,若是偏移量索引文件或时间戳索引文件,查找对应的日志文件是否存在,若日志文件不存在,则删除索引文件。
若是日志文件,则创建一个LogSegment对象,如果该日志文件对应的偏移量索引文件存在,则检查两个索引文件是否有效,若索引文件无效则删除两个索引文件,同时调用LogSegment.recover()方法重新创建索引文件,若偏移量索引文件不存在则直接调用LogSegment.recover()方法创建索引文件。recover()方法的核心思想就是读取日志文件,当累积字节数大于${index.interval.bytes}时插入一条索引记录,完成索引的重建。
4.遍历swapFiles集合
对.swap类型的文件进行处理,根据.swap文件名计算出基准偏移量,然后分别创建LogSegment对象并重建两个索引文件,查找以该swap段的基准偏移量开始与下一个日志段基准偏移量之间所有日志段文件,删除这些日志段对应的数据文件及其索引,然后去掉.swap后缀,并将该日志段加到Log对象的segments集合中。
5.创建与恢复日志段
若segments为空,则说明通过以上几步恢复操作没有得到任何有效的日志段,为了保证该Log对象至少有一个活跃段,需要创建一个日志段,即创建活跃段的数据文件及该日志段对应的两个索引文件。
若segments不为空,则调用Log.recoverLog()方法恢复日志段,通过日志段LogSegment的recover()方法重建两个索引文件,最后将活跃段的两个索引文件大小设置为${segment.index.bytes}。
关于大数据开发学习,Kafka日志加载与恢复,以上就为大家做了简单的介绍了。日志的加载与恢复,需要结合到具体的场景下去考虑,学习当中多理解,勤练习!
网友评论