美文网首页
如何用 influxDB + grafana 监控 kafka

如何用 influxDB + grafana 监控 kafka

作者: 慢手暗夜 | 来源:发表于2019-07-30 23:37 被阅读0次

    介绍

    生产环境中的Kafka,需要从CPU, Memory, 磁盘,Kafka自身的Metrics等多方面进行监控。这样才能在出现问题的时候,做到精确定位,及时响应。

    在众多指标中有一个很重要的指标就是Lag,它表示的是消费者消费的滞后程度。如果生产者向kafka的某个topic分区中写入了1w条消息,消费者当前消费的offset的是9000, 则Lag就是1000。如果Lag很大,则表明消费者无法及时消费kafka topic中消息。此时需要定位具体原因,及时处理,避免更大的损失。

    kafka常用的监控方法有三种

    1. 使用kafka命令行工具kafka-consumer-groups
    2. 使用kafka Java Consumer API编程
    3. 使用Kafka自带的JMX监控指标

    本文主要介绍的是第二种方式。利用kafka java consumer API获取Lag信息,然后写入时序数据库InfluxDB,最后用grafana进行展示。

    数据流图

    data-flow.png

    kafka monitor program

    首先定义AdminClient, KafkaConsumer, InfluxDB三个bean。

    
      @Bean
      public AdminClient adminClient() {
        Properties properties = new Properties();
        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return KafkaAdminClient.create(properties);
      }
    
      @Bean
      public KafkaConsumer kafkaConsumer() {
        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaMonitorConsumerGroup);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
        return new KafkaConsumer(properties);
      }
    
      @Bean
      public InfluxDB influxDB() {
        InfluxDB influxDB = InfluxDBFactory.connect(influxdbUrl, influxdbUsername, influxdbPassword);
        influxDB.ping();
        return influxDB;
      }
      
    

    然后获取集群所有ConsumerGroup信息,再通过ConsumerGroup ID获取分区和offset信息。

      public List<OffsetEntity> getOffsetEntityFromCluster() {
        ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
        KafkaFuture<Collection<ConsumerGroupListing>> consumerGroupsFuture = listConsumerGroupsResult.all();
        Collection<ConsumerGroupListing> consumerGroupListingCollection = null;
    
        try {
          consumerGroupListingCollection = consumerGroupsFuture.get();
        } catch (InterruptedException e) {
          logger.error("get consumer groups interrupted exception: " + e.getMessage());
          System.exit(-1);
        } catch (ExecutionException e) {
          logger.error("get consumer groups execution exception: " + e.getMessage());
          System.exit(-1);
        }
        List<OffsetEntity> offsetEntityList = new ArrayList<>();
        for(ConsumerGroupListing consumerGroupListing : consumerGroupListingCollection) {
          offsetEntityList.addAll(getOffsetEntityFromGroup(consumerGroupListing.groupId()));
        }
        return offsetEntityList;
      }
    
      public List<OffsetEntity> getOffsetEntityFromGroup(String groupId) {
        ListConsumerGroupOffsetsResult offsetResult = adminClient.listConsumerGroupOffsets(groupId);
        KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetFuture = offsetResult.partitionsToOffsetAndMetadata();
        Map<TopicPartition, OffsetAndMetadata> offsetMap = null;
        try {
          offsetMap = offsetFuture.get();
        } catch (InterruptedException e) {
          logger.error("get offset interrupted exception: " + e.getMessage());
          System.exit(-1);
        } catch (ExecutionException e) {
          logger.error("get offset execution exception: " + e.getMessage());
          System.exit(-1);
        }
    
        Map<TopicPartition, Long> topicPartitionOffsetMap = kafkaConsumer.endOffsets(offsetMap.keySet());
    
        List<OffsetEntity> offsetEntityList = new ArrayList<>();
        for(Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsetMap.entrySet()) {
          TopicPartition topicPartition = offsetEntry.getKey();
          OffsetAndMetadata offsetAndMetadata = offsetEntry.getValue();
    
          OffsetEntity offsetEntity = new OffsetEntity();
          offsetEntity.setRegion(region);
          offsetEntity.setGroupId(groupId);
          offsetEntity.setTopic(topicPartition.topic());
          offsetEntity.setPartition(topicPartition.partition());
    
          Long logEndOffset = topicPartitionOffsetMap.get(topicPartition);
          Long offset = offsetAndMetadata.offset();
          offsetEntity.setLogEndOffset(logEndOffset);
          offsetEntity.setOffset(offset);
          offsetEntity.setLag(logEndOffset - offset);
          offsetEntityList.add(offsetEntity);
        }
        return offsetEntityList;
      }
    

    最终grafana中展示效果

    grafana的配置可以通过https://github.com/samrui/kafkawarden/blob/master/grafana/dashboard.json 文件导入

    grafana.png

    相关文章

      网友评论

          本文标题:如何用 influxDB + grafana 监控 kafka

          本文链接:https://www.haomeiwen.com/subject/jzoorctx.html