定义
notifyCheckpointComplete
方法在CheckpointListener
接口中定义
/**
* This interface must be implemented by functions/operations that want to receive
* a commit notification once a checkpoint has been completely acknowledged by all
* participants.
*/
@PublicEvolving
public interface CheckpointListener {
/**
* This method is called as a notification once a distributed checkpoint has been completed.
*
* Note that any exception during this method will not cause the checkpoint to
* fail any more.
*
* @param checkpointId The ID of the checkpoint that has been completed.
* @throws Exception
*/
void notifyCheckpointComplete(long checkpointId) throws Exception;
}
简单说这个方法的含义就是在checkpoint做完之后,JobMaster会通知task执行这个方法,例如在FlinkKafkaProducer
中notifyCheckpointComplete
中做了事务的提交。
样例
下面的程序会被分为两个task,task1是Source: Example Source
和task2是Map -> Sink: Example Sink
DataStream<KafkaEvent> input = env.addSource(
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).name("Example Source")
.keyBy("word")
.map(new MapFunction<KafkaEvent, KafkaEvent>() {
@Override
public KafkaEvent map(KafkaEvent value) throws Exception {
value.setFrequency(value.getFrequency() + 1);
return value;
}
});
input.addSink(
new FlinkKafkaProducer<>(
"bar",
new KafkaSerializationSchemaImpl(),
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)).name("Example Sink");
operator调用notifyCheckpointComplete
根据上面的例子,task1中只有一个source的operator,但是task2中有两个operator,分别是map和sink。
在StreamTask
中,调用task的notifyCheckpointComplete
方法
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
boolean success = false;
synchronized (lock) {
if (isRunning) {
LOG.debug("Notification of complete checkpoint for task {}", getName());
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
operator.notifyCheckpointComplete(checkpointId);
}
}
success = true;
}
else {
LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
}
}
if (success) {
syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId, this::finishTask);
}
}
其中关键的部分就是
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
operator.notifyCheckpointComplete(checkpointId);
}
}
operator的调用顺序取决于allOperators
变量,可以看到源码中的注释,operator是以逆序存放的
/**
* Stores all operators on this chain in reverse order.
*/
private final StreamOperator<?>[] allOperators;
也就是说上面客户端的代码,虽然先调用了map后调用的sink,但是实际执行的时候,确实先调用sink的notifyCheckpointComplete
方法,后调用map的。
对Exactly-Once语义的影响
注:以下讨论的都是基于kafka source和sink
上面的例子,是先执行source的notifyCheckpointComplete
方法,再执行sink的notifyCheckpointComplete
方法。但是如果把.keyBy("word")
去掉,那么只会有一个task,所有operator逆序执行,也就是先调用sink的notifyCheckpointComplete
方法再调用source的。
为了方便理解整个流程,下文只考察并发度为1的情况,不考虑部分subtask成功部分不成功的情况。具体分析之前要先搞清楚FlinkKafkaProducer
和FlinkKafkaConsumer
的notifyCheckpointComplete
方法都做了什么事情,请先阅读Flink kafka source源码解析和
Flink kafka sink源码解析。
先sink后source
sink成功之后source执行之前 | sink成功之前 | |
---|---|---|
checkpoint恢复 | exactly-once | 丢数据 |
__consumer_offsets 恢复 |
重复消费 | exactly-once |
sink成功之后source执行之前,表示sink的notifyCheckpointComplete
方法执行成功了,但是在执行source的notifyCheckpointComplete
方法之前任务失败。
sink成功之前,表示sink的notifyCheckpointComplete
方法执行失败,提交事务失败。
测试用例
测试代码主体架构如下:
DataStream<KafkaEvent> input = env.addSource(
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).name("Example Source")
.map(new MapFunction<KafkaEvent, KafkaEvent>() {
@Override
public KafkaEvent map(KafkaEvent value) throws Exception {
value.setFrequency(value.getFrequency() + 1);
return value;
}
});
input.addSink(
new FlinkKafkaProducer<>(
"bar",
new KafkaSerializationSchemaImpl(),
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)).name("Example Sink");
测试环境采用的是flink 1.9.0 Standalone Cluster模式,一个JobManager,一个TaskManager,默认只保存一个checkpoint。
模拟异常的方法,通过kill -9
杀掉JobManager和TaskManager进程。
- 在
FlinkKafkaProducer#commit
方法第一行设置断点,当程序走到这个断点的时候kill -9
杀掉JobManager和TaskManager进程,模拟sink的notifyCheckpointComplete方法执行失败的场景; -
监控1,通过
bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092
监控producer是否flush数据;监控2,通过bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092 --isolation-level read_committed
监控producer的事务是否成功提交;监控3,通过bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 10.1.236.66:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /tmp/Consumer.properties
监控consumer的offset是否提交到kafka - 发送数据一条数据
a,5,1572845161023
,当走到断点的时候,说明consumer的checkpoint已经生成,但是还没有将offset提交到kafka,也就是checkpoint认为offset已经成功发送,但是kafka认为并没有发送,监控1有数据,监控2和监控3都没有数据。kill -9
杀掉JobManager和TaskManager进程 - 重新启动,并提交作业,不指定checkpoint路径。监控1,2,3,都有数据,所以这种情况,监控2,只收到了一次数据,也就是exactly-once。这时候监控3收到的数据为:partition0的offset=37,partition1的offset=43,partition2的offset=39
- 同样1-3步骤,发送数据一条数据
b,6,1572845161023
,第4步,启动作业的时候通过-s
指定要恢复的checkpoint路径,启动后监控1,2都没有数据,但是监控3的数据为:partition0的offset=37,partition1的offset=43,partition2的offset=40,再查看task的日志FlinkKafkaConsumerBase - Consumer subtask 0 restored state: {KafkaTopicPartition{topic='foo', partition=0}=36, KafkaTopicPartition{topic='foo', partition=1}=42, KafkaTopicPartition{topic='foo', partition=2}=39}.
,说明checkpoint认为上一次partition2的offset=39已经成功消费,所以恢复之后向kafka发送的offset为40。这样就导致了partition2的offset=39这条数据丢失。
同样的方法可以测试sink成功之后source执行之前的场景,只是这时候需要将断点设置在TwoPhaseCommitSinkFunction#notifyCheckpointComplete
方法的最后一行,这样就会发现故障之前,监控1,2都是有数据的,监控3没有数据。不指定checkpoint路径恢复,监控1,2都会收到数据,这样就导致了重复消费。如果指定checkpoint路径消费,那么监控1,2就不会收到数据,保证了exactly-once。
原因分析
产生上面情况的原因主要就是因为checkpoint存储的offset和kafka中的offset不一致导致的。
先source后sink
需要说明的一点这个场景的两个task实际是并行的,并没有绝对的先后关系,只是会有这种前后关系的可能。
source成功之后sink执行之前 | source成功之前 | |
---|---|---|
checkpoint恢复 | 丢数据 | 丢数据 |
__consumer_offsets 恢复 |
丢数据 | exactly-once |
测试用例
模拟source成功之后sink执行之前
- 需要在上面的用例中加入keyby算子,确保生成两个task,监控3收到数据的时候说明consumer的
notifyCheckpointComplete
方法已经执行完。在FlinkKafkaProducer#commit
方法第一行设置断点,当程序走到这个断点并且监控3收到数据的时候,kill -9
杀掉JobManager和TaskManager进程,模拟sink执行notifyCheckpointComplete方法失败的场景; - 这时候重启作业,checkpoint和kafka中offset已经是一致的了,无论是从checkpoint还是kafka,都是一样的。所以source认为已经成功消费了,不会再读上次的offset,都会导致数据丢失。
source成功之前
对于在source之前程序就挂掉,相当于所有的operator都没有执行notifyCheckpointComplete
方法,但是source的checkpoint已经做过了,只是没有将offset发送到kafka,这样只有从__consumer_offsets
恢复才能保证不丢数据。
总结
本文通过一种极端的测试场景希望让读者可以更深入的理解flink中的Exactly-Once语义。在程序挂了以后需要排查是什么原因和什么阶段导致的,才能通过合适的方式恢复作业。在实际的生产环境中,会有重试或者更多的方式保证高可用,也建议保留多个checkpoint,以便业务上可以恢复正确的数据。
注:本文基于flink 1.9.0和kafka 2.3
网友评论