kafka消费者偏移量手动管理-同步提交
首先,我们将参数enable.auto.commit参数的值设为false。不进行自动偏移量提交。进行如下代码。手动提交偏移量。为了模拟再均衡或者消费者异常终止的情况出现的偏移量未提交的现象,我们在提交偏移量之前,让线程休眠10s。代码如下
package com.xx.cn.kafka;
import cn.hutool.setting.dialect.Props;
import com.xx.cn.common.KafkaParams;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Properties;
/**
* 〈一句话功能简述〉<br>
* 〈提交偏移量〉
*共四个步骤
* 1. 准备连接参数
* 2. 订阅主题
* 3. 拉取数据
* 4. 同步提交偏移量
*/
public class ConsumerCommitOffset {
public static final Logger logger = LoggerFactory.getLogger(ConsumerCommitOffset.class);
public static void main(String[] args) {
//校验参数
if(args.length<2){
System.out.println("args must be [config] runMode[prd|prev]");
System.exit(-1);
}
String config = args[0];
String runMode = args[1];
Properties props = kafakParams(config, runMode);
//创建消费者客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅主题
consumer.subscribe(Collections.singletonList("country"));
while (true){
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
try {
Thread.sleep(10000);
//在每个批次后,同步提交偏移量。等待提交成功后,进行下一批次数据拉取
consumer.commitSync();
logger.info("offset commit finish" );
} catch (InterruptedException e) {
logger.error(e.getMessage());
} catch (CommitFailedException e ){
logger.error(e.getMessage());
}
}
}
private static void processRecord(ConsumerRecord<String, String> record) {
System.out.printf(
"topic = %s , partition = %s ,offset= %s ,key =%s ,value =%s\n",
record.topic(),record.partition(),record.offset(),record.key(),record.value()
);
}
private static Properties kafakParams(String config,String runMode){
Props properties = new Props(config);
Properties props = new Properties();
props.put(KafkaParams.BOOTSTRAP_SERVERS, properties.getStr(KafkaParams.BOOTSTRAP_SERVERS));
props.put(KafkaParams.GROUP_ID, "pen");
props.put(KafkaParams.KEY_DESERIALIZER, StringDeserializer.class);
props.put(KafkaParams.VALUE_DESERIALIZER, StringDeserializer.class);
props.put(KafkaParams.AUTO_OFFSET_RESET, properties.getStr(KafkaParams.AUTO_OFFSET_RESET));
props.put(KafkaParams.ENABLE_AUTO_COMMIT,properties.getBool(KafkaParams.ENABLE_AUTO_COMMIT));
if (runMode.equals("prev")){
props.put(KafkaParams.SECURITY_PROTOCOL, properties.getStr(KafkaParams.SECURITY_PROTOCOL));
props.put(KafkaParams.SASL_MECHANISM, properties.getStr(KafkaParams.SASL_MECHANISM));
props.put(KafkaParams.SASL_JAAS_CONFIG, properties.getStr(KafkaParams.SASL_JAAS_CONFIG));
}
return props;
}
}
使用客户端模拟生产者,写入数据 (jingzhuang + num的方式)。
情况1:在完成偏移量提交后,终止消费者。按照理论,完成了最后消费,并提交偏移量之后,重启消费者消息不会出现重复。
终止程序前的日志
page1
重启程序的日志,可以看到重启消费者后,没有重复消费
page2
这次我们在没有进行最后一次 jingzhuang 6 这次消息的偏移量的提交时候,终止程序。理论上来说,再次启动会重复消费 jingzhuang6这条数据,因为本消息的偏移量没有被提交。
page3
实际情况与我们预期相符和。
因此,用不提交偏移量,如果程序在最后一个批次没有处理完成的情况下终止,下次重启会出现重复消费的情况。另外我们使用consumer.commitSync() 同步提交偏移量,会在没有提交成功的时候一直等待,降低消费吞吐。但是优点是,在没有提交成功,会按照配置进行重试,可以降低消费重复的数量。
网友评论