spark2.0的structureStreaming在开启了checkpoint后会将偏移量记录到hdfs中,如下为sources的目录结构,此处研究sources下的子目录(此处为0)是如何产生的,为流平台管理kafka偏移量提供依据。
[hdfs@hadoop-datanode1 ~]$>hadoop fs -ls /user/hdfs/streaming/175/1/checkpoint/sources
Found 1 items
drwxr-xr-x - hdfs hadoop 0 2021-10-27 14:47 /user/hdfs/streaming/175/1/checkpoint/sources/0
通过本地断点调试的方式终于找到代码位置,如下,子目录的命名是通过计数器的方式产生。
org.apache.spark.sql.execution.streaming.MicroBatchExecution#logicalPlan

nextSourceId的初始值为0
计数器代表的是每个kafka节点。
sources的文件中记录的内容为第一次创建checkpoint文件时对应的kafka的偏移量,如下:
hadoop fs -cat /user/hdfs/streaming/153/2/checkpoint/sources/0/0
v1
{"in":{"0":666}}
网友评论