美文网首页
python3读写kafka

python3读写kafka

作者: 身长脚短 | 来源:发表于2019-10-09 15:56 被阅读0次

    消费kafka数据,方式一

    #pip install kafka-python#安装kafkapython库
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    consumer = KafkaConsumer(
        #'topic1',#主题
        #重置偏移量,可以订阅最早的消息
        #earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        #latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        #none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        # auto_offset_reset='earliest',
      #默认为true,即默认是自动提交offset的,不过设置为false,需要手动提交。手动提交分为同步提交,异步提交,同步+异步提交。
        enable_auto_commit=False,
        group_id='group_id',#消费分组
        bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
    consumer.subscribe(topics=('topic1','topic2'))#订阅多个主题的消息
    
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
        message.offset, message.key,
        message.value.decode()))
      #consumer.commit() #同步提交
      #consumer.commit_async(callback=function) #异步提交,function为回调函数
     #异步+同步的方式,即正常以异步提交,最后消费者退出时以同步的方式提交,同步提交可以放在finally块中。
    

    消费kafka数据,方式二

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    consumer = KafkaConsumer(
        enable_auto_commit=False,
        group_id='group_id',
        bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
    tp = TopicPartition(topic='test', partition=0)
    consumer.assign([tp])#指定多个主题分区,list形式
    consumer.seek_to_beginning()#将偏移量设置为最早的
    consumer.seek(tp,888)#指定偏移量
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
        message.offset, message.key,
        message.value.decode()))
    

    将消息写入kafka

    from kafka import KafkaProducer
    import json
    
    producer = KafkaProducer(bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
    msg = {'a':'xxx','b':'ccc'}
    producer.send('dcar-company-news',bytes(json.dumps(msg,ensure_ascii=False),'utf-8'),partition=0)
    producer.close()
    

    相关文章

      网友评论

          本文标题:python3读写kafka

          本文链接:https://www.haomeiwen.com/subject/pxegpctx.html