美文网首页数客联盟
Flink kafka source源码解析(二)

Flink kafka source源码解析(二)

作者: Woople | 来源:发表于2019-11-01 15:06 被阅读0次

    offset提交模式(非checkpoint)

    消费kafka topic最为重要的部分就是对offset的管理,对于kafka提交offset的机制,可以参考kafka官方网

    而在flink kafka source中offset的提交模式有3种:

    public enum OffsetCommitMode {
    
       /** Completely disable offset committing. */
       DISABLED,
    
       /** Commit offsets back to Kafka only when checkpoints are completed. */
       ON_CHECKPOINTS,
    
       /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
       KAFKA_PERIODIC;
    }
    

    初始化offsetCommitMode

    FlinkKafkaConsumerBase#open方法中初始化offsetCommitMode

    // determine the offset commit mode
    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                    getIsAutoCommitEnabled(),
                    enableCommitOnCheckpoints,
            ((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled());
    
    • 方法getIsAutoCommitEnabled()的实现如下:

      protected boolean getIsAutoCommitEnabled() {
         return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
            PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
      }
      

      也就是说只有enable.auto.commit=true并且auto.commit.interval.ms>0这个方法才会返回true

    • 变量enableCommitOnCheckpoints默认是true,可以调用setCommitOffsetsOnCheckpoints改变这个值

    • 当代码中调用了env.enableCheckpointing方法,isCheckpointingEnabled才会返回true

    通过下面的代码返回真正的提交模式

    /**
     * Determine the offset commit mode using several configuration values.
     *
     * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
     * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
     * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
     *
     * @return the offset commit mode to use, based on the configuration values.
     */
    public static OffsetCommitMode fromConfiguration(
          boolean enableAutoCommit,
          boolean enableCommitOnCheckpoint,
          boolean enableCheckpointing) {
    
       if (enableCheckpointing) {
          // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
          return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
       } else {
          // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
          return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
       }
    }
    

    本文暂时不考虑checkpoint的场景,所以只考虑(enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;

    也就是如果客户端设置了enable.auto.commit=true那么就是KAFKA_PERIODIC,否则就是DISABLED

    offset的提交

    自动提交

    这种方式完全依靠kafka自身的特性进行提交,如下方式指定参数即可

    Properties properties = new Properties();
    properties.put("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "1000");
    new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
    

    非自动提交

    通过上面的分析,如果enable.auto.commit=false,那么offsetCommitMode就是DISABLED
    kafka官方文档中,提到当enable.auto.commit=false时候需要手动提交offset,也就是需要调用consumer.commitSync();方法提交。
    但是在flink中,非checkpoint模式下,不会调用consumer.commitSync();,一旦关闭自动提交,意味着kafka不知道当前的consumer group每次消费到了哪。
    可以从两方面证实这个问题:

    • 源码
      KafkaConsumerThread#run方法中是有consumer.commitSync();,但是只有当commitOffsetsAndCallback != null的时候才会调用。只有开启了checkpoint功能才会不为null,这个变量会在后续的文章中详细分析。

    • 测试

      1. 可以通过消费__consumer_offsets观察是否有offset的提交
      2. 重启程序,还是会重复消费之前消费过的数据

    总结

    本文介绍了在非checkpoint模式下,flink kafka source提交offset的方式,后续会重点介绍checkpoint模式下提交offset的流程。

    注:本文基于flink 1.9.0和kafka 2.3

    参考

    Flink kafka source源码解析(一)

    相关文章

      网友评论

        本文标题:Flink kafka source源码解析(二)

        本文链接:https://www.haomeiwen.com/subject/zxnfbctx.html