下面是Flink读取Kafka数据的代码,其中就有五种读取offset的方式,并配置相应的介绍
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
Properties props = new Properties();
props.setProperty("bootstrap.servers",KAFKA_BROKER);
props.setProperty("zookeeper.connect", ZK_HOST);
props.setProperty("group.id",GROUP_ID);
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), props);
/**
* Map<KafkaTopicPartition, Long> Long参数指定的offset位置
* KafkaTopicPartition构造函数有两个参数,第一个为topic名字,第二个为分区数
* 获取offset信息,可以用过Kafka自带的kafka-consumer-groups.sh脚本获取
*/
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("maxwell_new", 0), 11111111l);
offsets.put(new KafkaTopicPartition("maxwell_new", 1), 222222l);
offsets.put(new KafkaTopicPartition("maxwell_new", 2), 33333333l);
/**
* Flink从topic中最初的数据开始消费
*/
consumer.setStartFromEarliest();
/**
* Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略
*/
consumer.setStartFromTimestamp(1559801580000l);
/**
* Flink从topic中指定的offset开始,这个比较复杂,需要手动指定offset
*/
consumer.setStartFromSpecificOffsets(offsets);
/**
* Flink从topic中最新的数据开始消费
*/
consumer.setStartFromLatest();
/**
* Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
*/
consumer.setStartFromGroupOffsets();
网友评论