业务场景
华为kafka安全版,使用New Consumer API,采用group方式,消费者消费特定group下的topic数据,当group.id重置时,组中的所有topic就就会重头开始消费。如果只需要group中的某一个topic从头开始消费,而其他的topic接着上一次消费的位置继续消费,就不能简单的更换group.id来实现了。以下有两种方式可以用来重置offset (调整过程中必须保证消费者处于停止状态!!!)
方式一:使用kafka的shell命令来对group进行操作
1.完成用户认证
source bigdata_env
kinit username
2.查看当前group详情
kafka-consumer-groups.sh --bootstrap-server ip:port --new-consumer --describe --group yourGroupId --command-config consumer.properties
3.重置topic的offset
kafka-consumer-groups.sh --bootstrap-server ip:port --new-consumer --group yourGroupId --topic yourTopic --reset-offsets --to-earliest --execute --command-config consumer.properties
以上操作完之后,正常情况下就可以执行命令1检查重置是否成功,当然笔者的情况是重置没有生效,执行操作1发现currentOffset依然没有变,于是就有了第二种方式 —— 使用consumer的java API
方式二:使用java API重置(方式一不生效)
1.连接kafka获取consumer
// 定义kafka连接及认证信息
private static final String BOOTSTRAP_SERVER = "ip:port";
private static final String USERNAME = "username";
private static final String KRB5_CONF_PATH = "./krb5.conf";
private static final String USER_KEYTAB_PATH = "./user.keytab";
// 定义groupid,topic及其分区数
private static final String GROUP_ID = "yourGroupId";
private static final Integer PARTITIONS = 3;
private static final String TOPIC = "yourTopic";
// consumer对象
private static KafkaConsumer<String, String> consumer;
static {
// 设置认证信息
System.setProperty("user.name", USERNAME);
System.setProperty("zookeeper.server.principal", "zookeeper/hadoop.hadoop.com");
System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);
/**
jaas.conf的路径,文件内容如下(可用代码动态创建):
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
userKeyTab=true
keyTab="./user.keytab"
principal="USERNAME"
useTicketCache=false
storeKey=true
debug=true;
}
*/
System.setProperty("java.security.auth.login.config", "./jaas.conf");
// 设置kafka连接信息并创建连接
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("group.id", GROUP_ID);
props.put("kerberos.domain.name", "hadoop.hadoop.com");
props.put("sasl.kerberos.service.name", "kafka");
props.put("security.protocol", "SASL_PLAINTEXT");
// 关闭自动提交,需要手动提交以重置offset
props.put("enable.auto.commit", false);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
Collection<TopicPartition> partitionList = new ArrayList<>();
for (int p = 0; p < PARTITIONS; p++) {
partitionList.add(new TopicPartiton(TOPIC, p))
}
consumer.assign(partitionList);
}
2.重置offset
public static void resetOffsetTo (long position) {
Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>();
Set<TopicPartiton> partitionSet = consumer.assignment();
for (TopicPartition partition : partitionSet) {
// 将当前分区的offset重置为position
commitMap.put(partition, new OffsetAndMetadata(position));
}
// 提交offset信息
consumer.commitSync(commitMap);
}
3.查看当前position(即当前的offset位置)
// 查看指定分区的currentOffset
consumer.position(partition);
4.消费数据,看是否从头开始消费
// 消费一次数据,超时时间为3秒
consumer.poll(3000);
4其他api
// 查看所有topic
consumer.listTopics()
方式二使用的是消费者消费数据时提交的offset信息,一般来说只要topic可以正常被消费,方式二都能生效。
网友评论