指定offset消费
消费模式
在flink的kafka source中有以下5种模式指定offset消费
public enum StartupMode {
/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
/** Start from the earliest offset possible. */
EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
/** Start from the latest offset. */
LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
/**
* Start from user-supplied timestamp for each partition.
* Since this mode will have specific offsets to start with, we do not need a sentinel value;
* using Long.MIN_VALUE as a placeholder.
*/
TIMESTAMP(Long.MIN_VALUE),
/**
* Start from user-supplied specific offsets for each partition.
* Since this mode will have specific offsets to start with, we do not need a sentinel value;
* using Long.MIN_VALUE as a placeholder.
*/
SPECIFIC_OFFSETS(Long.MIN_VALUE);
}
默认为GROUP_OFFSETS,表示根据上一次group id提交的offset位置开始消费。每个枚举的值其实是一个long型的负数,根据不同的模式,在每个partition初始化的时候会默认将offset设置为这个负数。其他的方式和kafka本身的语义类似,就不在赘述。
指定offset
本文只讨论默认的GROUP_OFFSETS方式,下文所有分析都是基于这种模式。但是还是需要区分是否开启了checkpoint。在开始分析之前需要对几个重要的变量进行说明:
-
subscribedPartitionsToStartOffsets
-
所属类:
FlinkKafkaConsumerBase.java
-
定义:
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */ private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
-
说明:保存订阅topic的所有partition以及初始消费的offset
-
-
subscribedPartitionStates
-
所属类:
AbstractFetcher.java
-
定义:
/** All partitions (and their state) that this fetcher is subscribed to. */ private final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates;
-
说明:保存了所有订阅的partition的offset等详细信息,例如
-
/** The offset within the Kafka partition that we already processed. */ private volatile long offset; /** The offset of the Kafka partition that has been committed. */ private volatile long committedOffset;
每次消费完数据之后都会更新这些值,这个变量非常的重要,在做checkpoint的时候,保存的offset等信息都是来自于这个变量。这个变量的初始化如下:
// initialize subscribed partition states with seed partitions this.subscribedPartitionStates = createPartitionStateHolders( seedPartitionsWithInitialOffsets, timestampWatermarkMode, watermarksPeriodic, watermarksPunctuated, userCodeClassLoader);
消费之后更新相应的offset主要在
KafkaFetcher#runFetchLoop
方法while循环中调用emitRecord(value, partition, record.offset(), record);
-
-
restoredState
-
所属类:
FlinkKafkaConsumerBase.java
-
定义:
/** * The offsets to restore to, if the consumer restores state from a checkpoint. * * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method. * * <p>Using a sorted map as the ordering is important when using restored state * to seed the partition discoverer. */ private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;
-
说明:如果指定了恢复的checkpoint路径,启动时候将会读取这个变量里面的内容获取起始offset,而不再是使用StartupMode中的枚举值作为初始的offset
-
-
unionOffsetStates
-
所属类:
FlinkKafkaConsumerBase.java
-
定义:
/** Accessor for state in the operator state backend. */ private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
-
说明:保存了checkpoint要持久化存储的内容,例如每个partition已经消费的offset等信息
-
非checkpoint模式
在没有开启checkpoint的时候,消费kafka中的数据,其实就是完全依靠kafka自身的机制进行消费。
checkpoint模式
开启checkpoint模式以后,会将offset等信息持久化存储以便恢复时使用。但是作业重启以后如果由于某种原因读不到checkpoint的结果,例如checkpoint文件丢失或者没有指定恢复路径等。
第一种情况,如果读取不到checkpoint的内容
subscribedPartitionsToStartOffsets
会初始化所有partition的起始offset为-915623761773L
这个值就表示了当前为GROUP_OFFSETS
模式。
default:
for (KafkaTopicPartition seedPartition : allPartitions) {
subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
}
第一次消费之前,指定读取offset位置的关键方法是KafkaConsumerThread#reassignPartitions
代码片段如下:
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
// the KafkaConsumer by default will automatically seek the consumer position
// to the committed group offset, so we do not need to do it.
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else {
consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
}
}
因为是GROUP_OFFSET模式 ,所以会调用newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
需要说明的是,在state里面需要存储的是成功消费的最后一条数据的offset,但是通过position
这个方法返回的是下一次应该消费的起始offset,所以需要减1。这里更新这个的目的是为了checkpoint的时候可以正确的拿到offset。
这种情况由于读取不到上次checkpoint的结果,所以依旧是依靠kafka自身的机制,即根据__consumer_offsets
记录的内容消费。
第二种情况,checkpoint可以读取到
这种情况下,subscribedPartitionsToStartOffsets
初始的offset就是具体从checkpoint中恢复的内容,这样KafkaConsumerThread#reassignPartitions
实际走的分支就是
consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
这里加1的原理同上,state保存的是最后一次成功消费数据的offset,所以加1才是现在需要开始消费的offset。
总结
本文介绍了程序启动时,如何确定从哪个offset开始消费,这也是《Flink kafka source源码解析》系列的最后一篇。后续会继续分析flink kafka sink的相关源码以及flink结合kafka端到端的EXACTLY_ONCE是如何实现的。
注:本文基于flink 1.9.0和kafka 2.3
网友评论