美文网首页中间件
Flink消费Kafka数据时指定offset的五种方式

Flink消费Kafka数据时指定offset的五种方式

作者: 香山上的麻雀 | 来源:发表于2019-08-29 10:07 被阅读0次

下面是Flink读取Kafka数据的代码,其中就有五种读取offset的方式,并配置相应的介绍

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

Properties props = new Properties();
props.setProperty("bootstrap.servers",KAFKA_BROKER);
props.setProperty("zookeeper.connect", ZK_HOST);
props.setProperty("group.id",GROUP_ID);
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), props);

/**
* Map<KafkaTopicPartition, Long> Long参数指定的offset位置
* KafkaTopicPartition构造函数有两个参数,第一个为topic名字,第二个为分区数
* 获取offset信息,可以用过Kafka自带的kafka-consumer-groups.sh脚本获取
*/
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("maxwell_new", 0), 11111111l);
offsets.put(new KafkaTopicPartition("maxwell_new", 1), 222222l);
offsets.put(new KafkaTopicPartition("maxwell_new", 2), 33333333l);

/**
* Flink从topic中最初的数据开始消费
*/
consumer.setStartFromEarliest();

/**
* Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略
*/
consumer.setStartFromTimestamp(1559801580000l);

/**
* Flink从topic中指定的offset开始,这个比较复杂,需要手动指定offset
*/
consumer.setStartFromSpecificOffsets(offsets);

/**
* Flink从topic中最新的数据开始消费
*/
consumer.setStartFromLatest();

/**
* Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
*/
consumer.setStartFromGroupOffsets();

相关文章

网友评论

    本文标题:Flink消费Kafka数据时指定offset的五种方式

    本文链接:https://www.haomeiwen.com/subject/xbvbectx.html