美文网首页
华为kafka安全版重置group中的topic offset不

华为kafka安全版重置group中的topic offset不

作者: 锅碗瓢盆油盐酱醋 | 来源:发表于2021-01-13 00:24 被阅读0次

    业务场景

    华为kafka安全版,使用New Consumer API,采用group方式,消费者消费特定group下的topic数据,当group.id重置时,组中的所有topic就就会重头开始消费。如果只需要group中的某一个topic从头开始消费,而其他的topic接着上一次消费的位置继续消费,就不能简单的更换group.id来实现了。以下有两种方式可以用来重置offset (调整过程中必须保证消费者处于停止状态!!!)

    方式一:使用kafka的shell命令来对group进行操作

    1.完成用户认证
    source bigdata_env
    kinit username
    
    2.查看当前group详情
    kafka-consumer-groups.sh --bootstrap-server ip:port --new-consumer --describe --group yourGroupId --command-config consumer.properties
    
    3.重置topic的offset
    kafka-consumer-groups.sh --bootstrap-server ip:port --new-consumer --group yourGroupId --topic yourTopic --reset-offsets --to-earliest --execute --command-config consumer.properties
    

    以上操作完之后,正常情况下就可以执行命令1检查重置是否成功,当然笔者的情况是重置没有生效,执行操作1发现currentOffset依然没有变,于是就有了第二种方式 —— 使用consumer的java API

    方式二:使用java API重置(方式一不生效)

    1.连接kafka获取consumer
    // 定义kafka连接及认证信息
    private static final String BOOTSTRAP_SERVER = "ip:port";
    private static final String USERNAME = "username";
    private static final String KRB5_CONF_PATH = "./krb5.conf";
    private static final String USER_KEYTAB_PATH = "./user.keytab";
    // 定义groupid,topic及其分区数
    private static final String GROUP_ID = "yourGroupId";
    private static final Integer PARTITIONS = 3;
    private static final String TOPIC = "yourTopic";
    // consumer对象
    private static KafkaConsumer<String, String> consumer;
    static {
      // 设置认证信息
      System.setProperty("user.name", USERNAME);
      System.setProperty("zookeeper.server.principal", "zookeeper/hadoop.hadoop.com");
      System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);
      /**
       jaas.conf的路径,文件内容如下(可用代码动态创建):
       KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        userKeyTab=true
        keyTab="./user.keytab"
        principal="USERNAME"
        useTicketCache=false
        storeKey=true
        debug=true;
       } 
      */
      System.setProperty("java.security.auth.login.config", "./jaas.conf");
      // 设置kafka连接信息并创建连接
      Properties props = new Properties();
      props.put("bootstrap.servers", BOOTSTRAP_SERVER);
      props.put("group.id", GROUP_ID);
      props.put("kerberos.domain.name", "hadoop.hadoop.com");
      props.put("sasl.kerberos.service.name", "kafka");
      props.put("security.protocol", "SASL_PLAINTEXT");
      // 关闭自动提交,需要手动提交以重置offset
      props.put("enable.auto.commit", false);
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      consumer = new KafkaConsumer<>(props);
      Collection<TopicPartition> partitionList = new ArrayList<>();
      for (int p = 0; p < PARTITIONS; p++) {
        partitionList.add(new TopicPartiton(TOPIC, p))
      }
      consumer.assign(partitionList);
    }
    
    2.重置offset
    public static void resetOffsetTo (long position) {
      Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>();
      Set<TopicPartiton> partitionSet = consumer.assignment();
      for (TopicPartition partition : partitionSet) {
        // 将当前分区的offset重置为position
        commitMap.put(partition, new OffsetAndMetadata(position));
      }
      // 提交offset信息
      consumer.commitSync(commitMap);
    }
    
    3.查看当前position(即当前的offset位置)
    // 查看指定分区的currentOffset
    consumer.position(partition);
    
    4.消费数据,看是否从头开始消费
    // 消费一次数据,超时时间为3秒
    consumer.poll(3000);
    
    4其他api
    // 查看所有topic
    consumer.listTopics()
    

    方式二使用的是消费者消费数据时提交的offset信息,一般来说只要topic可以正常被消费,方式二都能生效。

    相关文章

      网友评论

          本文标题:华为kafka安全版重置group中的topic offset不

          本文链接:https://www.haomeiwen.com/subject/lvwjaktx.html