美文网首页Java 杂谈
获取kafka最新offset-java

获取kafka最新offset-java

作者: Java架构学习者 | 来源:发表于2019-02-22 11:14 被阅读0次

    之前笔者曾经写过通过scala的方式获取kafka最新的offset

    但是大多数的情况我们需要使用java的方式进行获取最新offset

    以下是通过java代码获取kafka最新offset

    GetOffsetShellWrap

    publicclass GetOffsetShellWrap {privatestaticLoggerlog= LoggerFactory.getLogger(GetOffsetShellWrap.class);privateStringtopic;privateintport;privateStringhost;privateinttime;publicGetOffsetShellWrap(Stringtopic,intport,Stringhost,inttime) {this.topic = topic;this.port = port;this.host = host;this.time = time; }publicMap getEveryPartitionMaxOffset() {//1.获取topic所有分区 以及每个分区的元数据 => 返回 Map<分区id,分区元数据>TreeMap partitionIdAndMeta = findTopicEveryPartition(); Mapmap=newHashMap();for(Entry entry : partitionIdAndMeta.entrySet()) {intleaderPartitionId = entry.getKey();//2.根据每个分区的元数据信息 ==> 获取leader分区的主机StringleadBroker = entry.getValue().leader().host();StringclientName ="Client_"+ topic +"_"+ leaderPartitionId; SimpleConsumer consumer =newSimpleConsumer(leadBroker, port,100000,64*1024, clientName);//3.从leader主机获取分区的offsetlongreadOffset = getLastOffset(consumer, topic, leaderPartitionId, clientName);map.put(String.valueOf(leaderPartitionId),String.valueOf(readOffset));if(consumer !=null) consumer.close(); }returnmap; }privateTreeMap findTopicEveryPartition(){ TreeMapmap=newTreeMap(); SimpleConsumer consumer =null;try{ consumer =newSimpleConsumer(host, port,100000,64*1024,"leaderLookup"+newDate().getTime()); List topics = Collections.singletonList(topic); TopicMetadataRequest req =newTopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List metaData = resp.topicsMetadata();if(metaData!=null&& !metaData.isEmpty()){ TopicMetadata item = metaData.get(0);for(PartitionMetadata part : item.partitionsMetadata()) {map.put(part.partitionId(), part); } } }catch(Exception e) { e.printStackTrace(); }finally{if(consumer !=null) consumer.close(); }returnmap; }privatelonggetLastOffset(SimpleConsumer consumer,Stringtopic,intleaderPartitionId,StringclientName) { TopicAndPartition topicAndPartition =newTopicAndPartition(topic,leaderPartitionId); Map requestInfo =newHashMap(); requestInfo.put(topicAndPartition,newPartitionOffsetRequestInfo(time,1));  kafka.javaapi.OffsetRequest request =newkafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName); OffsetResponse response = consumer.getOffsetsBefore(request);if(response.hasError()) {log.error("Error fetching data Offset Data the Broker. Reason: "+ response.errorCode(topic, leaderPartitionId));return0; }long[] offsets = response.offsets(topic, leaderPartitionId);returnoffsets[0]; } }

    GetOffsetShellWrapJavaTest

    publicclass GetOffsetShellWrapJavaTest {publicstaticvoidmain(String[] args) {intport =9092;Stringtopic ="2017-11-6-test";inttime =-1; GetOffsetShellWrap offsetSearch =newGetOffsetShellWrap(topic,port,"hadoop-01",time); Mapmap= offsetSearch.getEveryPartitionMaxOffset();for(Stringkey:map.keySet()) { System.out.println(key+"---"+map.get(key)); } }}

    结果输出:

    0---160961---159302---16099

    ”我自己是一名从事了十余年的后端的老程序员,辞职后目前在做讲师,近期我花了一个月整理了一份最适合2018年学习的JAVA干货(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)从事后端的小伙伴们都可以来了解一下的,这里是程序员秘密聚集地,各位还在架构师的道路上挣扎的小伙伴们速来。“

    加QQ群:611481448(名额有限哦!)

    相关文章

      网友评论

        本文标题:获取kafka最新offset-java

        本文链接:https://www.haomeiwen.com/subject/tmisyqtx.html