美文网首页
Python Kafka 样例

Python Kafka 样例

作者: LoveAnny | 来源:发表于2020-11-05 16:17 被阅读0次

    随着数据挖掘和人工智能的兴起,Python语言逐渐火爆起来,鉴于现在大量流式数据,Python作为一门热门语言自然要对接Kafka。

    依赖包安装

    Python如果要接入kafka 自然首先要安装 Kafka JDK 安装包,可以通过如下命令安装

    pip install kafka-python --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/
    

    Demo 代码

    当安装依赖库之后就可以进行消费和发送消息了,与Java 类似,需要提供 bootstrap_servers 与Topic 等信息,比如拿消费为例。

    1. 首先建立Consumer 实例
    2. 选择消费方式,是定时拉取,还是实时处理
    3. 实现消费逻辑
    from kafka import KafkaConsumer, KafkaProducer
    import json
    
    
    # 定义 Topic 与 kafka_server  等变量
    topic = "Demo"
    kafka_server = "kafka.server:9200"
    group_id = "Demo-Group"
    
    ## 创建 Consumer 实例
    consumer = KafkaConsumer(
        topic, bootstrap_servers=kafka_server, group_id=group_id, value_deserializer=lambda v: json.loads(v, encoding="utf-8"))
    
    # 定义Kafka 消息处理逻辑
    def work(data):
        print("Kafka 消息处理逻辑")
    
    # 实时处理kafka 消息
    for msg in consumer:
        _topic = msg.topic
        _partition = msg.partition
        _offset = msg.offset
        _key = msg.key
        work(msg.value)
    
    

    如上代码如果指定了 group_id 则下次重启程序时会从上次停止消费地方处理,如果未指定则从最新位置消息忽略之前所有消息。

    指定Offset

    有时在程序启动时需要从指定Offset 消费,则可以采用如下代码

    
    # 指定消费开始位置
    offset = 11000
    # 获取当前consumer分配的Partition列表
    pts = consumer.assignment()
    
    # 到最旧可以用消息位置
    #consumer.seek_to_beginning()
    # 移到最新可用消息位置
    #consumer.seek_to_end()
    
    # 指定 partition 的Offset
    for pt in pts:
        consumer.seek(pt,offset)
    
    

    定时拉取

    在特定时候为了性能考虑,需要以固定时间从kafka中拉取数据列表,这样可以降低服务端压力

    
    # 指定拉取数据间隔
    poll_interval = 2000
    
    while True:
        msgs = consumer.poll(poll_interval,max_records=50)
        for msg in msgs:
            work(msg.value)
    

    如上代码 max_records 表示一次拉取最多纪录条数。

    发送消息

    除了接受消息同样可以发送消息,发送消息相对而言简单一些。大致流程就两个步骤。

    1. 创建 Producer 实例
    2. 通过Producer 发送
    import json
    
    bootstrap_servers = "kafka-server"
    producer_topic="demo-topic"
    int_key = "key"
    msg = {"data":"我是测试消息"}
    
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                                              key_serializer=lambda v: json.dumps(
                                                  v, ensure_ascii=False).encode('utf-8'),
                                              value_serializer=lambda v: json.dumps(
                                                  v, ensure_ascii=False).encode('utf-8'),
                                              max_block_ms=5000)
    
    producer.send(topic=producer_topic, key=int_key, value=msg)
    producer.flush()
    

    上面代码定义了Kafka 地址和Topic ,需要注意的地方是 key_serializer 和value_serializer,这两个属性表示了发送的Key 和Value 如何进行序列化。


    以上只是 Kafka 基本用法,在机器学习和深度学习应用中,特别是针对自然语言处理任务,Kafka作为模型接入端是非常常用的。

    相关文章

      网友评论

          本文标题:Python Kafka 样例

          本文链接:https://www.haomeiwen.com/subject/mxqavktx.html