Kafka lag monitoring


Author: 曹建发 | Published 2019-05-16 11:31

A centralized collection approach for Kafka offsets

Overview

This approach uses a centralized collector to gather consumer-side offset metrics from Kafka. Consumer offsets can be read from two places. (This article only covers the case where the Kafka cluster manages offsets itself; applications can also manage consumer offsets on their own. Spark and Flink jobs, for example, often checkpoint offsets to HDFS or other storage instead of relying on Kafka's built-in consumer offset management.)

  1. a ZooKeeper path
  2. Kafka's internal topic (__consumer_offsets)

Older versions of Kafka (pre 0.9) store offsets in ZK only, while newer versions, by default, store offsets in an internal Kafka topic called __consumer_offsets (newer versions might still commit to ZK, though).

The advantage of committing offsets to the broker is that the consumer does not depend on ZK, so clients only need to talk to brokers, which simplifies the overall architecture. Also, for large deployments with many consumers, ZK can become a bottleneck, while Kafka handles this load easily: committing offsets is the same as writing to a topic, and Kafka scales very well here (by default, __consumer_offsets is created with 50 partitions).

Kafka version \ client version   pre-0.9       0.9 and later (incl. 0.9)
pre-0.9                          ZooKeeper     ZooKeeper
0.9 and later (incl. 0.9)        ZooKeeper     Kafka

How is the lag metric derived?

Step 1: read consumer offsets from ZooKeeper

Consumer offsets are stored in ZooKeeper under the following path:

Format: /consumers/{CONSUMER_GROUP_ID}/offsets/{TOPIC_NAME}/{PARTITION_NUMBER}

Example: /consumers/buy_send_fragment/offsets/buy_send_fragment/5

[zk: goback-4-006.m6.momo.com:2281(CONNECTED) 10] get  /consumers/buy_send_fragment/offsets/buy_send_fragment/5
768371297
cZxid = 0x72d31db19
ctime = Mon Nov 27 18:10:19 CST 2017
mZxid = 0x9a42fc9d9
mtime = Sun May 05 12:05:23 CST 2019
pZxid = 0x72d31db19
cversion = 0
dataVersion = 12950345
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
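The ZooKeeper read above can also be done programmatically. Below is a minimal sketch using the third-party kazoo client (imported lazily so the path helper works without it); the host string and group/topic names are placeholders taken from the example above, not a fixed part of any deployment:

```python
def offset_path(group, topic, partition):
    # Path layout used by ZooKeeper-based (pre-0.9 style) offset storage
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

def fetch_zk_offset(zk_hosts, group, topic, partition):
    # kazoo is a third-party ZooKeeper client; imported lazily so the pure
    # helper above stays usable without it installed
    from kazoo.client import KazooClient
    zk = KazooClient(hosts=zk_hosts)
    zk.start()
    try:
        data, _stat = zk.get(offset_path(group, topic, partition))
        # The znode payload is the committed offset as ASCII digits,
        # e.g. b"768371297" in the example above
        return int(data)
    finally:
        zk.stop()
```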

Step 2: read committed offsets from Kafka's internal topic

Topic name: __consumer_offsets

Message format: [consumer group id, topic name, partition] :: [offset, metadata, commitTimestamp, expireTimestamp]

[momobot@hubble-kafka-001.dx.momo.com kafka_2.12-0.10.2.1]$ bin/kafka-console-consumer.sh --bootstrap-server  hubble-kafka-001.dx.momo.com:9092  --topic __consumer_offsets  --value-deserializer org.apache.kafka.common.serialization.ByteArrayDeserializer  --key-deserializer org.apache.kafka.common.serialization.ByteArrayDeserializer --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
[kafka_metric_storm_consumer,hubble-kafka-broker-monitor,1]::[OffsetMetadata[43810024168,NO_METADATA],CommitTime 1557042759641,ExpirationTime 1557129159641]
[kafka_metric_storm_consumer,hubble-kafka-broker-monitor,3]::[OffsetMetadata[72656598502,NO_METADATA],CommitTime 1557042759677,ExpirationTime 1557129159677]

As structured data:

{
    "topic":"topic-name",
    "partition":11,
    "group":"console-consumer-45567",
    "version":2,
    "offset":15,
    "metadata":"",
    "commitTimestamp":1501542796444,
    "expireTimestamp":1501629196444
}
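When consuming __consumer_offsets directly instead of using the bundled formatter, the records must be decoded by hand. The sketch below does this with only the standard library, following the 0.10-era binary layouts (key versions 0/1 are offset commits; key version 2 is group metadata and is not handled here). Verify the schema against your broker version before relying on it:

```python
import struct

def _read_string(buf, pos):
    # Kafka encodes strings as a big-endian int16 length followed by UTF-8 bytes
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + length].decode("utf-8"), pos + length

def parse_offset_key(key):
    # Key schema v0/v1: version, group, topic, partition
    (version,) = struct.unpack_from(">h", key, 0)
    group, pos = _read_string(key, 2)
    topic, pos = _read_string(key, pos)
    (partition,) = struct.unpack_from(">i", key, pos)
    return {"version": version, "group": group,
            "topic": topic, "partition": partition}

def parse_offset_value(value):
    # Value schema v1: offset, metadata, commitTimestamp, expireTimestamp
    (version, offset) = struct.unpack_from(">hq", value, 0)
    metadata, pos = _read_string(value, 10)
    commit_ts, expire_ts = struct.unpack_from(">qq", value, pos)
    return {"version": version, "offset": offset, "metadata": metadata,
            "commitTimestamp": commit_ts, "expireTimestamp": expire_ts}
```

Applied to a raw key/value pair, this yields exactly the structured form shown above.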

Step 3: fetch the topic's LEO (log end offset) from the cluster

Fetch each partition's LEO through Kafka's admin API; the exact API differs across versions.

Step 4: compute records-lag

records-lag = LEO - consumerOffset

Summary

The lag equals the topic's LEO minus the consumer's latest committed offset, i.e.:

records-lag = LEO - consumerOffset
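Putting the steps together, the final computation is a per-partition subtraction. A minimal sketch, assuming the LEOs (from step 3) and committed offsets (from step 1 or 2) have already been collected into plain dicts keyed by partition number:

```python
def records_lag(leo, committed):
    # leo: {partition: log end offset}; committed: {partition: consumer offset}
    # Partitions with no committed offset are counted from offset 0 here --
    # an assumption; some monitoring tools report them as "unknown" instead.
    return {p: leo[p] - committed.get(p, 0) for p in leo}

def total_lag(leo, committed):
    # Total backlog for the group across all partitions of the topic
    return sum(records_lag(leo, committed).values())
```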

