美文网首页pythonPython精选
python3 交互操作 kafka 之 kafka-pytho

python3 交互操作 kafka 之 kafka-pytho

作者: Devops海洋的渔夫 | 来源:发表于2019-06-11 20:15 被阅读78次

    Github地址

    https://github.com/dpkp/kafka-python

    kafka-python库的官网

    https://pypi.org/project/kafka-python/

    kafka-python官网文档

    https://kafka-python.readthedocs.io/en/master/

    使用pip3安装kafka-python

    在阅读kafka-python文档会说明很多安装的方式,这里采用pip3的安装方式。
    pip3 install kafka-python

    D:\pythonProject\kafka_test>pip3 install kafka-python
    Collecting kafka-python
      Downloading https://files.pythonhosted.org/packages/82/39/aebe3ad518513bbb2260dd84ac21e5c30af860cc4c95b32acbd64b9d9d0d/kafka_python-1.4.6-py2.py3-none-any.whl (259kB)
        100% |████████████████████████████████| 266kB 123kB/s
    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.6
    You are using pip version 18.1, however version 19.1.1 is available.
    You should consider upgrading via the 'python -m pip install --upgrade pip' command.
    
    D:\pythonProject\kafka_test>
    

    好了,这样就安装完成。下面就根据文档示例执行一下。
    当然,这个执行之前首先要安装好kafka的环境。

    先别急着操作,先来看看这个kafka-python库客户端的相关说明。

    Kafka Python客户端

    用于Apache Kafka分布式流处理系统的Python客户端。kafka-python的功能与官方java客户端非常相似,带有多个pythonic接口(例如,消费者迭代器)。

    kafka-python最适用于较新的代理broker(0.9+),但与旧版本(向0.8.0)向后兼容。某些功能仅在较新的代理上启用。例如,完全协调的消费者群体 - 如果向同一群体中的多个消费者分配动态分区 - 需要使用0.9+ kafka broker。为早期的代理发布支持此功能需要编写和维护自定义领导选举和成员/健康检查代码(可能使用zookeeper或consul)。对于较旧的代理,您可以通过使用诸如chef,ansible等配置管理工具为每个消费者实例手动分配不同的分区来实现类似的功能。这种方法可以正常工作,但它不支持故障时的重新平衡。见<https://kafka-python.readthedocs.io/en/master/compatibility.html >了解更多详情。

    请注意,主分支可能包含未发布的功能。有关发布文档,请参阅readthedocs和/或python的内联帮助。

    >>> pip install kafka-python
    

    看了上面的说明之后,心里大概有了一些概念了,下面来进行一下生产者和消费者的调用示例看看。

    注意:在开始调用之前,首先要配置好kafka的远程调用,避免调试老是报错的坑。
    如果不清楚kafka如何配置远程调用,可以访问这里

    KafkaProducer

    二话不说,直接按照官方文档写出一个示例,如下:

    from kafka import KafkaProducer
    from time import sleep
    
    def start_producer():
        producer = KafkaProducer(bootstrap_servers='192.168.196.129:9092')
        for i in range(0,100000):
            msg = 'msg is ' + str(i)
            producer.send('my_favorite_topic2', msg.encode('utf-8'))
            sleep(3)
    
    if __name__ == '__main__':
        start_producer()
    

    运行启动服务如下:

    执行起来之后,生产者循环发送消息给kafka,这里我没有打印返回的结果。下面来看看消费者端是怎么处理的。

    KafkaConsumer

    上面的进程我一直运行生产者不断发送消息,下面我这边就执行开启消费者接收最新的消息。

    from kafka import KafkaConsumer
    import time
    
    def start_consumer():
        consumer = KafkaConsumer('my_favorite_topic2', bootstrap_servers = '192.168.196.129:9092')
        for msg in consumer:
            print(msg)
            print("topic = %s" % msg.topic) # topic default is string
            print("partition = %d" % msg.offset)
            print("value = %s" % msg.value.decode()) # bytes to string
            print("timestamp = %d" % msg.timestamp)
            print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime( msg.timestamp/1000 )) )
    
    if __name__ == '__main__':
        start_consumer()
    

    运行如下:

    从上图可以看到,消费者通过循环就可以不断接收消息进行处理,另外我还对消息的内容进行了相关的拆分解析。

    相关文章

      网友评论

        本文标题:python3 交互操作 kafka 之 kafka-pytho

        本文链接:https://www.haomeiwen.com/subject/kmgvtctx.html