美文网首页
kafka重置消费位点java代码实现

kafka重置消费位点java代码实现

作者: b335eb9201c3 | 来源:发表于2022-10-30 13:56 被阅读0次
    package cn.yzw.jc.ppfs.biz.shared.mq;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.collections4.CollectionUtils;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    
    import java.util.*;
    
    /**
     * Resets committed Kafka consumer offsets for the configured consumer group.
     *
     * <p>{@link #resetPartitionSites(Map)} commits caller-supplied offsets per topic/partition
     * using a short-lived {@link KafkaConsumer}. {@link #main(String[])} is a standalone demo
     * that resets partition 0 of a hard-coded topic to offset 0 and prints one polled batch.
     */
    @Service
    @Slf4j
    public class KafkaConsumerOffsetManager {

        /** Topic used only by the {@link #main(String[])} demo. */
        private static final String TOPIC = "dba.mssql.jcmalldb.YZ_Mall.Order";

        @Value("${spring.kafka.bootstrap-servers}")
        private String kafkaService;
        @Value("${ppfs.kafka.group}")
        private String groupId;

        /**
         * Commits the given offsets for the configured consumer group.
         *
         * <p>For every topic in the map, the topic's partitions are looked up on the broker;
         * each partition that has an entry in the inner map gets that offset committed.
         * Partitions without an entry are skipped with a warning, and topics for which no
         * partitions are returned are skipped as well. Offsets of one topic are committed in a
         * single {@code commitSync} call.
         *
         * @param topicPartitionMap topic name -> (partition number -> offset to commit);
         *                          a {@code null} or empty map is a no-op
         */
        public void resetPartitionSites(Map<String, Map<Integer, Long>> topicPartitionMap) {
            if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
                log.warn("重置消费位点参数为空,跳过");
                return;
            }
            // auto.offset.reset is intentionally not set here (it was commented out in the
            // original code); this consumer only commits offsets and never polls.
            Map<String, Object> config = buildConsumerConfig(kafkaService, groupId);
            // try-with-resources guarantees the consumer (and its network resources) is
            // released even when partitionsFor/commitSync throws.
            try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(config)) {
                for (Map.Entry<String, Map<Integer, Long>> entry : topicPartitionMap.entrySet()) {
                    String topicName = entry.getKey();
                    Map<Integer, Long> partitionOffsets = entry.getValue();
                    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
                    if (CollectionUtils.isEmpty(partitionInfos)) {
                        log.warn("topic{}未获取到分区", topicName);
                        continue;
                    }
                    if (partitionOffsets == null) {
                        // No offsets supplied for this topic at all; nothing to commit.
                        log.warn("topic{}未配置分区位点", topicName);
                        continue;
                    }
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (PartitionInfo p : partitionInfos) {
                        Long target = partitionOffsets.get(p.partition());
                        if (target == null) {
                            log.warn("topic{}未获取到{}分区位点", topicName, p.partition());
                            continue;
                        }
                        offsets.put(new TopicPartition(topicName, p.partition()), new OffsetAndMetadata(target));
                    }
                    if (!offsets.isEmpty()) {
                        consumer.commitSync(offsets);
                    }
                }
            }
        }

        /**
         * Builds the consumer configuration shared by the service method and the demo:
         * bootstrap servers, Integer key / String value deserializers and the group id.
         *
         * @param bootstrapServers comma-separated broker list
         * @param group            consumer group id
         * @return a new mutable configuration map
         */
        private static Map<String, Object> buildConsumerConfig(String bootstrapServers, String group) {
            Map<String, Object> config = new HashMap<String, Object>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, group);
            return config;
        }

        /**
         * Demo entry point: commits offset 0 for partition 0 of {@code TOPIC} in group
         * {@code ppfs-qa}, then subscribes, polls once and prints every record of that batch
         * as tab-separated topic, offset, key and value.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Map<String, Object> config =
                    buildConsumerConfig("172.16.1.43:6667,172.16.1.44:6667,172.16.1.45:6667", "ppfs-qa");
            // Per the Kafka consumer configuration docs, "earliest" makes the consumer start from
            // the earliest available offset when the group has no committed offset.
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // Target position: offset 0 of partition 0.
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
            offsets.put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(0));
            // try-with-resources closes the consumer even if commit/subscribe/poll fails.
            try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(config)) {
                // Commit the reset offset first, then subscribe and consume.
                consumer.commitSync(offsets);
                consumer.subscribe(Collections.singletonList(TOPIC));
                // NOTE(review): poll(long) is deprecated in newer kafka-clients in favour of
                // poll(Duration) - confirm the client version in use before switching.
                ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3000);
                for (ConsumerRecord<Integer, String> consumerRecord : consumerRecords) {
                    System.out.println(
                            consumerRecord.topic() + "\t"
                                    + consumerRecord.offset() + "\t"
                                    + consumerRecord.key() + "\t"
                                    + consumerRecord.value() + "\t"
                    );
                }
            }
        }
    }
    
    
    

    相关文章

      网友评论

          本文标题:kafka重置消费位点java代码实现

          本文链接:https://www.haomeiwen.com/subject/oxgktdtx.html