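Introduction
Kafka in production should be monitored from several angles: CPU, memory, disk, and Kafka's own metrics. Only then can problems be pinpointed precisely and handled promptly when they occur.
One of the most important of these metrics is Lag, which measures how far a consumer has fallen behind. Lag is computed per partition as the log end offset (the offset of the next message to be written) minus the consumer group's committed offset. For example, if producers have written 10,000 messages to a partition of a topic and the consumer's current offset is 9,000, the Lag is 1,000. A large Lag means the consumer cannot keep up with the messages in the topic; the cause should then be identified and fixed promptly to avoid greater losses.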
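The definition above fits in a few lines of plain Java. This is a minimal, dependency-free sketch: the class name `LagCalculator` is made up for illustration, and partitions are keyed by a hypothetical `"topic-partition"` string instead of Kafka's `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the Lag definition; not part of the original program.
final class LagCalculator {
    private LagCalculator() {}

    // Lag of one partition = log end offset - committed offset.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    // Computes Lag for every partition that has both a committed offset and an end offset.
    // Keys are illustrative "topic-partition" strings.
    static Map<String, Long> lagByPartition(Map<String, Long> endOffsets,
                                            Map<String, Long> committedOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : committedOffsets.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (end == null || e.getValue() == null) {
                continue; // no complete data for this partition, skip it
            }
            result.put(e.getKey(), lag(end, e.getValue()));
        }
        return result;
    }
}
```

With the numbers from the example, `LagCalculator.lag(10_000L, 9_000L)` returns 1000.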
There are three common ways to monitor Kafka consumer Lag:
- the Kafka command-line tool kafka-consumer-groups
- programmatically, with the Kafka Java Consumer API
- Kafka's built-in JMX metrics
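This article covers the second approach: the program obtains Lag through the Kafka Java client APIs (AdminClient for committed offsets, KafkaConsumer for end offsets), writes it to the time-series database InfluxDB, and displays it with Grafana.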
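The article does not show how the Lag records are written to InfluxDB. With the influxdb-java client used in the beans below, that is usually done through its `Point` builder; the dependency-free sketch here only illustrates what one record per partition could look like in InfluxDB line protocol. The measurement name `kafka_lag`, the tag and field names, and the `LineProtocol` class are all hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical formatter: one partition's offsets as an InfluxDB line-protocol record.
// Measurement, tag and field names are illustrative assumptions.
final class LineProtocol {
    private LineProtocol() {}

    // Escapes commas, equals signs and spaces in tag keys and tag values.
    static String escape(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    static String lagPoint(String region, String groupId, String topic, int partition,
                           long offset, long logEndOffset, long timestampNanos) {
        // TreeMap keeps tags sorted by key.
        Map<String, String> tags = new TreeMap<>();
        tags.put("region", region);
        tags.put("group", groupId);
        tags.put("topic", topic);
        tags.put("partition", Integer.toString(partition));

        StringBuilder sb = new StringBuilder("kafka_lag");
        for (Map.Entry<String, String> t : tags.entrySet()) {
            sb.append(',').append(escape(t.getKey())).append('=').append(escape(t.getValue()));
        }
        // Integer field values carry an "i" suffix in line protocol.
        sb.append(" offset=").append(offset).append('i')
          .append(",logEndOffset=").append(logEndOffset).append('i')
          .append(",lag=").append(logEndOffset - offset).append('i')
          .append(' ').append(timestampNanos);
        return sb.toString();
    }
}
```

For instance, `lagPoint("cn", "g1", "orders", 0, 9000L, 10000L, 1L)` yields `kafka_lag,group=g1,partition=0,region=cn,topic=orders offset=9000i,logEndOffset=10000i,lag=1000i 1`. Line protocol timestamps default to nanosecond precision, so a real caller would pass something like `System.currentTimeMillis() * 1_000_000L`.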
Data flow
[Figure data-flow.png: data flow through the kafka monitor program]
Main code
First, define three beans: AdminClient, KafkaConsumer, and InfluxDB.
// bootstrapServers, kafkaMonitorConsumerGroup and influxdb* are configuration fields of the enclosing class
@Bean
public AdminClient adminClient() {
    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // AdminClient.create is the public factory method
    return AdminClient.create(properties);
}

@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
    Properties properties = new Properties();
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaMonitorConsumerGroup);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    // This consumer is only used to query end offsets, so it must never commit offsets
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    return new KafkaConsumer<>(properties);
}

@Bean
public InfluxDB influxDB() {
    InfluxDB influxDB = InfluxDBFactory.connect(influxdbUrl, influxdbUsername, influxdbPassword);
    // Check connectivity at startup
    influxDB.ping();
    return influxDB;
}
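The article does not show when the collection runs. One possible arrangement, sketched here under that assumption, is a single-threaded scheduler that periodically calls the collection step and hands the result to a writer. `LagPoller` and its methods are hypothetical; in the real program the collector would wrap `getOffsetEntityFromCluster()` and the sink would write through the `InfluxDB` bean.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical scheduler tying the collection and write steps together.
final class LagPoller<T> {
    private final Supplier<List<T>> collector;
    private final Consumer<List<T>> sink;
    // One thread: KafkaConsumer must not be used from several threads at once.
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    LagPoller(Supplier<List<T>> collector, Consumer<List<T>> sink) {
        this.collector = collector;
        this.sink = sink;
    }

    // Runs one collect-and-write cycle. Exceptions are caught because
    // scheduleAtFixedRate suppresses all later runs after an uncaught exception.
    boolean pollOnce() {
        try {
            sink.accept(collector.get());
            return true;
        } catch (RuntimeException e) {
            System.err.println("lag poll failed: " + e);
            return false;
        }
    }

    void start(long periodSeconds) {
        scheduler.scheduleAtFixedRate(this::pollOnce, 0, periodSeconds, TimeUnit.SECONDS);
    }

    void stop() {
        scheduler.shutdown();
    }
}
```

Wiring it up might look like `new LagPoller<>(this::getOffsetEntityFromCluster, writer).start(30)`, where `writer` and the 30-second period are placeholders.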
Next, fetch all consumer groups in the cluster, then use each consumer group ID to fetch its partitions and offsets.
public List<OffsetEntity> getOffsetEntityFromCluster() {
    // List every consumer group known to the cluster
    ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
    KafkaFuture<Collection<ConsumerGroupListing>> consumerGroupsFuture = listConsumerGroupsResult.all();
    Collection<ConsumerGroupListing> consumerGroupListingCollection = null;
    try {
        consumerGroupListingCollection = consumerGroupsFuture.get();
    } catch (InterruptedException e) {
        // Log the exception itself so the stack trace is kept
        logger.error("get consumer groups interrupted", e);
        System.exit(-1);
    } catch (ExecutionException e) {
        logger.error("get consumer groups failed", e);
        System.exit(-1);
    }
    // Collect partition offsets and Lag for each group
    List<OffsetEntity> offsetEntityList = new ArrayList<>();
    for (ConsumerGroupListing consumerGroupListing : consumerGroupListingCollection) {
        offsetEntityList.addAll(getOffsetEntityFromGroup(consumerGroupListing.groupId()));
    }
    return offsetEntityList;
}
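In the code here, a failed lookup for any single group ends the whole process through `System.exit(-1)`. A possible alternative, sketched as an assumption rather than the author's design, is to query each group independently and record the failures. `GroupFanOut` is hypothetical, and `perGroup` stands in for `getOffsetEntityFromGroup`, which would then have to throw an unchecked exception instead of exiting.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Hypothetical fan-out over consumer groups that isolates per-group failures.
final class GroupFanOut {
    private GroupFanOut() {}

    static <T> List<T> collect(Collection<String> groupIds,
                               Function<String, List<T>> perGroup,
                               List<String> failedGroups) {
        List<T> all = new ArrayList<>();
        for (String groupId : groupIds) {
            try {
                all.addAll(perGroup.apply(groupId));
            } catch (RuntimeException e) {
                failedGroups.add(groupId); // remember the failure, continue with the next group
            }
        }
        return all;
    }
}
```

The list of failed groups could itself be written to InfluxDB, so that a broken lookup is visible on the dashboard instead of silently missing.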
public List<OffsetEntity> getOffsetEntityFromGroup(String groupId) {
    // Committed offsets of this group, per partition
    ListConsumerGroupOffsetsResult offsetResult = adminClient.listConsumerGroupOffsets(groupId);
    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetFuture = offsetResult.partitionsToOffsetAndMetadata();
    Map<TopicPartition, OffsetAndMetadata> offsetMap = null;
    try {
        offsetMap = offsetFuture.get();
    } catch (InterruptedException e) {
        logger.error("get offset interrupted, group: " + groupId, e);
        System.exit(-1);
    } catch (ExecutionException e) {
        logger.error("get offset failed, group: " + groupId, e);
        System.exit(-1);
    }
    // Log end offsets of the same partitions
    Map<TopicPartition, Long> topicPartitionOffsetMap = kafkaConsumer.endOffsets(offsetMap.keySet());
    List<OffsetEntity> offsetEntityList = new ArrayList<>();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsetMap.entrySet()) {
        TopicPartition topicPartition = offsetEntry.getKey();
        OffsetAndMetadata offsetAndMetadata = offsetEntry.getValue();
        Long logEndOffset = topicPartitionOffsetMap.get(topicPartition);
        // Defensive: skip partitions without a committed offset or end offset,
        // otherwise unboxing a null value below would throw a NullPointerException
        if (offsetAndMetadata == null || logEndOffset == null) {
            continue;
        }
        long offset = offsetAndMetadata.offset();
        OffsetEntity offsetEntity = new OffsetEntity();
        offsetEntity.setRegion(region);
        offsetEntity.setGroupId(groupId);
        offsetEntity.setTopic(topicPartition.topic());
        offsetEntity.setPartition(topicPartition.partition());
        offsetEntity.setLogEndOffset(logEndOffset);
        offsetEntity.setOffset(offset);
        // Lag = log end offset - committed offset
        offsetEntity.setLag(logEndOffset - offset);
        offsetEntityList.add(offsetEntity);
    }
    return offsetEntityList;
}
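Dashboards and alerts often look at the total Lag of a group on a topic rather than at single partitions. That sum can be computed in the Grafana/InfluxDB query, or in the program before writing; the sketch below shows the latter. `LagAggregator` and `PartitionLag` (a stand-in for `OffsetEntity`) are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical aggregation of per-partition Lag into one total per (group, topic).
final class LagAggregator {
    private LagAggregator() {}

    // Minimal stand-in for OffsetEntity, holding only the fields needed here.
    static final class PartitionLag {
        final String groupId;
        final String topic;
        final int partition;
        final long lag;

        PartitionLag(String groupId, String topic, int partition, long lag) {
            this.groupId = groupId;
            this.topic = topic;
            this.partition = partition;
            this.lag = lag;
        }
    }

    // Sums Lag over all partitions; the "group/topic" key format is an arbitrary choice.
    static Map<String, Long> totalLagByGroupAndTopic(List<PartitionLag> lags) {
        Map<String, Long> totals = new TreeMap<>();
        for (PartitionLag p : lags) {
            totals.merge(p.groupId + "/" + p.topic, p.lag, Long::sum);
        }
        return totals;
    }
}
```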
The final result in Grafana
The Grafana dashboard configuration can be imported from https://github.com/samrui/kafkawarden/blob/master/grafana/dashboard.json
[Figure grafana.png: the resulting Grafana dashboard]