美文网首页
flink kafka offset配置/提交

flink kafka offset配置/提交

作者: sunTengSt | 来源:发表于2020-08-14 16:01 被阅读0次

一:flink kafka offset配置


1. setStartFromGroupOffsets(默认的):

example:
Map specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

查看partition offset

kafka-consumer-groups --bootstrap-server xxx:9092 --group groupId  --describe
TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                       HOST            CLIENT-ID
xxx         0          13949           13949           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         1          13871           13871           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         2          13974           13974           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         3          14192           14192           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         4          14036           14036           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
  • 1.1、消费者组在kafka中提交的offsets开始读取partition;

  • 1.2、如果分区中offset没有找到,则使用kafka properties中的auto.offset.reset配置(比如:latest、earliest)


2. setStartFromEarliest()

从最早的记录开始,使用此配置,在kafka中已经提交的offset将被忽略,不会被使用


3. setStartFromLatest()

从最新的开始,使用此配置,在kafka中已经提交的offset将被忽略,不会被使用


4. setStartFromTimestamp(long)

  • 从指定的时间开始消费;
  • 对于每个partition,记录的时间大于等于指定的时间将作为起始消费点;
  • 如果partition的记录时间早于指定时间,则从最近的数据记录开始消费;
  • 此模式下,在kafka中已经提交的offset将被忽略不会作为消费起点。

5. properties配置offset

properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

解释:

  • earliest
    当各partition有消费者组已提交的offset时,从提交的offset开始消费;无提交的offset时,从起始开始消费
  • latest
    当各partition下有消费者组已提交的offset时,从提交的offset开始消费;无提交的offset时,消费最新的该partition下的数据
  • none
    topic各partition都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

二:kafka消费offset提交配置:


1. checkpoint禁用:

flink kafka消费依赖于内部kafka客户端自动定期的offset提交

配置:enable.auto.commit / auto.commit.interval.ms


2. checkpoint启用:

flink kafka consumer在checkpoint完成时自动提交offset在checkpoint state中;

配置:setCommitOffsetsOnCheckpoints(boolean) 来启用关闭;默认情况下,是开启的true
此模式下,配置在properties中自动周期性的offset提交将被忽略;


相关文章

网友评论

      本文标题:flink kafka offset配置/提交

      本文链接:https://www.haomeiwen.com/subject/oangdktx.html