消费kafka数据,方式一
#pip install kafka-python#安装kafkapython库
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(
#'topic1',#主题
#重置偏移量,可以订阅最早的消息
#earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
# auto_offset_reset='earliest',
#默认为true,即默认是自动提交offset的,不过设置为false,需要手动提交。手动提交分为同步提交,异步提交,同步+异步提交。
enable_auto_commit=False,
group_id='group_id',#消费分组
bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
consumer.subscribe(topics=('topic1','topic2'))#订阅多个主题的消息
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value.decode()))
#consumer.commit() #同步提交
#consumer.commit_async(callback=function) #异步提交,function为回调函数
#异步+同步的方式,即正常以异步提交,最后消费者退出时以同步的方式提交,同步提交可以放在finally块中。
消费kafka数据,方式二
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(
enable_auto_commit=False,
group_id='group_id',
bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
tp = TopicPartition(topic='test', partition=0)
consumer.assign([tp])#指定多个主题分区,list形式
consumer.seek_to_beginning()#将偏移量设置为最早的
consumer.seek(tp,888)#指定偏移量
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value.decode()))
将消息写入kafka
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
msg = {'a':'xxx','b':'ccc'}
producer.send('dcar-company-news',bytes(json.dumps(msg,ensure_ascii=False),'utf-8'),partition=0)
producer.close()
网友评论