美文网首页
python操作kafka

python操作kafka

作者: Hmcf | 来源:发表于2019-11-23 15:34 被阅读0次

官方手册(点关注不迷路):
https://kafka-python.readthedocs.io/en/master/usage.html

更多内容可参考手册,本文只介绍几个基本的常用操作:先用生产者向指定分区发送消息,再用消费者拉取消息。

import json
from kafka import KafkaProducer

# Producer that JSON-encodes every message value before it is sent.
# json.dumps escapes non-ASCII characters by default, so encoding the
# resulting text as ASCII cannot fail.
producer = KafkaProducer(
    bootstrap_servers=['192.168.64.11:9092', '192.168.64.12:9092', '192.168.64.13:9092'],
    value_serializer=lambda m: json.dumps(m).encode('ascii'),
)

# Example payload; it is serialized to JSON by value_serializer above.
msg = {
    "sleep_time": 10,
    "db_config": {
        "database": "test_1",
        "host": "xxxx",
        "user": "root",
        "password": "root"
    },
    "table": "msg",
    "msg": "Hello World"
}

try:
    # Send the message to partition 0 of the 'hmcf_test' topic.
    # send() only queues the record; delivery happens in the background.
    producer.send('hmcf_test', msg, partition=0)
    print('over')
    # Block until every queued record has actually been transmitted.
    producer.flush()
    print('flush over')
finally:
    # Always release the producer's connections, even when send()/flush()
    # raised; otherwise the client would be leaked on the error path.
    producer.close()
print('guanbi')

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

def main():
    """Subscribe to the 'hmcf_test' topic and print every polled batch.

    Runs an endless loop: poll the consumer, print whatever the poll
    returned (possibly an empty result), then sleep two seconds. The loop
    only ends when an exception such as ``KeyboardInterrupt`` propagates;
    the consumer is closed on the way out in every case.
    """
    consumer = KafkaConsumer(
        bootstrap_servers=['192.168.64.11:9092', '192.168.64.12:9092', '192.168.64.13:9092'],
        group_id='001',
        auto_offset_reset='earliest',
    )

    try:
        # ====================================================================
        # Alternative: consume explicit partitions of a topic instead of
        # subscribing to the whole topic.
        # consumer.assign([TopicPartition(topic='hmcf_test', partition=0),
        #                  TopicPartition(topic='hmcf_test', partition=1)])
        # # List the partitions that exist for a topic:
        # # print(consumer.partitions_for_topic("hmcf_test"))
        #
        # # Move the fetch position so reading starts at a chosen offset:
        # consumer.seek(TopicPartition(topic='hmcf_test', partition=0), 0)
        # consumer.seek(TopicPartition(topic='hmcf_test', partition=1), 0)
        # for msg in consumer:
        #     recv = "topic:%s  partition:%d offset:%d key=%s value=%s" % (
        #         msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        #     print(recv)
        # ====================================================================
        # subscribe() and assign() are mutually exclusive: use only one.
        consumer.subscribe(topics=['hmcf_test'])
        while True:
            # Actively pull records, waiting at most 5 ms for data.
            records = consumer.poll(timeout_ms=5)
            print(records)
            # Throttle the demo loop so the output stays readable.
            time.sleep(2)
    finally:
        # Close the consumer even when the loop is interrupted (e.g. Ctrl-C)
        # so its connections are released instead of being leaked.
        consumer.close()


# Start the consumer demo only when this file is executed as a script.
if __name__ == "__main__":
    main()

相关文章

网友评论

      本文标题:python操作kafka

      本文链接:https://www.haomeiwen.com/subject/svyhwctx.html