美文网首页
Flink Kafka Connector 详解

Flink Kafka Connector 详解

作者: data4 | 来源:发表于2018-03-10 13:58 被阅读2667次

    Flink Source & Sink

    在 Flink 中,Source 代表从外部获取数据源,Transformation 代表了对数据进行转换操作,Sink 代表将内部数据写到外部数据源。
    一个 Flink Job 一般由 Source, Transformation, Sink 组成。

    Flink Kafka Source

    基本使用方式

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "xxxxx");
    properties.put("group.id", "test_consumer_zgh");
    properties.put("enable.auto.commit", "true");
    properties.put("auto.commit.interval.ms", "5000");
    FlinkKafkaConsumer010 consumer010 = new FlinkKafkaConsumer010(
        "test_zgh_input",
        new SimpleStringSchema(),
        properties);
    

    设置起始 offset

    consumer010.setStartFromEarliest();   // 设置 offset 为最旧
    consumer010.setStartFromGroupOffsets();  // 设置 offset 为 group offset
    consumer010.setStartFromLatest();  // 设置 offset 为最新
    consumer010.setStartFromSpecificOffsets(specificStartupOffsets);  // 指定 offset
    

    注意事项:

    • setStartFromGroupOffsets,如果 group offset 不存在,或者 group offset 无效的话,将依据 "auto.offset.reset" 该属性来决定初始 offset。auto.offset.reset 默认为 largest。
    • setStartFromSpecificOffsets,如果指定 offset 无效时,则将该 topic partition 的 offset 将设置为 group offset。
    • 如果该作业是从 checkpoint 或 savepoint 中恢复,则所有设置初始 offset 的函数均将失效,初始 offset 将从 checkpoint 中恢复。

    DeserializationSchema

    如何将从 kafka 中获取的字节流转换为 Java Object,则通过 DeserializationSchema 来实现转换。其中 SimpleStringSchema 将 kafka 获取的字节流转换为字符串。
    其中 KeyedDeserializationSchema 支持 Key, Value 反序列化。

    Commit offset 策略

    • 不使用 checkpoint 的话,由 enable.auto.commit, auto.commit.interval.ms 一起决定 commit 行为,每隔一段时间向 kafka commit 一次 offset。
    • 使用 checkpoint 的话,kafka consumer 将 offset commit 到 checkpoint state 中。

    Consumer 分配策略

    task 消费 topic partition 的分配策略
    • 针对单个 topic 的 partitions,均采用 round-robin 策略,起始点取决于 topic hashCode 和 numParallelSubtasks。
    • 不同 topic 之间的 startIndex 是随机的,故解决了多 topic 负载均衡问题。
    • consumer 无法自动获取 topic 新增 partitions。

    Consumer Metrics

    Flink Job 默认会通过 consumer.metrics() 获取 metrics 之后,并将 metrics 注册到 Flink Source Kafka Metrics 上。因此,当需要定位 consumer 问题的时候,可以直接寻找该 metrics 来定位 kafka 相关问题。

    Flink Kafka Sink

    调用 DataStream 的 addSink(SinkFunction<T> sinkFunction) 函数来加入一个 FlinkKafkaProducer010。
    需要注意的是,FlinkKafkaProducer010 默认是采用 FlinkFixedPartitioner

    Properties produceConfig = new Properties();
    produceConfig.put("bootstrap.servers", "xxxxxx");
    wordcount.addSink(new FlinkKafkaProducer010<String>("test_zgh_output", new SimpleStringSchema(),
        produceConfig));
    

    FlinkFixedPartitioner 的分配策略

    FlinkKafkaPartitioner

    注意,这个分配策略并不是委托给 kafka producer 的 partitioner.class 这个参数,而是在 Flink 内部自己计算出来。

    if (internalProducer.flinkKafkaPartitioner == null) {
         record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
    } else {
        record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
    }
    

    使用默认的 FlinkFixedPartitioner 的分配策略会有两个坑:

    • 当 Sink 的并发度低于 Topic 的 partition 个数时,一个 sink task 写一个 partition,会导致部分 partition 完全没有数据。
    • 当 topic 的 partition 扩容时,则需要重启作业,以便发现新的 partition。

    相关文章

      网友评论

          本文标题:Flink Kafka Connector 详解

          本文链接:https://www.haomeiwen.com/subject/yhbkfftx.html