
parseOffsetSpec 根据 time 配置解析出获取 offset 的方式,earliest、lastest或者指定的时间戳
然后通过 KafkaAdminClient.listOffsets 方法获取 offset,通过 getListOffsetsCalls 构建了获取 offset 的调用

getListOffsetsCalls 首先构建 leader 节点和 topic 的关系

然后构建了对于 leader broker 的 ListOffset 的调用

调用会被放到 AdminClientRunnable 做异步请求,在 processRequests 方法中被处理

生成 request 通过 NetworkClinet.send 发送到对应的节点

访问的 ApiKey 为 LIST_OFFSETS

调用 selector.send 根据目的地 id 打开对应的 channel 发送请求


对于请求的处理在 KafkaApis.handleListOffsetRequest 中,0.x和1.x以上版本有不同的处理方式

对于 0.x 版本,调用 ReplicaManager.legacyFetchOffsetsForTimestamp 获取 offset

调用 Partition.legacyFetchOffsetsForTimestamp,从 local log file 获取小于制定时间的全部offset,然后把 大于 highWatermark 的 offset 丢弃后返回结果

对于 1.x 及以上版本,,调用 ReplicaManager.fetchOffsetForTimestamp 获取 offset

最后调用 Partition.fetchOffsetForTimestamp,通过 logManager 获取对应 partition 的 offset

网友评论