美文网首页
FastAPI利用装饰器实现定时任务

FastAPI利用装饰器实现定时任务

作者: 何小有 | 来源:发表于2022-05-31 19:46 被阅读0次

    因为 FastAPI 本身就是高性能异步框架,所以在不使用任何第三方定时任务模块的情况下,FastAPI 也可以很方便的实现定时任务。

    创建一个 tasks.py 文件, 复制下面的装饰器代码:

    import asyncio
    from loguru import logger
    from functools import wraps
    from asyncio import ensure_future
    from starlette.concurrency import run_in_threadpool
    from typing import Any, Callable, Coroutine, Optional, Union
    
    NoArgsNoReturnFuncT = Callable[[], None]
    NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
    NoArgsNoReturnDecorator = Callable[
        [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
        NoArgsNoReturnAsyncFuncT
    ]
    
    
    def repeat_task(
        *,
        seconds: float,
        wait_first: bool = False,
        raise_exceptions: bool = False,
        max_repetitions: Optional[int] = None,
    ) -> NoArgsNoReturnDecorator:
        '''
        返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行.
        其装饰的函数不能接受任何参数并且不返回任何内容.
        参数:
            seconds: float
                等待重复执行的秒数
            wait_first: bool (默认 False)
                如果为 True, 该函数将在第一次调用前先等待一个周期.
            raise_exceptions: bool (默认 False)
                如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序.
            max_repetitions: Optional[int] (默认 None)
                该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复.
        '''
        def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
            '''
            将修饰函数转换为自身重复且定期调用的版本.
            '''
            is_coroutine = asyncio.iscoroutinefunction(func)
    
            @wraps(func)
            async def wrapped() -> None:
                repetitions = 0
    
                async def loop() -> None:
                    nonlocal repetitions
                    if wait_first:
                        await asyncio.sleep(seconds)
                    while max_repetitions is None or repetitions < max_repetitions:
                        try:
                            if is_coroutine:
                                # 以协程方式执行
                                await func()  # type: ignore
                            else:
                                # 以线程方式执行
                                await run_in_threadpool(func)
                            repetitions += 1
                        except Exception as exc:
                            logger.error(f'执行重复任务异常: {exc}')
                            if raise_exceptions:
                                raise exc
                        await asyncio.sleep(seconds)
                ensure_future(loop())
            return wrapped
        return decorator
    

    main.py 中调用时需要先添加 @app.on_event('startup') 装饰器,然后再添加我们自己实现的 repeat_task 装饰器:

    ......
    from core.tasks import repeat_task
    ......
    @app.on_event('startup')
    @repeat_task(seconds=60*60, wait_first=True)
    def repeat_task_aggregate_request_records() -> None:
        logger.info('触发重复任务: 聚合请求记录')
    ......
    

    将重复周期改成 6 秒测试一下效果:

    INFO:     Started server process [2075595]
    INFO:     Waiting for application startup.
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://0.0.0.0:8083 (Press CTRL+C to quit)
    2022-05-31 19:31:44.065 | INFO     | apis.bases.api_logs:repeat_task_aggregate_request_records:52 - 触发重复任务: 聚合请求记录
    2022-05-31 19:31:50.067 | INFO     | apis.bases.api_logs:repeat_task_aggregate_request_records:52 - 触发重复任务: 聚合请求记录
    2022-05-31 19:31:56.068 | INFO     | apis.bases.api_logs:repeat_task_aggregate_request_records:52 - 触发重复任务: 聚合请求记录
    

    以上只实现了一个定期重复执行的 repeat_task 装饰器,如果要实现更多需求,只需要照葫芦画瓢再实现几个装饰器就可以了。

    相关文章

      网友评论

          本文标题:FastAPI利用装饰器实现定时任务

          本文链接:https://www.haomeiwen.com/subject/bsxtmrtx.html