背景:
- 主体业务使用的是基于async函数的异步处理的框架;
- 连接池等资源基于EventLoop进行缓存,复用和调用;
- 需要Celery进行后台任务,目前版本Celery对于async并不能良好支持,需要把async转为sync;
- 如果每次生成一个新的EventLoop实例会导致连接池等资源无法得到重用。
目标:
- 构建一个装饰器可以将async函数转为sync函数并在执行时重用EventLoop实例。
其他:
- 之前一直使用asgiref将async函数转化为sync进行,然而在一般使用场景下,async_to_sync每次调用会创建一个新的EventLoop实例,并以run_。所以每次都会重新创建一套连接池资源,并在下次获取资源时发现其对应的EventLoop实例已关闭后将其全部释放。
代码:
import asyncio
import functools
import threading
from typing import Any, Optional
# 设置全局的EventLoop
LOOP = asyncio.get_event_loop()
class CallResult:
result: Any = None
exception: Optional[BaseException] = None
# async_to_sync 装饰器
def async_to_sync(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
call_result = CallResult()
event = threading.Event() # 用于阻塞等待运行结果
async def wrapper2():
try:
call_result.result = await func(*args, **kwargs)
except BaseException as e:
call_result.exception = e # 写入异常
finally:
event.set()
# 使用 全局EventLoop将wrapper2以task的方式执行
LOOP.call_soon_threadsafe(LOOP.create_task, wrapper2())
event.wait() # 等待event激活 返回结果
if call_result.exception:
raise call_result.exception
return call_result.result
return wrapper
# 正常的异步函数
@async_to_sync
async def go():
print('current_loop: ', id(asyncio.get_event_loop()))
print('GLOBAL LOOP: ', id(LOOP))
print('current_loop is GLOBAL LOOP: ', LOOP is asyncio.get_event_loop())
# 抛出异常的异步函数
@async_to_sync
async def raise_value_error():
raise ValueError(id(LOOP))
# 下述loop_thread, start_loop, stop_loop可整合为一个类,这里为了方便阅读写成函数调用
loop_thread: threading
# 将全局EventLoop设为运行状态
def start_loop():
global loop_thread
loop_thread = threading.Thread(target=LOOP.run_forever)
loop_thread.start()
# 将全局EventLoop关闭,正常的服务可以不用写,这个是为了示例代码可以正常结束用的
def stop_loop():
@async_to_sync
async def stop():
print('Loop stop')
LOOP.stop()
stop()
print('Loop close')
LOOP.close()
loop_thread.join()
if __name__ == '__main__':
start_loop()
for i in range(10):
print(f'------------{i:02}------------')
go()
print('[run go] end')
try:
raise_value_error()
except ValueError:
import traceback
# 使用标准输出,确保内容输出顺序一致
print(traceback.format_exc())
print('[raise_value_error] end')
print('stop loop')
stop_loop()
print('stop loop called', flush=True)
输出如下:
------------00------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------01------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------02------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------03------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------04------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------05------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------06------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------07------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------08------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------09------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
[run go] end
Traceback (most recent call last):
File "<PythonFile>", line 86, in <module>
raise_value_error()
File "<PythonFile>", line 35, in wrapper
raise call_result.exception
File "<PythonFile>", line 25, in wrapper2
call_result.result = await func(*args, **kwargs)
File "<PythonFile>", line 52, in raise_value_error
raise ValueError(id(LOOP))
ValueError: 2224110182984
[raise_value_error] end
stop loop
Loop stop
Loop close
stop loop called
网友评论