美文网首页Apache Kafka消息中间件
kafka消费者状态检查—消费的offset是否滞后/堆积

kafka消费者状态检查—消费的offset是否滞后/堆积

作者: sheen口开河 | 来源:发表于2018-12-13 11:24 被阅读27次

[TOC]

对于kafka消费者来说,我们通常需要监控或者检查一下consumer的状态,除了进程本身的状态和系统状态,最关心的可能就是消费者的消费速度是不是太慢,有没有消息滞后/堆积了,对于高版本的kafka,kafka自身就提供了比较方便的命令,而对于低版本的kafka,可能需要自己通过命令组合查看,或者安装第三方监控应用。

本文提供了高低版本通过命令的方式查看consumer消费状态的方法,供参考。

1. kafka_0.8.x系列

1.1 要求

  • 消费者存储offset在zk
  • 需要放一个zk运行包到脚本执行的环境,执行时需要zk客户端;即需要将zk的bin加入到PATH
  • 需要放一个kafka运行包到脚本执行环境,执行时需要kafka客户端;即需要将kakfa的bin加入到PATH

最简单的做法就是在脚本部署机器上放上zk和kafka的安装包,然后配置好各自的环境变量,也就是要能够直接执行zkCli.sh/kafka-topics.sh等命令

1.2 脚本

#kafka broker节点地址,如果有多个,以逗号隔开
KAFKA_BROKER_LIST=172.21.37.178:39091
#监控的consumer所消费的kafka topic
KAFKA_TOPIC=risk_info
#topic的分区数量
KAFKA_TOPIC_PARTITION_COUNT=12
#zk地址和端口列表,,如果有多个,以逗号隔开
ZOOKEEPER_LIST=172.21.34.92:32181
#consumer的group id
CONSUMER_GROUP_ID=kafka0822_test1

CHECK_RES_FILE=consumer_check.log
echo "ATTENTION: this script is useful for kafka 0.8.x and consumer with zookeeper."
echo "start to fetch kafka consumer status now. please wait for about 10s. result will be put in file $CHECK_RES_FILE"
offsets[0]=0
consumed[0]=0
#kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_BROKER_LIST --topic $KAFKA_TOPIC --time -2
echo "START----------------------------------------------------------------------------" >>  $CHECK_RES_FILE
echo "check kafka latest offset command: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_BROKER_LIST --topic $KAFKA_TOPIC --time -1" >> $CHECK_RES_FILE
eval $(kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_BROKER_LIST --topic $KAFKA_TOPIC --time -1|grep -v kafka|grep -v grep|awk -F: '{print "offsets["(FNR-1)"]="$3}')
#echo "kafka latest offset:"  >> $CHECK_RES_FILE
#for var in ${offsets[@]};
#do
#    echo $var  >> $CHECK_RES_FILE
#done

for ((i=0; i<$KAFKA_TOPIC_PARTITION_COUNT; i++))
do
    if [ $i == 0 ]; then
        echo "check kafka consumed offset command:zkCli.sh -server $ZOOKEEPER_LIST get /consumers/$CONSUMER_GROUP_ID/offsets/$KAFKA_TOPIC/$i|tail -1"  >> $CHECK_RES_FILE
    fi
    consumed[$i]=`zkCli.sh -server $ZOOKEEPER_LIST get /consumers/$CONSUMER_GROUP_ID/offsets/$KAFKA_TOPIC/$i|tail -1`
done

for ((i=0; i<$KAFKA_TOPIC_PARTITION_COUNT; i++))
do
    LAG=`expr ${offsets[$i]} - ${consumed[$i]}`
    echo "topic:$KAFKA_TOPIC, partition:$i, consumer group:$CONSUMER_GROUP_ID, latest offset:${offsets[$i]}, consumed offset:${consumed[$i]}, lag:$LAG"  >> $CHECK_RES_FILE
done
echo "END----------------------------------------------------------------------------" >> $CHECK_RES_FILE

执行时,需要修改的地方为

#kafka 节点host和端口,如果写多个,用逗号隔开
KAFKA_BROKER_LIST=172.21.37.178:39091
#要查看的topic
KAFKA_TOPIC=risk_info
#topic的分区数量,需要确定好
KAFKA_TOPIC_PARTITION_COUNT=12

#zookeeper的host和端口,如果写多个,用逗号隔开
ZOOKEEPER_LIST=172.21.34.92:32181
#consumer group
CONSUMER_GROUP_ID=kafka0822_test1

执行结果为:

结果文件默认保存在kafka_consumer_check_res.log中
cat kafka_consumer_check_res.log 

显示:

START----------------------------------------------------------------------------
check kafka latest offset command: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.21.37.178:39091 --topic risk_info --time -1
check kafka consumed offset command:zkCli.sh -server 172.21.34.92:32181 get /consumers/kafka0822_test1/offsets/risk_info/0|tail -1
topic:risk_info, partition:0, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:1, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:2, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:3, consumer group:kafka0822_test1, latest offset:1083, consumed offset:1082, lag:1
topic:risk_info, partition:4, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:5, consumer group:kafka0822_test1, latest offset:1083, consumed offset:1081, lag:2
topic:risk_info, partition:6, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:7, consumer group:kafka0822_test1, latest offset:1083, consumed offset:1082, lag:1
topic:risk_info, partition:8, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:9, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:10, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:11, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1080, lag:2
END----------------------------------------------------------------------------

具体只需要关注结果文件最后的lag值,以确定是否滞后

1.3 检查建议

  • 如果每个分区的堆积在两位数以内,且没有增长趋势,则表示consumer没有大问题
  • 如果堆积数量不断增长,则需要检查消费者是否有问题

由于命令行的zk客户端执行较慢,因此如果分区较多的时候,在kafka吞吐量高峰时期,靠后的分区检查结果可能显示LAG为负数,这是因为高峰时期offset更新的比较快,我们从kafka里查询offset只在脚本的开头执行了一次,而zk命令的执行过程却要很久(每个分区要执行一次查询请求命令),此时zk里的offset已经被consumer提交成相对更新的,因此出现了二者相减出现了负数

2. kafka 0.9.x系列

具体是不是0.9不记得了,kafka的版本比较多,如果0.9.x不支持,可以继续使用上一节的脚本

命令:

//首先我们需要知道当前有哪些消费者group,如果已知,此步骤可忽略
bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --list
bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --describe
  • BROKER_HOST是kafka server的ip地址,PORTt是server的监听端口。多个host port之间用逗号隔开
  • 第一条命令是获取group列表,一般而言,应用是知道消费者group的,通常在应用的配置里,如果已知,该步骤可以省略
  • 第二条命令是查看具体的消费者group的详情信息,需要给出group的名称

例如,首先列出消费者group列表

bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

console-consumer-89764
console-consumer-45728

我们以console-consumer-8976为例,查看详情

bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --group console-consumer-89764 --describe
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
tttttttt_topic                 0          641             641             0          consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc   /172.21.37.194                 consumer-1
tttttttt_topic                 1          632             632             0          consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc   /172.21.37.194                 consumer-1
tttttttt_topic                 2          699             699             0          consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc   /172.21.37.194                 consumer-1

其中

  • TOPIC:该group里消费的topic名称
  • PARTITION:分区编号
  • CURRENT-OFFSET:该分区当前消费到的offset
  • LOG-END-OFFSET:该分区当前latest offset
  • LAG:消费滞后区间,为LOG-END-OFFSET-CURRENT-OFFSET,具体大小需要看应用消费速度和生产者速度,一般过大则可能出现消费跟不上,需要引起应用注意
  • CONSUMER-ID:server端给该分区分配的consumer编号
  • HOST:消费者所在主机
  • CLIENT-ID:消费者id,一般由应用指定

相关文章

网友评论

    本文标题:kafka消费者状态检查—消费的offset是否滞后/堆积

    本文链接:https://www.haomeiwen.com/subject/vyuvhqtx.html