美文网首页
kafka08 消费指定offset的消息

kafka08 消费指定offset的消息

作者: 6c0fe9142f09 | 来源:发表于2018-09-14 16:35 被阅读375次

从头开始消费消息seekToBeginning()

  • 配置信息
// Consumer configuration: broker address, String deserializers for key and value, and the group id.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "132.232.14.247:9092");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("group.id", "group1");
  • 新建一个KafkaConsumer
// value.deserializer is StringDeserializer, so record values are Strings; the consumer is typed accordingly.
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
  • 通过TopicPartition指定要消费的partition
TopicPartition seekToEndPartition = new TopicPartition("mySecondTopic",3);
  • 对consumer指定partitionassign
consumer.assign(Arrays.asList(seekToEndPartition));
  • 调用consumer.seekToBeginning指定从头开始消费
consumer.seekToBeginning(Arrays.asList(seekToEndPartition));
  • 开始消费
ConsumerRecords<String, String> records = consumer.poll(5000);
for(ConsumerRecord<String,String> record:records){
System.out.println("s consumption message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
}
  • demo
/**
 * Demo: consumes one batch of records from the beginning of a manually assigned partition.
 *
 * <p>The consumer is assigned partition 3 of topic {@code mySecondTopic}, asked to seek to the
 * beginning of that partition, polled once, and every returned record is printed to stdout.
 */
public class MySeekToBeginningConsumer {
    /**
     * Runs the demo. Any exception raised while consuming is printed, and the consumer is
     * always closed afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers","132.232.14.247:9092");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id","group1");

        // Both deserializers are StringDeserializer, so keys and values are Strings.
        // Typing the value as byte[] would not match what the deserializer produces and
        // may surface as a ClassCastException where the compiler inserts a cast.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        try {
            TopicPartition partition = new TopicPartition("mySecondTopic",3);
            // Manual assignment of the single partition to read.
            consumer.assign(Arrays.asList(partition));
            consumer.seekToBeginning(Arrays.asList(partition));
            // Single poll: only the records returned by this one call are printed.
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("s consumption message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

从尾开始消费消息seekToEnd()

  • 调用consumer.seekToEnd从末尾开始进行消费
···
// Ask the consumer to start reading the listed partition from its end.
consumer.seekToEnd(Arrays.asList(seekToEndPartition ));
···
  • demo
/**
 * Demo: consumes one batch of records starting from the end of a manually assigned partition.
 *
 * <p>The consumer is assigned partition 1 of topic {@code mySecondTopic}, asked to seek to the
 * end of that partition, polled once, and every returned record is printed to stdout.
 */
public class MySeekToEndConsumer {

    /**
     * Runs the demo. Any exception raised while consuming is printed, and the consumer is
     * always closed afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props=new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Both deserializers are StringDeserializer, so keys and values are Strings.
        // Typing the value as byte[] would not match what the deserializer produces and
        // may surface as a ClassCastException where the compiler inserts a cast.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            TopicPartition seekToEndPartition = new TopicPartition("mySecondTopic", 1);
            // Manual assignment of the single partition to read.
            consumer.assign(Arrays.asList(seekToEndPartition));
            consumer.seekToEnd(Arrays.asList(seekToEndPartition));
            // Single poll: only the records returned by this one call are printed.
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("MySeekToEndConsumer consumer message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

消费指定offset的消息seek()

  • consumer.seek
// Specify the partition (seekPartition) and the offset to seek to
consumer.seek(seekPartition,10);
  • demo

/**
 * Demo: consumes one batch of records starting at a specific offset of a manually assigned
 * partition.
 *
 * <p>The consumer is assigned partition 1 of topic {@code mySecondTopic}, positioned at
 * offset 10 via {@code seek}, polled once, and every returned record is printed to stdout.
 */
public class MySeekConsumer {
    /**
     * Runs the demo. Any exception raised while consuming is printed, and the consumer is
     * always closed afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "132.232.14.247:9092");
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        try {
            TopicPartition seekPartition = new TopicPartition("mySecondTopic", 1);
            // Manual assignment of the single partition to read.
            consumer.assign(Arrays.asList(seekPartition));
            consumer.seek(seekPartition,10);
            // Single poll: only the records returned by this one call are printed.
            ConsumerRecords<String, String> records = consumer.poll(5000);
            // Parameterized record type (the original used the raw type here).
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("s consumption message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

相关文章

网友评论

      本文标题:kafka08 消费指定offset的消息

      本文链接:https://www.haomeiwen.com/subject/jwfpgftx.html