出现问题的日志
kafka版本:0.10.2.1

__consumer_offsets日志清除流程
以清除__consumer_offsets-45分区为例:

问题出现的原因就是在判断偏移量是否超出最大值的时候出错
超出最大值的原因是在清理日志文件分组的时候,通过索引(index)文件中的索引相减判断是否大于Integer.MAX_VALUE,而在清除过程中通过log中文件中实际的偏移量相减来确认
如:清理45的时候,segment文件如下:

00000000000000000000.log和00000000002147462149.log的索引文件都为0,所以分为一个组,
而实际上00000000002147462149.log最大偏移量-0已经超出最大值所以在清理的时候报错。
目前分组的参数只有log.config.segmentSize, log.config.maxIndexSize无法进行更改,而且log-cleaner只有在kafka启动的时候才会重启,无其他启动方法。
解决方法只有升级kafka,目前看2.0中分组的代码是通过实际的偏移量来进行,不会出现这个问题
00000000002147462149.log文件内容:

源码分析(对应上面流程)
1、首先确认异常代码出现的位置

canConvert的方法实现是:

所以,结论和网上说的一样,offset-baseOffset大于了Integer的最大值
2、为什么会大于呢?
需要找下baseOffset和offset的的值是怎么计算出来的
baseOffset的查找:
baseOffset是场景LogSegment的时候构造参数

往上追发现创建方法如下,baseOffset为segments第一个的baseOffset

segments说明:segments是个集合,集合里面是各个分区的segment,比如下图中,

00000000000000000000.log和00000000002147462149.log都是segment(包含index,和timeIndex文件)
lastOffset的查找:
往上追代码发现是这样获取的,可以理解为log文件offset及segments里面最后一个segment的最大偏移量

所以,需要知道segments是怎么来的,追代码发现这段代码:

意思是:将分区下所有需要清除的日志文件分组,进行然后进行合并清除,分组代码如下:

最坑的一句话如图,通过判断索引文件index里的偏移量来,确定要不要合成一个组。查看我们的问题分区日志:

00000000000000000000.log和00000000002147462149.log的index文件都为0,所以会将00000000000000000000.log和00000000002147462149.log都当作一个分组来进行清除
解析查看00000000002147462149.log的内容发现,最大的log已经超出了最大值:

所以会抛出移除,导致log-cleaner线程挂掉
结论:
在进行合并清除判断的时候,通过两个文件中index中记录的偏移量来判断是否大于Integer的最大值,但是在创建.clean文件的时候,通过两个文件的实际位移来判断偏移量是否大于Integer的最大值,所以抛出异常
解决方法:
1、想办法在分组的时候不再合并一个组,但是分组的参数中只有segmentsSize和maxIndexSize的判断,所以没有办法来通过配置参数分组(segmentsSize和maxIndexSize会对实际的文件有影响)

2、是否可以不重启kafka进行log线程的启动呢?答案是不行的,源码中没有单独的启动logger的线程,所以只能通过重启
目前看来解决方法只有一个:升级kafka的版本,来看下2.0kafka版本分组的实现,2.0中会直接通过log中的偏移量来判断是否要分为一个组,所以不会出现log-cleaner线程挂掉的情况:

网友评论