美文网首页Starlette 解读 by Gascognya
Starlette 源码阅读 (一) ASGI应用

Starlette 源码阅读 (一) ASGI应用

作者: Gascognya | 来源:发表于2020-08-06 14:46 被阅读0次

    为了深入了解FastAPI框架,笔者决定将其基础库Starlette的源码进行阅读,未来也可能阅读uvicorn库的ASGI原理。个人水平十分有限,有错误的地方还请见谅。


    Starlette文件结构

    从applications.py开始

    applications只包含了Starlette一个类

     class Starlette:
      """
            创建一个APP
            参数:
                (1) debug: 布尔值, 用于设置在出现错误时是否应返回调试回溯.
    
                (2) routers: 一个路由列表, 提供HTTP和WebSocket服务.
    
                (3) middleware: 一个中间件列表, 应用于每个request请求.
                    一个starlette应用, 最少包含两个中间件:
                    1. ServerErrorMiddleware中间件处于最外层, 以处理在整个堆栈中任何地方发生的未捕获错误.
                    2. ExceptionMiddleware在最内层以处理处理路由或终结点中发生的异常情况.
    
                (4) exception_handlers: 一个字典, 键为状态码或者异常类, 值为其对应的调用函数.
                    调用函数应该是handler(request, exc) -> response, 可以是标准函数, 也可以是异步函数.
    
                (5) on_startup: 一个启动时的调用项列表, 启动处理调用项, 不接受任何参数, 可以是同步函数, 也可以是异步函数.
    
                (6) on_shutdown: 一个关闭时的调用项列表, 同上.
        """
        def __init__(
            self,
            debug: bool = False,
            routes: typing.Sequence[BaseRoute] = None,
            middleware: typing.Sequence[Middleware] = None,
            exception_handlers: typing.Dict[
                typing.Union[int, typing.Type[Exception]], typing.Callable
            ] = None,
            on_startup: typing.Sequence[typing.Callable] = None,
            on_shutdown: typing.Sequence[typing.Callable] = None,
            lifespan: typing.Callable[["Starlette"], typing.AsyncGenerator] = None,
        ) -> None:
            """
                lifespan上下文函数是一种较新的样式,
                它取代了on_startup/on_shutdown处理程序。
                两种方式任选其一,切勿一起使用。
            """
            assert lifespan is None or (
                on_startup is None and on_shutdown is None
            ), "Use either 'lifespan' or 'on_startup'/'on_shutdown', not both."
    
            self._debug = debug
            self.state = State()
            self.router = Router(
                routes, on_startup=on_startup, on_shutdown=on_shutdown, lifespan=lifespan
            )
            self.exception_handlers = (
                {} if exception_handlers is None else dict(exception_handlers)
            )
            self.user_middleware = [] if middleware is None else list(middleware)
            self.middleware_stack = self.build_middleware_stack()
            # build_middleware_stack函数用于构建中间件堆栈
    
        def build_middleware_stack(self) -> ASGIApp:
            """
                构建中间件堆栈
                返回值是ServerErrorMiddleware, 也就是最外层的中间件.其app属性指向上一个中间件, 以此类推
            """
            debug = self.debug
            error_handler = None
            exception_handlers = {}
    
            for key, value in self.exception_handlers.items():
                if key in (500, Exception):
                    error_handler = value
                else:
                    exception_handlers[key] = value
            # 将异常类和状态码分离出来保存
    
            middleware = (
                [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug,)]
                + self.user_middleware
                + [
                    Middleware(
                        ExceptionMiddleware, handlers=exception_handlers, debug=debug,
                    )
                ]
            )
            # 创建中间件元祖, 顺序为:
            # (1)最外层ServerErrorMiddleware(异常类)
            # (2)用户的中间件列表
            # (3)最内层ExceptionMiddleware(状态码字典)
    
            app = self.router
            for cls, options in reversed(middleware):
                app = cls(app=app, **options)
            # cls, options为middleware类的两个属性
            # 分别对应上面Middleware的第一个参数, 和后续参数.
            # 随后进行套娃, 迭代中第一个app中的app属性为self.router, 随后每个中间件app中的app属性, 皆指向上一个中间件app
            # 把最后一个套娃(应该是最外层的ServerErrorMiddleware)返回
    
            return app
    
        @property
        def routes(self) -> typing.List[BaseRoute]:
            return self.router.routes
    
        @property
        def debug(self) -> bool:
            return self._debug
    
        @debug.setter
        def debug(self, value: bool) -> None:
            self._debug = value
            self.middleware_stack = self.build_middleware_stack()
    
        def url_path_for(self, name: str, **path_params: str) -> URLPath:
            return self.router.url_path_for(name, **path_params)
    

    App的启动入口

        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            scope["app"] = self
            # 将自身装入到传过来的scope字典中, 然后传给中间件堆栈
            await self.middleware_stack(scope, receive, send)
            # 前面提到, middleware_stack实际上是最外层中间件ServerErrorMiddleware
            # 调用ServerErrorMiddleware的__call__
    
    errors.py → ServerErrorMiddleware类

    与上文相关的部分

    class ServerErrorMiddleware:
        """
            当服务器发生错误时, 返回500响应
    
            如果debug设置了, 那么将返回traceback, 否则将调用指定的handler
    
            这个中间件类通常应该用于包含其他所有中间件, 这样保证任何地方出现异常
            都可以返回 500 response
        """
    
        def __init__(
            self, app: ASGIApp, handler: typing.Callable = None, debug: bool = False
        ) -> None:
            self.app = app
            self.handler = handler
            self.debug = debug
    
            # if key in (500, Exception):
            #     error_handler = value
            # [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug,)]
            # 中间件堆栈定义时的参数, handler为用户自定义的500处理:
    
        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            if scope["type"] != "http":
                await self.app(scope, receive, send)
                return
            # 如果请求类型不是http, 则转入下一个中间件
    
            response_started = False
            async def _send(message: Message) -> None:
                nonlocal response_started, send
    
                if message["type"] == "http.response.start":
                    response_started = True
                await send(message)
            # 做一个闭包传给下一个中间件, 让后续中间件使用send时,
            # 能够修改自身的response_started为True
    
            try:
                await self.app(scope, receive, _send)
            except Exception as exc:
                # 如果调用过_send, response_started为True
                # 下面代码将不会执行
                if not response_started:
                    request = Request(scope)
                    if self.debug:
                        # 在debug模式中, 将返回traceback
                        response = self.debug_response(request, exc)
                    elif self.handler is None:
                        # 使用默认的handler处理
                        response = self.error_response(request, exc)
                    else:
                        # 使用用户自定义的500 handler
                        if asyncio.iscoroutinefunction(self.handler):
                            response = await self.handler(request, exc)
                        else:
                            response = await run_in_threadpool(self.handler, request, exc)
                        # 判断是否为协程, 不是则以线程池方式运行
                    await response(scope, receive, send)
    
                # 我们总是不断地抛出异常。
                # 这允许服务器记录错误,
                # 或者允许测试客户端在测试用例中选择性地提出错误。
                raise exc from None
    

    调用

    uvicorn.run(app, host="127.0.0.1", port=8000)
    运行时将app传给ASGI服务器, 服务器调用时会沿着
    app.__call__ → middleware1.__call__ → middleware2.__call__...
    查看最内层中间件ExceptionMiddleware.__call__的部分代码

        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            if scope["type"] != "http":
                await self.app(scope, receive, send)
                return
    
            response_started = False
    
            async def sender(message: Message) -> None:
                nonlocal response_started
    
                if message["type"] == "http.response.start":
                    response_started = True
                await send(message)
    
            try:
                await self.app(scope, receive, sender)
            except Exception as exc:
    

    会发现和最外层的ServerErrorMiddleware有极为相似, 暂且推测. 整个中间件堆栈中, 多次使用了将send闭包传给下层, 下层可能再做一次闭包将自己的response_started也包括进去.

    其结构抽象化如下:
        async def send3(message):
            nonlocal response_started
            if message["type"] == "http.response.start":
                response_started = True
                # ExceptionMiddleware.response_started
            await def send2(message):
                    nonlocal response_started
                    if message["type"] == "http.response.start":
                        response_started = True
                        # XXXMiddleware.response_started
                    await def send1(message):
                            nonlocal response_started
                            if message["type"] == "http.response.start":
                                response_started = True
                                # ServerErrorMiddleware.response_started
                            await send(message)
    

    回到applications.py → Starlette类

    英文注释在这里做了警告, 以下都是修改Starlette自身属性的方法

        # The following usages are now discouraged in favour of configuration
        #  during Starlette.__init__(...)
        # 以下方法, 现在不支持在__init__中配置使用
        def on_event(self, event_type: str) -> typing.Callable:
            return self.router.on_event(event_type)
    
        def mount(self, path: str, app: ASGIApp, name: str = None) -> None:
            self.router.mount(path, app=app, name=name)
    
        def host(self, host: str, app: ASGIApp, name: str = None) -> None:
            self.router.host(host, app=app, name=name)
    
        def add_middleware(self, middleware_class: type, **options: typing.Any) -> None:
            self.user_middleware.insert(0, Middleware(middleware_class, **options))
            self.middleware_stack = self.build_middleware_stack()
    
        def add_exception_handler(
            self,
            exc_class_or_status_code: typing.Union[int, typing.Type[Exception]],
            handler: typing.Callable,
        ) -> None:
            self.exception_handlers[exc_class_or_status_code] = handler
            self.middleware_stack = self.build_middleware_stack()
    
        def add_event_handler(self, event_type: str, func: typing.Callable) -> None:
            self.router.add_event_handler(event_type, func)
    
        def add_route(
            self,
            path: str,
            route: typing.Callable,
            methods: typing.List[str] = None,
            name: str = None,
            include_in_schema: bool = True,
        ) -> None:
            self.router.add_route(
                path, route, methods=methods, name=name, include_in_schema=include_in_schema
            )
    
        def add_websocket_route(
            self, path: str, route: typing.Callable, name: str = None
        ) -> None:
            self.router.add_websocket_route(path, route, name=name)
    
    

    以及对上述方法的一些包装

        def exception_handler(
            self, exc_class_or_status_code: typing.Union[int, typing.Type[Exception]]
        ) -> typing.Callable:
            def decorator(func: typing.Callable) -> typing.Callable:
                self.add_exception_handler(exc_class_or_status_code, func)
                return func
    
            return decorator
    
        def route(
            self,
            path: str,
            methods: typing.List[str] = None,
            name: str = None,
            include_in_schema: bool = True,
        ) -> typing.Callable:
            def decorator(func: typing.Callable) -> typing.Callable:
                self.router.add_route(
                    path,
                    func,
                    methods=methods,
                    name=name,
                    include_in_schema=include_in_schema,
                )
                return func
    
            return decorator
    
        def websocket_route(self, path: str, name: str = None) -> typing.Callable:
            def decorator(func: typing.Callable) -> typing.Callable:
                self.router.add_websocket_route(path, func, name=name)
                return func
    
            return decorator
    
        def middleware(self, middleware_type: str) -> typing.Callable:
            assert (
                middleware_type == "http"
            ), 'Currently only middleware("http") is supported.'
    
            def decorator(func: typing.Callable) -> typing.Callable:
                self.add_middleware(BaseHTTPMiddleware, dispatch=func)
                return func
    
            return decorator
    
    

    以上为Starlette库中applications.py为主的源码解读, 如有错误欢迎指正.

    相关文章

      网友评论

        本文标题:Starlette 源码阅读 (一) ASGI应用

        本文链接:https://www.haomeiwen.com/subject/yxszrktx.html