美文网首页
在sanic 中使用 aio kafka

在sanic 中使用 aio kafka

作者: 不如做一只猫 | 来源:发表于2018-08-24 15:24 被阅读134次

    use aio kafka in sanic

    一. producer

    1. install

    pip install aiokafka
    

    2. initialization producer

    eg:

    @app.listener('before_server_start')
    async def server_init(app, loop):
        app.producer = AIOKafkaProducer(loop=loop, value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                        bootstrap_servers=kafka_host)
        await app.producer.start()
    

    3. use

    await app.producer.send("topic_name", dict)
    

    二. consumer

    1. initialization consumer

    eg:

    async def process(consumer):
        async for msg in consumer:
            await func(msg)  # msg 处理函数 必须使用协程
    
    @app.listener("after_server_start")  # 必须 after_server_start
    async def after_server(app, loop):
        app.consumer = AIOKafkaConsumer(
            'user',
            loop=loop, bootstrap_servers=kafka_host,
            group_id="my-group4343")
        await app.consumer.start()
        await process(app.consumer)
    

    2. use

    自定义协程 func function

    相关文章

      网友评论

          本文标题:在sanic 中使用 aio kafka

          本文链接:https://www.haomeiwen.com/subject/gdabiftx.html