kafka中提供了很多api管理,但官方文档中并未写出,查看源码才能看到。
- 查看集群所有group
private static void allGroups(){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient client = AdminClient.create(props);
Map<Node, List<GroupOverview>> groups = JavaConversions.mapAsJavaMap(client.listAllGroups());
for(Map.Entry<Node,List<GroupOverview>> entry : groups.entrySet()){
Iterator<GroupOverview> groupOverviewIterator = JavaConversions.asJavaIterator(entry.getValue().iterator);
while (groupOverviewIterator.hasNext()) {
System.out.println(groupOverviewIterator.next().groupId());
}
}
}
- 查看给定group的消费位移情况
private static void offset(String groupId){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient client = AdminClient.create(props);
Map<TopicPartition, Object> offsets = JavaConversions.mapAsJavaMap(client.listGroupOffsets(groupId));
Long offset = (Long) offsets.get(new TopicPartition("test", 0));
System.out.println(offset);
client.close();
}
网友评论