此连接器可访问由Apache Kafka提供的事件流。
Flink提供特殊的Kafka连接器,用于从/到Kafka主题读取和写入数据。 Flink Kafka Consumer集成了Flink的检查点机制,以提供一次性处理语义。为了达到这个目的,Flink并不完全依靠Kafka的消费者群体偏移跟踪,而是跟踪和检查点内部的偏移。
请为您的用例和环境选择一个包(maven artifact id)和类名。对于大多数用户来说,FlinkKafkaConsumer08(flink-connector-kafka的一部分)是适当的。
Maven Dependency | Supported since | Consumer and Producer Class name | Kafka version | Notes |
---|---|---|---|---|
flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 | 0.8.x | 在内部使用Kafka 的SimpleConsumer API。偏移量是通过Flink提交给ZK的。 |
flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 | 0.9.x | 使用新的Consumer API Kafka。 |
flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x | 此连接器支持带有时间戳的Kafka消息,用于生成和使用。 |
flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 FlinkKafkaProducer011 | 0.11.x | 由于0.11.x Kafka不支持scala 2.10。此连接器支持Kafka事务性消息传递,为生产者提供一次语义。 |
导入maven库:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
Kafka Consumer
我们需要编写一个Kafka Consumer,通过Flink计算引擎从Kafka相应的Topic中读取数据。在Flink中,我们可以通过FlinkKafkaConsumer08来实现,这个类提供了读取一个或者多个Kafka Topic的机制。它的构造函数接收以下几个参数:
- topic的名字,可以是String(用于读取一个Topic)List(用于读取多个Topic);
- 可以提供一个DeserializationSchema / KeyedDeserializationSchema用于反系列化Kafka中的字节数组;
- Kafka consumer的一些配置信息,而且我们必须指定bootstrap.servers、zookeeper.connect(这个属性仅仅在Kafka 0.8中需要)和group.id属性。
使用FlinkKafkaConsumer08类吧,初始化如下:
val kafkaProps = new Properties()
kafkaProps.setProperty("bootstrap.servers", "master.bigdata:9092,slave1.bigdata:9092,slave2.bigdata:9092")
kafkaProps.setProperty("zookeeper.connect", "spmaster.bigdata:2181,spslave1.bigdata:2181,spslave2.bigdata:2181")
kafkaProps.setProperty("group.id", "nealy_group")
val kafkaConsumer = new FlinkKafkaConsumer08[String]("train_appevent_topic", new SimpleStringSchema(), kafkaProps)
kafkaConsumer.print()
上面的例子中使用到SimpleStringSchema
来反系列化message,这个类是实现了DeserializationSchema
接口,并重写了T deserialize(byte[] message)
函数,DeserializationSchema
接口仅提供了反系列化data的接口,所以如果我们需要反系列化key,我们需要使用KeyedDeserializationSchema
的子类。KeyedDeserializationSchema
接口提供了T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
方法,可以反系列化kafka消息的data和key。
为了方便使用,Flink内部提供了一序列的schemas:
-
TypeInformationSerializationSchema
和TypeInformationKeyValueSerializationSchema
,它可以根据Flink的TypeInformation信息来推断出需要选择的schemas。 此模式是其他通用序列化方法的高性能Flink替代方案。 -
JsonDeserializationSchema
和JSONKeyValueDeserializationSchema
将序列化的JSON转换为ObjectNode对象,访问字段可以使用objectNode.get(“field”)作为(Int / String / ...)()
,KeyValue objectNode包含一个“key”和“value”字段,其中包含所有字段,以及一个可选的“metadata”字段,用于获取offset/partition/topic的信息。 -
AvroDeserializationSchema
它使用静态模式来读取使用Avro格式序列化的数据。它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(...))中推断出模式,也可以GenericRecords
使用手动提供的模式(with AvroDeserializationSchema.forGeneric(...))。此反序列化架构要求序列化记录不包含嵌入式架构。
- 还有一个可用的模式版本,可以在Confluent Schema Registry中查找编写器的模式(用于编写记录的 模式)。使用这些反序列化模式记录将使用从模式注册表中检索的模式进行读取,并转换为静态提供的模式(通过
ConfluentRegistryAvroDeserializationSchema.forGeneric(...)
或ConfluentRegistryAvroDeserializationSchema.forSpecific(...)
)。
要使用此反序列化模式,必须添加以下POM依赖项:
<!-- AvroDeserializationSchema 模式-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
<!-- ConfluentRegistryAvroDeserializationSchema 模式-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
Kafka Consumer 开始位置配置
Flink Kafka Consumer允许配置Kafka分区的起始位置。
val kafkaConsumer = new FlinkKafkaConsumer08[String]("train_appevent_topic", new SimpleStringSchema(), kafkaProps)
kafkaConsumer.setStartFromEarliest() // 从最早的记录开始
kafkaConsumer.setStartFromLatest() // 从最新的记录开始
kafkaConsumer.setStartFromTimestamp(时间戳) // 从指定的时代时间戳(毫秒)开始 kafka 0.10.X
kafkaConsumer.setStartFromGroupOffsets() // 默认的行为
kafkaConsumer.print()
Flink Kafka Consumer的所有版本都具有上述明确的起始位置配置方法。
-
setStartFromGroupOffsets() : 默认行为; 从group.id Kafka代理(或Zookeeper for Kafka 0.8)中的消费者组(在消费者属性中设置)提交的偏移量开始读取分区。如果找不到分区的偏移量,t将使用属性中的
auto.offset.rese
进行设置。 -
setStartFromEarliest() / setStartFromLatest():从最早/最新记录开始。在这种模式,Kafka中的已提交偏移将被忽略。
-
setStartFromTimestamp(long):从指定的时间戳开始消费。对于每个分区,将从时间戳大于或等于指定时间戳作为起始位置,开始消费。在此模式下,Kafka中的已提交偏移将被忽略,不会用作起始位置。
您还可以指定消费者应从每个分区开始的确切偏移量:
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
val kafkaConsumer = new FlinkKafkaConsumer010[String]("train_trajectory_appevent", new SimpleStringSchema(), kafkaProps)
kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets)
上述的代码配置了myTopic的partition 0,1,2在被Flink job消费的起始位置。假设myTopic总共有5个partition,那么剩下的两个partition没有被配置具体的offset的起始位,所以Flink会对这两个partition的采用默认的offset起始位的配置(setStartFromGroupOffsets)。
注意,如果你在这个job中配置了enableCheckpointing() 或者从某个savepoint来启动这个job,那么起始位会优先从savepoint或者checkpoint中获取。
Kafka Consumers和Fault Tolerance
如果我们启用了Flink的Checkpint机制,那么Flink Kafka Consumer将会从指定的Topic中消费消息,然后定期地将Kafka offsets信息、状态信息以及其他的操作信息进行Checkpint。所以,如果Flink作业出故障了,Flink将会从最新的Checkpint中恢复,并且从上一次偏移量开始读取Kafka中消费消息。
我们需要在程序中配置 Flink Kafkaconsumer的容错机制:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(1000) // 默认 500毫秒
还有一点需要注意的是,Flink只有在task slot的数量足够的情况下才可以成功的重启job,所以如果job是因为TaskManager down掉(或者无法连接到集群)导致task slot不足而失败,那么必须要恢复增加足够的task slot才能让job重启。而Flink on YARN 支持自动的重启丢失的YARN containers。
Kafka Consumers 发现新增Topic和分区
动态发现分区
Flink Kafka Consumer支持动态创建的Kafka分区,并可以准确的保证exactly-once 消费。当在Job运行时,发现有新增的分区,将从最可能早的偏移量中开始消费。
默认情况下,禁用发现分区。要启用它,可以在提供的属性配置中flink.partition-discovery.interval-millis
设置非负值的时间间隔。
限制 如果使用Flink 1.3.x之前版本的 Savepoint 恢复运行时不能启用分区发现。如果启用,则将恢复失败并出现异常。在这种情况下,为了使用分区发现,请首先在Flink 1.3.x中使用savepoint ,然后再从中恢复。
动态发现Topic
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String](
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema,
properties)
val stream = env.addSource(myConsumer)
在上面的示例中,当作业开始运行时,消费者将Topic名称 test-topic-,与指定的正则表达式匹配所有主题(以单个数字开头并以单个数字结尾)。
为了让消费者在作业开始运行后,可以发现动态创建的主题。请将其属性flink.partition-discovery.interval-millis
设置为非负值。允许消费者发现新的topic的分区, 也可以匹配指定的正则表达式。
offset提交行为的配置
Flink KafkaConsumer允许配置向 Kafka brokers(或者向Zookeeper)提交offset的行为。需要注意的是,Flink Kafka Consumer并不依赖于这些提交回Kafka或Zookeeper的offset来保证容错。这些被提交的offset只是意味着Flink将消费的状态暴露在外以便于监控。
-
Checkpointingdisabled: 如果禁用了检查点, Flink Kafka Consumer依赖于它使用的具体的Kafka client的自动定期提交offset的行为,相应的设置是 Kafka properties中的
enable.auto.commit
(或者 auto.commit.enable 对于Kafka 0.8) 以及auto.commit.interval.ms
。 -
Checkpointingenabled: 如果启用了检查点,Flink Kafka Consumer会将offset存到checkpoint中,当checkpoint 处于completed的状态时。这保证了在Kafka brokers中的
committed offset
和checkpointed states
中的offset保持一致。通过调用setCommitOffsetOnCheckpoints(boolean)
来调整 offset自动提交是否开启(默认情况下是true,即开启自动提交)。请注意,在这种情况下,配置在properties 中的offset的定时自动提交行为将会被忽略。
容错机制
发生错误的情况下,Flink会如何处理呢?在finally块中记录最后消费到的offset再向JobManager提交checkpoint吗?在通常情况下,比如发生了手动cancel或者userCode的异常时,这么做没有问题。可是如果是因为其他原因(如Full GC)使得TaskManagerhung住了,甚至是机器挂了,那么这个时候就不能通过finally 块来保证exactly-once了。Flink依赖的是带barrier的checkpointing机制来解决容错的问题。
我们通过下面一副图来简述这种机制:
barrier可以理解为checkpoint之间的分隔符,在它之前的data属于前一个checkpoint,而在它之后的data属于另一个checkpoint。同时,barrier会由source(如FlinkKafkaConsumer)发起,并混在数据中,同数据一样传输给下一级的operator,直到sink为止。假设我们的Streaming Job只有一个source、一个map operator 以及一个sink,属于barrier所分隔的checkpoint 的数据已经被处理完毕并sink,而barrier还处于source和map operator之间,barrier 还处于map和sink之间。由于barrier已经被sink收到,那么说明checkpoint已经完成了(这个checkpoint的状态为completed并被存到了state backend中),它之前的数据已经被处理完毕并sink。
但是由于sink还没有收到barrier,那么所有之前之后的数据都会被缓存在sink的Input Buffer中,也就是说这部分数据虽然已经经过source消费并经过map处理了,但是还是没有写入目的地。所以如果Job在这个时候失败了,最后一个成功committed的checkpoint是checkpoint,所以FlinkKafkaConsumer从checkpoint中恢复出相应的partitionoffset就可以了。
我们注意到,虽然之后的部分数据和之后的所有数据虽然已经被source消费,但是都没有被sink,这部分数据会被FlinkKafkaConsumer“重复”消费,我们并没有丢失任何的数据也没有重复写入任何数据,保证了exactly-once。
- 在配置了checkpointingenable的情况下,FlinkKafkaConsumer08在开始消费数据之前,会优先从checkpoint中恢复出被消费的partition的offset,如果没有从checkpoint中恢复某些partition的offset,它会从Zookeeper中恢复,若从Zookeeper中仍然没有恢复,它会根据配置的offset起始行为来配置起始offset。
2.FlinkKafkaConsumer08通过Kafka的低级API和Flink带barrier的轻量级checkpoint机制保证了在高吞吐量的情况下的exactly-once。
网友评论