美文网首页数客联盟
Flink中notifyCheckpointComplete方法

Flink中notifyCheckpointComplete方法

作者: Woople | 来源:发表于2019-11-05 13:36 被阅读0次

定义

notifyCheckpointComplete方法在CheckpointListener接口中定义

/**
 * This interface must be implemented by functions/operations that want to receive
 * a commit notification once a checkpoint has been completely acknowledged by all
 * participants.
 */
@PublicEvolving
public interface CheckpointListener {

   /**
    * This method is called as a notification once a distributed checkpoint has been completed.
    * 
    * Note that any exception during this method will not cause the checkpoint to
    * fail any more.
    * 
    * @param checkpointId The ID of the checkpoint that has been completed.
    * @throws Exception
    */
   void notifyCheckpointComplete(long checkpointId) throws Exception;
}

简单说这个方法的含义就是在checkpoint做完之后,JobMaster会通知task执行这个方法,例如在FlinkKafkaProducernotifyCheckpointComplete中做了事务的提交。

样例

下面的程序会被分为两个task,task1是Source: Example Source和task2是Map -> Sink: Example Sink

DataStream<KafkaEvent> input = env.addSource(
      new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
         .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).name("Example Source")
   .keyBy("word")
   .map(new MapFunction<KafkaEvent, KafkaEvent>() {
      @Override
      public KafkaEvent map(KafkaEvent value) throws Exception {
         value.setFrequency(value.getFrequency() + 1);
         return value;
      }
   });
input.addSink(
  new FlinkKafkaProducer<>(
    "bar",
    new KafkaSerializationSchemaImpl(),
    properties,
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE)).name("Example Sink");

operator调用notifyCheckpointComplete

根据上面的例子,task1中只有一个source的operator,但是task2中有两个operator,分别是map和sink。

StreamTask中,调用task的notifyCheckpointComplete方法

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
   boolean success = false;
   synchronized (lock) {
      if (isRunning) {
         LOG.debug("Notification of complete checkpoint for task {}", getName());

         for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
            if (operator != null) {
               operator.notifyCheckpointComplete(checkpointId);
            }
         }

         success = true;
      }
      else {
         LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
      }
   }
   if (success) {
      syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId, this::finishTask);
   }
}

其中关键的部分就是

for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
   if (operator != null) {
      operator.notifyCheckpointComplete(checkpointId);
   }
}

operator的调用顺序取决于allOperators变量,可以看到源码中的注释,operator是以逆序存放的

/**
 * Stores all operators on this chain in reverse order.
 */
private final StreamOperator<?>[] allOperators;

也就是说上面客户端的代码,虽然先调用了map后调用的sink,但是实际执行的时候,确实先调用sink的notifyCheckpointComplete方法,后调用map的。

对Exactly-Once语义的影响

注:以下讨论的都是基于kafka source和sink
上面的例子,是先执行source的notifyCheckpointComplete方法,再执行sink的notifyCheckpointComplete方法。但是如果把.keyBy("word")去掉,那么只会有一个task,所有operator逆序执行,也就是先调用sink的notifyCheckpointComplete方法再调用source的。
为了方便理解整个流程,下文只考察并发度为1的情况,不考虑部分subtask成功部分不成功的情况。具体分析之前要先搞清楚FlinkKafkaProducerFlinkKafkaConsumernotifyCheckpointComplete方法都做了什么事情,请先阅读Flink kafka source源码解析
Flink kafka sink源码解析

先sink后source

sink成功之后source执行之前 sink成功之前
checkpoint恢复 exactly-once 丢数据
__consumer_offsets恢复 重复消费 exactly-once

sink成功之后source执行之前,表示sink的notifyCheckpointComplete方法执行成功了,但是在执行source的notifyCheckpointComplete方法之前任务失败。
sink成功之前,表示sink的notifyCheckpointComplete方法执行失败,提交事务失败。

测试用例

测试代码主体架构如下:

DataStream<KafkaEvent> input = env.addSource(
      new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
         .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).name("Example Source")
   .map(new MapFunction<KafkaEvent, KafkaEvent>() {
      @Override
      public KafkaEvent map(KafkaEvent value) throws Exception {
         value.setFrequency(value.getFrequency() + 1);
         return value;
      }
   });
input.addSink(
   new FlinkKafkaProducer<>(
      "bar",
      new KafkaSerializationSchemaImpl(),
         properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)).name("Example Sink");

测试环境采用的是flink 1.9.0 Standalone Cluster模式,一个JobManager,一个TaskManager,默认只保存一个checkpoint。
模拟异常的方法,通过kill -9杀掉JobManager和TaskManager进程。

  1. FlinkKafkaProducer#commit方法第一行设置断点,当程序走到这个断点的时候kill -9杀掉JobManager和TaskManager进程,模拟sink的notifyCheckpointComplete方法执行失败的场景;
  2. 监控1,通过bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092监控producer是否flush数据;监控2,通过bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092 --isolation-level read_committed监控producer的事务是否成功提交;监控3,通过bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 10.1.236.66:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /tmp/Consumer.properties监控consumer的offset是否提交到kafka
  3. 发送数据一条数据a,5,1572845161023,当走到断点的时候,说明consumer的checkpoint已经生成,但是还没有将offset提交到kafka,也就是checkpoint认为offset已经成功发送,但是kafka认为并没有发送,监控1有数据,监控2监控3都没有数据。kill -9杀掉JobManager和TaskManager进程
  4. 重新启动,并提交作业,不指定checkpoint路径。监控1,2,3,都有数据,所以这种情况,监控2,只收到了一次数据,也就是exactly-once。这时候监控3收到的数据为:partition0的offset=37,partition1的offset=43,partition2的offset=39
  5. 同样1-3步骤,发送数据一条数据b,6,1572845161023,第4步,启动作业的时候通过-s指定要恢复的checkpoint路径,启动后监控1,2都没有数据,但是监控3的数据为:partition0的offset=37,partition1的offset=43,partition2的offset=40,再查看task的日志FlinkKafkaConsumerBase - Consumer subtask 0 restored state: {KafkaTopicPartition{topic='foo', partition=0}=36, KafkaTopicPartition{topic='foo', partition=1}=42, KafkaTopicPartition{topic='foo', partition=2}=39}.,说明checkpoint认为上一次partition2的offset=39已经成功消费,所以恢复之后向kafka发送的offset为40。这样就导致了partition2的offset=39这条数据丢失

同样的方法可以测试sink成功之后source执行之前的场景,只是这时候需要将断点设置在TwoPhaseCommitSinkFunction#notifyCheckpointComplete方法的最后一行,这样就会发现故障之前,监控1,2都是有数据的,监控3没有数据。不指定checkpoint路径恢复,监控1,2都会收到数据,这样就导致了重复消费。如果指定checkpoint路径消费,那么监控1,2就不会收到数据,保证了exactly-once

原因分析

产生上面情况的原因主要就是因为checkpoint存储的offset和kafka中的offset不一致导致的。

先source后sink

需要说明的一点这个场景的两个task实际是并行的,并没有绝对的先后关系,只是会有这种前后关系的可能。

source成功之后sink执行之前 source成功之前
checkpoint恢复 丢数据 丢数据
__consumer_offsets恢复 丢数据 exactly-once

测试用例

模拟source成功之后sink执行之前

  1. 需要在上面的用例中加入keyby算子,确保生成两个task,监控3收到数据的时候说明consumer的notifyCheckpointComplete方法已经执行完。在FlinkKafkaProducer#commit方法第一行设置断点,当程序走到这个断点并且监控3收到数据的时候,kill -9杀掉JobManager和TaskManager进程,模拟sink执行notifyCheckpointComplete方法失败的场景;
  2. 这时候重启作业,checkpoint和kafka中offset已经是一致的了,无论是从checkpoint还是kafka,都是一样的。所以source认为已经成功消费了,不会再读上次的offset,都会导致数据丢失。

source成功之前
对于在source之前程序就挂掉,相当于所有的operator都没有执行notifyCheckpointComplete方法,但是source的checkpoint已经做过了,只是没有将offset发送到kafka,这样只有从__consumer_offsets恢复才能保证不丢数据。

总结

本文通过一种极端的测试场景希望让读者可以更深入的理解flink中的Exactly-Once语义。在程序挂了以后需要排查是什么原因和什么阶段导致的,才能通过合适的方式恢复作业。在实际的生产环境中,会有重试或者更多的方式保证高可用,也建议保留多个checkpoint,以便业务上可以恢复正确的数据。
注:本文基于flink 1.9.0和kafka 2.3

参考

Flink checkpoint流程源码分析

相关文章

网友评论

    本文标题:Flink中notifyCheckpointComplete方法

    本文链接:https://www.haomeiwen.com/subject/xfirbctx.html