官方手册,点关注不迷路
https://kafka-python.readthedocs.io/en/master/usage.html
更多内容请参考上面的官方手册,本示例只介绍几个最常用的基本用法。
import json
from kafka import KafkaProducer
# Producer example: publish one JSON-encoded dict to partition 0 of the
# 'hmcf_test' topic, then flush and close the producer.
#
# The value serializer turns each payload into bytes.  json.dumps() escapes
# non-ASCII characters by default, so encoding the result as ASCII is safe.
producer = KafkaProducer(
    bootstrap_servers=[
        '192.168.64.11:9092',
        '192.168.64.12:9092',
        '192.168.64.13:9092',
    ],
    value_serializer=lambda m: json.dumps(m).encode('ascii'),
)

msg = {
    "sleep_time": 10,
    "db_config": {
        "database": "test_1",
        "host": "xxxx",
        "user": "root",
        "password": "root",
    },
    "table": "msg",
    "msg": "Hello World",
}

try:
    # Send the message to an explicit partition of the given topic.
    # send() is asynchronous and returns a future; attach an errback so a
    # failed delivery is reported instead of being silently dropped.
    future = producer.send('hmcf_test', msg, partition=0)
    future.add_errback(lambda exc: print('send failed: %r' % (exc,)))
    print('over')
    # Block until every buffered record has been handed to the brokers.
    producer.flush()
    print('flush over')
finally:
    # Always release the producer's sockets and background thread, even
    # when send() or flush() raised.
    producer.close()
print('guanbi')
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
def main():
    """Subscribe to the 'hmcf_test' topic and print every polled batch.

    Connects to the three bootstrap brokers as a member of consumer group
    '001', starting from the earliest offset when the group has no committed
    position.  The loop polls, prints whatever ``poll()`` returned (an empty
    result included), sleeps two seconds and repeats until the process is
    interrupted or an error is raised.  The consumer is always closed on the
    way out so that it leaves the group cleanly.
    """
    consumer = KafkaConsumer(
        bootstrap_servers=[
            '192.168.64.11:9092',
            '192.168.64.12:9092',
            '192.168.64.13:9092',
        ],
        group_id='001',
        auto_offset_reset='earliest',
    )
    # ========================================================================================================
    # Alternative: choose explicitly which topic partitions to consume.
    # consumer.assign([TopicPartition(topic='hmcf_test', partition=0), TopicPartition(topic='hmcf_test', partition=1)])
    # # List the partitions that exist for a topic
    # # print(consumer.partitions_for_topic("hmcf_test"))
    #
    # # Move the fetch position to read from a chosen offset
    # consumer.seek(TopicPartition(topic='hmcf_test', partition=0), 0)
    # consumer.seek(TopicPartition(topic='hmcf_test', partition=1), 0)
    # for msg in consumer:
    #     recv = "topic:%s partition:%d offset:%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    #     print(recv)
    # ========================================================================================================
    # subscribe() and assign() are mutually exclusive: use one or the other.
    consumer.subscribe(topics=['hmcf_test'])
    try:
        while True:
            # Actively pull whatever records are available right now.
            msg = consumer.poll(timeout_ms=5)
            print(msg)
            time.sleep(2)
    finally:
        # Release network resources and leave the consumer group promptly
        # (e.g. on Ctrl+C) instead of waiting for the session to time out.
        consumer.close()


if __name__ == '__main__':
    main()
网友评论