美文网首页
kafka09 自定义提交offset

kafka09 自定义提交offset

作者: 6c0fe9142f09 | 来源:发表于2018-09-14 18:15 被阅读230次

    自定义提交offset

    之前的实验中,采用的是自动提交offset的方式,这样下次进行pull操作的时候,就能根据我们之前提交的offset拿到最新的消息。
    但是这种方式可能没办法满足一些特定的业务场景,下面让我们来手写提交offset的程序吧。

    • 关闭自动提交offset
    // 关闭自动提交offset
    props.put("enable.auto.commit", "false");
    
    • 同步提交consumer.commitSync();
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "132.232.14.247:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "group1");
            // 关闭自动提交offset
            props.put("enable.auto.commit", "false");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("mySecondTopic"));
            while (true){
                try {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String,String> record:records){
                        System.out.println("s consumption message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
                    }
                    // 同步提交修改
                    consumer.commitSync();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
    • 异步提交consumer.commitAsync(new OffsetCommitCallback() {}
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "132.232.14.247:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "group1");
            // 关闭自动提交offset
            props.put("enable.auto.commit", "false");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("mySecondTopic"));
            while (true){
                try {
                    ConsumerRecords<String, String> records = consumer.poll(5000);
                    for (ConsumerRecord<String,String> record:records){
                        System.out.println("s consumption message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
                    }
                    // 异步提交修改
                    consumer.commitAsync(new OffsetCommitCallback() {
                        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                            if (exception != null){
                                //处理异常逻辑
                            }else {
                                System.out.println("成功提交offset:"+offsets.toString());
                            }
                        }
                    });
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
    特定业务逻辑
    • 按照partition同步进行offset的提交
    try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    //遍历每个partition
                    for (TopicPartition partition : records.partitions()) {
                        //获取指定partition中的消息记录
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        //遍历指定partition中的消息记录
                        for (ConsumerRecord<String, String> record : partitionRecords){
                            //此处使用输出操作模拟项目中的业务处理。
                            System.out.println("MySyncConsumer2 consume message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
                        }
                        //获取partition中最后一条记录的offset
                        long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
                        //同步提交一个partition中的offset
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastoffset+ 1)));
                    }
                }
            } finally {
                consumer.close();
            }
    
    • 按照处理的消息量同步进行提交
    try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records)
                    {
                        //此处使用输出操作模拟项目中的业务处理。
                        System.out.println("MySyncConsumer3 consume message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
                        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                        OffsetAndMetadata om=new OffsetAndMetadata(record.offset(),"");
                        currentOffsets.put(tp,om);
                        //为方便测试,我们设置每5条记录提交一次
                        if (count % 5 == 0){
                            consumer.commitSync(currentOffsets); 
                        }
                        count++;
                    }
                    consumer.commitSync();
                }
            } finally {
                consumer.close();
            }
    

    相关文章

      网友评论

          本文标题:kafka09 自定义提交offset

          本文链接:https://www.haomeiwen.com/subject/cogogftx.html