美文网首页DjangoPython
Python 将async函数转为基于同一个EventLoop实

Python 将async函数转为基于同一个EventLoop实

作者: slords | 来源:发表于2022-10-23 13:39 被阅读0次

    背景:

    1. 主体业务使用的是基于async函数的异步处理的框架;
    2. 连接池等资源基于EventLoop进行缓存,复用和调用;
    3. 需要Celery进行后台任务,目前版本Celery对于async并不能良好支持,需要把async转为sync;
    4. 如果每次生成一个新的EventLoop实例会导致连接池等资源无法得到重用。

    目标:

    • 构建一个装饰器可以将async函数转为sync函数并在执行时重用EventLoop实例。

    其他:

    • 之前一直使用asgiref将async函数转化为sync进行,然而在一般使用场景下,async_to_sync每次调用会创建一个新的EventLoop实例,并以run_。所以每次都会重新创建一套连接池资源,并在下次获取资源时发现其对应的EventLoop实例已关闭后将其全部释放。

    代码:

    import asyncio
    import functools
    import threading
    from typing import Any, Optional
    
    # 设置全局的EventLoop
    LOOP = asyncio.get_event_loop()
    
    
    class CallResult:
    
        result: Any = None
        exception: Optional[BaseException] = None
    
    
    # async_to_sync 装饰器
    def async_to_sync(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            call_result = CallResult()
            event = threading.Event()  # 用于阻塞等待运行结果
    
            async def wrapper2():
                try:
                    call_result.result = await func(*args, **kwargs)
                except BaseException as e:
                    call_result.exception = e  # 写入异常
                finally:
                    event.set()
    
            # 使用 全局EventLoop将wrapper2以task的方式执行
            LOOP.call_soon_threadsafe(LOOP.create_task, wrapper2())
            event.wait()  # 等待event激活 返回结果
            if call_result.exception:
                raise call_result.exception
            return call_result.result
    
        return wrapper
    
    
    # 正常的异步函数
    @async_to_sync
    async def go():
        print('current_loop: ', id(asyncio.get_event_loop()))
        print('GLOBAL LOOP: ', id(LOOP))
        print('current_loop is GLOBAL LOOP: ', LOOP is asyncio.get_event_loop())
    
    
    # 抛出异常的异步函数
    @async_to_sync
    async def raise_value_error():
        raise ValueError(id(LOOP))
    
    # 下述loop_thread, start_loop, stop_loop可整合为一个类,这里为了方便阅读写成函数调用
    loop_thread: threading
    
    
    # 将全局EventLoop设为运行状态
    def start_loop():
        global loop_thread
        loop_thread = threading.Thread(target=LOOP.run_forever)
        loop_thread.start()
    
    
    # 将全局EventLoop关闭,正常的服务可以不用写,这个是为了示例代码可以正常结束用的
    def stop_loop():
    
        @async_to_sync
        async def stop():
            print('Loop stop')
            LOOP.stop()
    
        stop()
        print('Loop close')
        LOOP.close()
        loop_thread.join()
    
    
    if __name__ == '__main__':
        start_loop()
        for i in range(10):
            print(f'------------{i:02}------------')
            go()
        print('[run go] end')
        try:
            raise_value_error()
        except ValueError:
            import traceback
            # 使用标准输出,确保内容输出顺序一致
            print(traceback.format_exc())
            print('[raise_value_error] end')
    
        print('stop loop')
        stop_loop()
        print('stop loop called', flush=True)
    

    输出如下:

    ------------00------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    ------------01------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    ------------02------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    ------------03------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    ------------04------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    ------------05------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    ------------06------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    ------------07------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    ------------08------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    ------------09------------
    current_loop:  2224110182984
    GLOBAL LOOP:  2224110182984
    current_loop is GLOBAL LOOP:  True
    [run go] end
    Traceback (most recent call last):
      File "<PythonFile>", line 86, in <module>
        raise_value_error()
      File "<PythonFile>", line 35, in wrapper
        raise call_result.exception
      File "<PythonFile>", line 25, in wrapper2
        call_result.result = await func(*args, **kwargs)
      File "<PythonFile>", line 52, in raise_value_error
        raise ValueError(id(LOOP))
    ValueError: 2224110182984
    
    [raise_value_error] end
    stop loop
    Loop stop
    Loop close
    stop loop called
    

    相关文章

      网友评论

        本文标题:Python 将async函数转为基于同一个EventLoop实

        本文链接:https://www.haomeiwen.com/subject/rqxgzrtx.html