美文网首页
Python Kafka 样例

Python Kafka 样例

作者: LoveAnny | 来源:发表于2020-11-05 16:17 被阅读0次

随着数据挖掘和人工智能的兴起,Python语言逐渐火爆起来,鉴于现在大量流式数据,Python作为一门热门语言自然要对接Kafka。

依赖包安装

Python如果要接入kafka 自然首先要安装 Kafka JDK 安装包,可以通过如下命令安装

pip install kafka-python --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/

Demo 代码

当安装依赖库之后就可以进行消费和发送消息了,与Java 类似,需要提供 bootstrap_servers 与Topic 等信息,比如拿消费为例。

  1. 首先建立Consumer 实例
  2. 选择消费方式,是定时拉取,还是实时处理
  3. 实现消费逻辑
from kafka import KafkaConsumer, KafkaProducer
import json


# 定义 Topic 与 kafka_server  等变量
topic = "Demo"
kafka_server = "kafka.server:9200"
group_id = "Demo-Group"

## 创建 Consumer 实例
consumer = KafkaConsumer(
    topic, bootstrap_servers=kafka_server, group_id=group_id, value_deserializer=lambda v: json.loads(v, encoding="utf-8"))

# 定义Kafka 消息处理逻辑
def work(data):
    print("Kafka 消息处理逻辑")

# 实时处理kafka 消息
for msg in consumer:
    _topic = msg.topic
    _partition = msg.partition
    _offset = msg.offset
    _key = msg.key
    work(msg.value)

如上代码如果指定了 group_id 则下次重启程序时会从上次停止消费地方处理,如果未指定则从最新位置消息忽略之前所有消息。

指定Offset

有时在程序启动时需要从指定Offset 消费,则可以采用如下代码


# 指定消费开始位置
offset = 11000
# 获取当前consumer分配的Partition列表
pts = consumer.assignment()

# 到最旧可以用消息位置
#consumer.seek_to_beginning()
# 移到最新可用消息位置
#consumer.seek_to_end()

# 指定 partition 的Offset
for pt in pts:
    consumer.seek(pt,offset)

定时拉取

在特定时候为了性能考虑,需要以固定时间从kafka中拉取数据列表,这样可以降低服务端压力


# 指定拉取数据间隔
poll_interval = 2000

while True:
    msgs = consumer.poll(poll_interval,max_records=50)
    for msg in msgs:
        work(msg.value)

如上代码 max_records 表示一次拉取最多纪录条数。

发送消息

除了接受消息同样可以发送消息,发送消息相对而言简单一些。大致流程就两个步骤。

  1. 创建 Producer 实例
  2. 通过Producer 发送
import json

bootstrap_servers = "kafka-server"
producer_topic="demo-topic"
int_key = "key"
msg = {"data":"我是测试消息"}

producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                                          key_serializer=lambda v: json.dumps(
                                              v, ensure_ascii=False).encode('utf-8'),
                                          value_serializer=lambda v: json.dumps(
                                              v, ensure_ascii=False).encode('utf-8'),
                                          max_block_ms=5000)

producer.send(topic=producer_topic, key=int_key, value=msg)
producer.flush()

上面代码定义了Kafka 地址和Topic ,需要注意的地方是 key_serializer 和value_serializer,这两个属性表示了发送的Key 和Value 如何进行序列化。


以上只是 Kafka 基本用法,在机器学习和深度学习应用中,特别是针对自然语言处理任务,Kafka作为模型接入端是非常常用的。

相关文章

网友评论

      本文标题:Python Kafka 样例

      本文链接:https://www.haomeiwen.com/subject/mxqavktx.html