随着数据挖掘和人工智能的兴起,Python语言逐渐火爆起来,鉴于现在大量流式数据,Python作为一门热门语言自然要对接Kafka。
依赖包安装
Python如果要接入kafka 自然首先要安装 Kafka JDK 安装包,可以通过如下命令安装
pip install kafka-python --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/
Demo 代码
当安装依赖库之后就可以进行消费和发送消息了,与Java 类似,需要提供 bootstrap_servers 与Topic 等信息,比如拿消费为例。
- 首先建立Consumer 实例
- 选择消费方式,是定时拉取,还是实时处理
- 实现消费逻辑
from kafka import KafkaConsumer, KafkaProducer
import json
# 定义 Topic 与 kafka_server 等变量
topic = "Demo"
kafka_server = "kafka.server:9200"
group_id = "Demo-Group"
## 创建 Consumer 实例
consumer = KafkaConsumer(
topic, bootstrap_servers=kafka_server, group_id=group_id, value_deserializer=lambda v: json.loads(v, encoding="utf-8"))
# 定义Kafka 消息处理逻辑
def work(data):
print("Kafka 消息处理逻辑")
# 实时处理kafka 消息
for msg in consumer:
_topic = msg.topic
_partition = msg.partition
_offset = msg.offset
_key = msg.key
work(msg.value)
如上代码如果指定了 group_id 则下次重启程序时会从上次停止消费地方处理,如果未指定则从最新位置消息忽略之前所有消息。
指定Offset
有时在程序启动时需要从指定Offset 消费,则可以采用如下代码
# 指定消费开始位置
offset = 11000
# 获取当前consumer分配的Partition列表
pts = consumer.assignment()
# 到最旧可以用消息位置
#consumer.seek_to_beginning()
# 移到最新可用消息位置
#consumer.seek_to_end()
# 指定 partition 的Offset
for pt in pts:
consumer.seek(pt,offset)
定时拉取
在特定时候为了性能考虑,需要以固定时间从kafka中拉取数据列表,这样可以降低服务端压力
# 指定拉取数据间隔
poll_interval = 2000
while True:
msgs = consumer.poll(poll_interval,max_records=50)
for msg in msgs:
work(msg.value)
如上代码 max_records 表示一次拉取最多纪录条数。
发送消息
除了接受消息同样可以发送消息,发送消息相对而言简单一些。大致流程就两个步骤。
- 创建 Producer 实例
- 通过Producer 发送
import json
bootstrap_servers = "kafka-server"
producer_topic="demo-topic"
int_key = "key"
msg = {"data":"我是测试消息"}
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
key_serializer=lambda v: json.dumps(
v, ensure_ascii=False).encode('utf-8'),
value_serializer=lambda v: json.dumps(
v, ensure_ascii=False).encode('utf-8'),
max_block_ms=5000)
producer.send(topic=producer_topic, key=int_key, value=msg)
producer.flush()
上面代码定义了Kafka 地址和Topic ,需要注意的地方是 key_serializer 和value_serializer,这两个属性表示了发送的Key 和Value 如何进行序列化。
以上只是 Kafka 基本用法,在机器学习和深度学习应用中,特别是针对自然语言处理任务,Kafka作为模型接入端是非常常用的。
网友评论