美文网首页Starlette 解读 by Gascognya
Starlette 源码阅读 (十) 异步与后台任务

Starlette 源码阅读 (十) 异步与后台任务

作者: Gascognya | 来源:发表于2020-08-17 12:18 被阅读0次

    background.py & concurrency.py

    实际上笔者对于异步的深层原理了解并不透彻,还只停留在勉强会用的水平。日后会对这方面只是进行系统性自下而上的学习。

    后台任务

    代码十分简单,在此仅贴出来

    class BackgroundTask:
        def __init__(
            self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
        ) -> None:
            self.func = func
            self.args = args
            self.kwargs = kwargs
            self.is_async = asyncio.iscoroutinefunction(func)
    
        async def __call__(self) -> None:
            if self.is_async:
                await self.func(*self.args, **self.kwargs)
            else:
                await run_in_threadpool(self.func, *self.args, **self.kwargs)
    
    
    class BackgroundTasks(BackgroundTask):
        def __init__(self, tasks: typing.Sequence[BackgroundTask] = []):
            self.tasks = list(tasks)
    
        def add_task(
            self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
        ) -> None:
            task = BackgroundTask(func, *args, **kwargs)
            self.tasks.append(task)
    
        async def __call__(self) -> None:
            for task in self.tasks:
                await task()
    
    

    异步

    提供了一种异步循环方式,还有一个线程池

    T = typing.TypeVar("T")
    async def run_until_first_complete(*args: typing.Tuple[typing.Callable, dict]) -> None:
        # 直到其中一个完成
        # 在StreamingResponse中使用过
        # await run_until_first_complete(
        #    (self.stream_response, {"send": send}),
        #    (self.listen_for_disconnect, {"receive": receive}),
        # )
        tasks = [handler(**kwargs) for handler, kwargs in args]
        (done, pending) = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        [task.cancel() for task in pending]
        [task.result() for task in done]
    
    
    async def run_in_threadpool(
        func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
    ) -> T:
        # 线程池
        loop = asyncio.get_event_loop()
        if contextvars is not None:  # pragma: no cover
            # 确保我们运行在相同的上下文中
            child = functools.partial(func, *args, **kwargs)
            context = contextvars.copy_context()
            func = context.run
            args = (child,)
        elif kwargs:  # pragma: no cover
            # run_in_executor不接受“kwargs”,因此将它们绑定到这里
            func = functools.partial(func, **kwargs)
        return await loop.run_in_executor(None, func, *args)
    
    
    class _StopIteration(Exception):
        pass
    
    
    def _next(iterator: Iterator) -> Any:
        # 我们不能从线程池迭代器内部触发‘StopIteration’,
        # 然后在上下文外部捕获它,所以我们强制它们进入不同的异常类型。
        try:
            return next(iterator)
        except StopIteration:
            raise _StopIteration
    
    
    async def iterate_in_threadpool(iterator: Iterator) -> AsyncGenerator:
        while True:
            try:
                yield await run_in_threadpool(_next, iterator)
            except _StopIteration:
                break
    
    

    相关文章

      网友评论

        本文标题:Starlette 源码阅读 (十) 异步与后台任务

        本文链接:https://www.haomeiwen.com/subject/vvtqjktx.html