Flink Source & Sink
在 Flink 中,Source 代表从外部获取数据源,Transformation 代表了对数据进行转换操作,Sink 代表将内部数据写到外部数据源。
一个 Flink Job 一般由 Source, Transformation, Sink 组成。
Flink Kafka Source
基本使用方式
Properties properties = new Properties();
properties.put("bootstrap.servers", "xxxxx");
properties.put("group.id", "test_consumer_zgh");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "5000");
FlinkKafkaConsumer010 consumer010 = new FlinkKafkaConsumer010(
"test_zgh_input",
new SimpleStringSchema(),
properties);
设置起始 offset
consumer010.setStartFromEarliest(); // 设置 offset 为最旧
consumer010.setStartFromGroupOffsets(); // 设置 offset 为 group offset
consumer010.setStartFromLatest(); // 设置 offset 为最新
consumer010.setStartFromSpecificOffsets(specificStartupOffsets); // 指定 offset
注意事项:
- setStartFromGroupOffsets,如果 group offset 不存在,或者 group offset 无效的话,将依据 "auto.offset.reset" 该属性来决定初始 offset。auto.offset.reset 默认为 largest。
- setStartFromSpecificOffsets,如果指定 offset 无效时,则将该 topic partition 的 offset 将设置为 group offset。
- 如果该作业是从 checkpoint 或 savepoint 中恢复,则所有设置初始 offset 的函数均将失效,初始 offset 将从 checkpoint 中恢复。
DeserializationSchema
如何将从 kafka 中获取的字节流转换为 Java Object,则通过 DeserializationSchema 来实现转换。其中 SimpleStringSchema 将 kafka 获取的字节流转换为字符串。
其中 KeyedDeserializationSchema 支持 Key, Value 反序列化。
Commit offset 策略
- 不使用 checkpoint 的话,由 enable.auto.commit, auto.commit.interval.ms 一起决定 commit 行为,每隔一段时间向 kafka commit 一次 offset。
- 使用 checkpoint 的话,kafka consumer 将 offset commit 到 checkpoint state 中。
Consumer 分配策略
task 消费 topic partition 的分配策略- 针对单个 topic 的 partitions,均采用 round-robin 策略,起始点取决于 topic hashCode 和 numParallelSubtasks。
- 不同 topic 之间的 startIndex 是随机的,故解决了多 topic 负载均衡问题。
- consumer 无法自动获取 topic 新增 partitions。
Consumer Metrics
Flink Job 默认会通过 consumer.metrics() 获取 metrics 之后,并将 metrics 注册到 Flink Source Kafka Metrics 上。因此,当需要定位 consumer 问题的时候,可以直接寻找该 metrics 来定位 kafka 相关问题。
Flink Kafka Sink
调用 DataStream 的 addSink(SinkFunction<T> sinkFunction) 函数来加入一个 FlinkKafkaProducer010。
需要注意的是,FlinkKafkaProducer010 默认是采用 FlinkFixedPartitioner
Properties produceConfig = new Properties();
produceConfig.put("bootstrap.servers", "xxxxxx");
wordcount.addSink(new FlinkKafkaProducer010<String>("test_zgh_output", new SimpleStringSchema(),
produceConfig));
FlinkFixedPartitioner 的分配策略
FlinkKafkaPartitioner注意,这个分配策略并不是委托给 kafka producer 的 partitioner.class 这个参数,而是在 Flink 内部自己计算出来。
if (internalProducer.flinkKafkaPartitioner == null) {
record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
} else {
record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
}
使用默认的 FlinkFixedPartitioner 的分配策略会有两个坑:
- 当 Sink 的并发度低于 Topic 的 partition 个数时,一个 sink task 写一个 partition,会导致部分 partition 完全没有数据。
- 当 topic 的 partition 扩容时,则需要重启作业,以便发现新的 partition。
网友评论