美文网首页
python3读写kafka

python3读写kafka

作者: 一飞冲不了天 | 来源:发表于2019-10-09 15:56 被阅读0次

消费kafka数据,方式一

#pip install kafka-python#安装kafkapython库
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(
    #'topic1',#主题
    #重置偏移量,可以订阅最早的消息
    #earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    #latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    #none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    # auto_offset_reset='earliest',
  #默认为true,即默认是自动提交offset的,不过设置为false,需要手动提交。手动提交分为同步提交,异步提交,同步+异步提交。
    enable_auto_commit=False,
    group_id='group_id',#消费分组
    bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
consumer.subscribe(topics=('topic1','topic2'))#订阅多个主题的消息

for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value.decode()))
  #consumer.commit() #同步提交
  #consumer.commit_async(callback=function) #异步提交,function为回调函数
 #异步+同步的方式,即正常以异步提交,最后消费者退出时以同步的方式提交,同步提交可以放在finally块中。

消费kafka数据,方式二

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(
    enable_auto_commit=False,
    group_id='group_id',
    bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
tp = TopicPartition(topic='test', partition=0)
consumer.assign([tp])#指定多个主题分区,list形式
consumer.seek_to_beginning()#将偏移量设置为最早的
consumer.seek(tp,888)#指定偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value.decode()))

将消息写入kafka

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
msg = {'a':'xxx','b':'ccc'}
producer.send('dcar-company-news',bytes(json.dumps(msg,ensure_ascii=False),'utf-8'),partition=0)
producer.close()

相关文章

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

  • python从kafka消费数据且写入kafka

    简单记录一个读写kafka demo

  • Kafka高效读写

    Kafka高效读写数据 1)Kafka本身是分布式集群,同时采用分区技术,并发度高。 2)顺序写磁盘 Kafka的...

  • Java技术面试-Kafka

    1. activeMq与kafka的区别 吞吐量(1)activeMq较低,磁盘随机读写 ;(2)kafka较高...

  • flink与kafka结合

    1、概述 flink提供了一个特有的kafka connector去读写kafka topic的数据。flink消...

  • Kafka的可靠性分析

    阅读以下内容你将了解到:1.Kafka的副本中的可靠性剖析2.Kafka为什么不支持读写分离?(包括读写分离带来的...

  • Flink读写Kafka

    本文样例基于flink 1.8.0版本介绍如何通过flink读写kafka数据 完整样例代码 另一种方式为 注意上...

  • 「Kafka深度解析」快速入门

    Kafka特性 顺序读写的方式访问磁盘,从而避免随机读写磁盘导致的性能瓶颈2.支持批量读写消息,并且会对消息进行批...

  • 分布式

    ZK Kafka 分布式系统所依赖的基础设施 读写分离

  • 如何快速掌握一门编程语言(python为例)

    环境: IDE(anaconda,pycharm) 线上运行环境(python3) 语法: 基本读写语法(prin...

网友评论

      本文标题:python3读写kafka

      本文链接:https://www.haomeiwen.com/subject/pxegpctx.html