美文网首页
在sanic 中使用 aio kafka

在sanic 中使用 aio kafka

作者: 不如做一只猫 | 来源:发表于2018-08-24 15:24 被阅读134次

use aio kafka in sanic

一. producer

1. install

pip install aiokafka

2. initialization producer

eg:

@app.listener('before_server_start')
async def server_init(app, loop):
    app.producer = AIOKafkaProducer(loop=loop, value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                    bootstrap_servers=kafka_host)
    await app.producer.start()

3. use

await app.producer.send("topic_name", dict)

二. consumer

1. initialization consumer

eg:

async def process(consumer):
    async for msg in consumer:
        await func(msg)  # msg 处理函数 必须使用协程

@app.listener("after_server_start")  # 必须 after_server_start
async def after_server(app, loop):
    app.consumer = AIOKafkaConsumer(
        'user',
        loop=loop, bootstrap_servers=kafka_host,
        group_id="my-group4343")
    await app.consumer.start()
    await process(app.consumer)

2. use

自定义协程 func function

相关文章

网友评论

      本文标题:在sanic 中使用 aio kafka

      本文链接:https://www.haomeiwen.com/subject/gdabiftx.html