除了用kafka自带的命令行方式和java方式外,现在可以用kafka-python中的KafkaAdminClient来获取kafka运行时消费者在topic的分区中的偏移量信息。
首先需要安装kafka-python包,一条命令就搞定:
pip install -U kafka-python
然后就创建KafkaAdminClient对象
from kafka.admin import KafkaAdminClient
adminClient = KafkaAdminClient(bootstrap_servers='192.168.*.*:9092') #运行时填入kafka服务器实际地址,如果是多台服务器,该参数就写成列表形式。
然后就可以获取消费者组的列表:
adminClient.list_consumer_groups()
返回值: [('console-consumer-95318', 'consumer'),
('cgtrcSpring', 'consumer'),
('msgSealBackup', 'consumer'),
('es2', 'consumer'),
('pushSubscription1', 'consumer'),
('es1', 'consumer'),
('es4', 'consumer'),
('es3', 'consumer'),
('python_client_1', 'consumer')]
list_consumer_groups()的返回值是一个元组组成的列表,每个元组前一个元素是消费者组的名称,第二个元素是消费组协议类型。
得到消费者组后,利用list_consumer_group_offsets就可以获得某一个消费者组在各topic和分区的偏移量。下面的语句取es4消费组的偏移量。返回值是一个字典,字典的key是TopicPartition,值是OffsetAndMetada 。
adminClient.list_consumer_group_offsets('es4')
返回值: {TopicPartition(topic='log-eport-bizlog-push', partition=0): OffsetAndMetadata(offset=1781083, metadata=''),
TopicPartition(topic='log-eport-bizlog-push', partition=1): OffsetAndMetadata(offset=1784580, metadata=''),
TopicPartition(topic='log-eport-bizlog-push', partition=2): OffsetAndMetadata(offset=1784585, metadata=''),
TopicPartition(topic='log-eport-msglog-gatewaylog-eport-msglog-gateway', partition=0): OffsetAndMetadata(offset=17740, metadata=''),
TopicPartition(topic='log-eport-msglog-gatewaylog-eport-msglog-gateway', partition=1): OffsetAndMetadata(offset=17736, metadata=''),
TopicPartition(topic='log-eport-msglog-gatewaylog-eport-msglog-gateway', partition=2): OffsetAndMetadata(offset=17739, metadata='')}
从返回值中很容易就可以得到消费者组在每个topic的各个分区的偏移量。
用python来获取这种运行时的信息比java简单快捷,相比命令行方式,可以对取到的信息进行后续处理,和其他应用集成也比较方便。
kafka-python还提供了往kafka发送消息和从kafka读取消息的功能,详情参见如下:
kafka-python 项目地址: https://github.com/dpkp/kafka-python
文档地址:https://kafka-python.readthedocs.io/en/master/apidoc/modules.html
网友评论