Starlette 源码阅读 (二) 路由

作者: Gascognya | 来源:发表于2020-08-09 23:18 被阅读0次
    本篇开始阅读Starlette的routing.py源码
    routing.py结构

    从Router类开始

    class Router:
        """
            参数:
                (1) routes: 路由列表
                (2) redirect_slashes: 重定向斜杠
                (3) default: 处理无法匹配项的最基础App
                (4) on_start: 启动事件列表
                (5) on_shutdown: 结束事件列表
                (6) lifespan: 上述两者的合并
        """
        def __init__(
            self,
            routes: typing.Sequence[BaseRoute] = None,
            redirect_slashes: bool = True,
            default: ASGIApp = None,
            on_startup: typing.Sequence[typing.Callable] = None,
            on_shutdown: typing.Sequence[typing.Callable] = None,
            lifespan: typing.Callable[[typing.Any], typing.AsyncGenerator] = None,
        ) -> None:
            self.routes = [] if routes is None else list(routes)
            self.redirect_slashes = redirect_slashes
            self.default = self.not_found if default is None else default
            self.on_startup = [] if on_startup is None else list(on_startup)
            self.on_shutdown = [] if on_shutdown is None else list(on_shutdown)
    
            async def default_lifespan(app: typing.Any) -> typing.AsyncGenerator:
                await self.startup()
                yield
                await self.shutdown()
    
            # 将on_start和on_shutdown合并成一个生成器, 赋值给lifespan, 如果lifespan不存在
            # 运行所有启动事件 → yield → 运行所有结束事件
            self.lifespan_context = default_lifespan if lifespan is None else lifespan
    
        async def not_found(self, scope: Scope, receive: Receive, send: Send) -> None:
            if scope["type"] == "websocket":
                websocket_close = WebSocketClose()
                await websocket_close(scope, receive, send)
                return
    
            # 如果我们在starlette应用中运行, 且抛出了一个错误
            # 那么可配置的异常处理程序, 便可处理返回的response并触发一场
            # 对于简单的ASGI程序来说, 只仅仅返回response即可
            if "app" in scope:
                raise HTTPException(status_code=404)
            else:
                response = PlainTextResponse("Not Found", status_code=404)
            await response(scope, receive, send)
    
        def url_path_for(self, name: str, **path_params: str) -> URLPath:
            for route in self.routes:
                try:
                    return route.url_path_for(name, **path_params)
                except NoMatchFound:
                    pass
            raise NoMatchFound()
    
    启动时间与结束事件

    startup()与shutdown()为上文self.lifespan_context = default_lifespan
    def default_lifespan的定义

        async def startup(self) -> None:
            """
            运行所有 启动事件
            """
            for handler in self.on_startup:
                if asyncio.iscoroutinefunction(handler):
                    await handler()
                else:
                    handler()
    
        async def shutdown(self) -> None:
            """
            运行所有 结束事件
            """
            for handler in self.on_shutdown:
                if asyncio.iscoroutinefunction(handler):
                    await handler()
                else:
                    handler()
    
        async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
            """
            处理 ASGI 生命周期(lifespan)消息, 它允许我们管理启动事件和结束事件
            """
            first = True
            app = scope.get("app")
            message = await receive()
            try:
                if inspect.isasyncgenfunction(self.lifespan_context):  # 判断是否为协程
                    async for item in self.lifespan_context(app):
                        # 第一次触发 await self.startup()
                        # 第二次触发 await self.shutdown()
                        assert first, "Lifespan context yielded multiple times."
                        # 第一次触发前为True, 发送启动事件完成信息
                        # 第二次触发前为false, 不再发送该消息, 异常被捕获
                        first = False
                        await send({"type": "lifespan.startup.complete"})
                        message = await receive()
                        # 发送生命周期启动事件完成信息, 等待回应
                else:
                    for item in self.lifespan_context(app):  # type: ignore
                        assert first, "Lifespan context yielded multiple times."
                        first = False
                        await send({"type": "lifespan.startup.complete"})
                        message = await receive()
            except BaseException:
                # 捕获错误
                if first:
                    # 如果first为true, 说明为第一次完成前发生的意外错误
                    exc_text = traceback.format_exc()
                    await send({"type": "lifespan.startup.failed", "message": exc_text})
                raise
            else:
                # 否则为人为错误, 发送结束时间完成信息
                await send({"type": "lifespan.shutdown.complete"})
    
    路由主入口
        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            """
            Router类的主入口
            """
            assert scope["type"] in ("http", "websocket", "lifespan")
    
            if "router" not in scope:
                scope["router"] = self
    
            if scope["type"] == "lifespan":
                await self.lifespan(scope, receive, send)
                return
    
            partial = None
    
            for route in self.routes:
                # 确认是否有路由和传入的范围匹配
                # 如果发现, 则移交给匹配的路由
                match, child_scope = route.matches(scope)
                if match == Match.FULL:
                    scope.update(child_scope)
                    await route.handle(scope, receive, send)
                    # 如果完全匹配, 则直接交给路由节点的函数处理
                    return
                elif match == Match.PARTIAL and partial is None:
                    partial = route
                    partial_scope = child_scope
    
            if partial is not None:
                # 处理部分匹配, 在这种情况下, endpoint能够处理请求, 但不是首选.
                # 我们特意使用这个partial来处理 "405 Method Not Allowed"
                scope.update(partial_scope)
                await partial.handle(scope, receive, send)
                return
    
            # 未匹配的情况, 判断重定向
            if scope["type"] == "http" and self.redirect_slashes and scope["path"] != "/":
                # 如果类型为http 且 自身存在重定向符 且 路径不为根路径
                redirect_scope = dict(scope)
                if scope["path"].endswith("/"):
                    redirect_scope["path"] = redirect_scope["path"].rstrip("/")
                else:
                    redirect_scope["path"] = redirect_scope["path"] + "/"
                # 路径如果包含/则去掉, 不包含则加上
                for route in self.routes:
                    match, child_scope = route.matches(redirect_scope)
                    if match != Match.NONE:
                        # 再次进行匹配, 如果结果不为空, 则发送重定向response
                        redirect_url = URL(scope=redirect_scope)
                        response = RedirectResponse(url=str(redirect_url))
                        await response(scope, receive, send)
                        return
            # 完全未匹配情况, 调用自身的default
            # 在定义时, self.default = self.not_found, 来处理404信息
            await self.default(scope, receive, send)
    

    routing.py→Match

    class Match(Enum):
        """
        一个匹配程度的枚举类
        分别用0,1,2来表示
        """
        NONE = 0
        PARTIAL = 1
        FULL = 2
    

    # The following usages are now discouraged in favour of configuration during Router.__init__(...)
    以此往下同Starlette类, 都是操作自身属性的封装方法

        def mount(self, path: str, app: ASGIApp, name: str = None) -> None:
            route = Mount(path, app=app, name=name)
            self.routes.append(route)
    
        def host(self, host: str, app: ASGIApp, name: str = None) -> None:
            route = Host(host, app=app, name=name)
            self.routes.append(route)
    
        def add_route(): ...
        def add_websocket_route(): ...
        def route(): ...
        def websocket_route(): ...
        def add_event_handler(): ...
        def on_event(): ...
    

    前两个方法说明, 除了Route对象, 可以向路由中加载Mount对象以及Host对象, 他们都继承于BaseRoute类

    BaseRoute类

    class BaseRoute:
        def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
            raise NotImplementedError()  # pragma: no cover
    
        def url_path_for(self, name: str, **path_params: str) -> URLPath:
            raise NotImplementedError()  # pragma: no cover
    
        async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
            raise NotImplementedError()  # pragma: no cover
    
        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            """
            一个 Route 可以作为一个独立的ASGI应用程序单独使用
            这是一种人为的情况,因为它们几乎总是在一个 Router 中使用
            但可能对一些工具和最小的应用程序有用
            """
            match, child_scope = self.matches(scope)
            if match == Match.NONE:
                if scope["type"] == "http":
                    response = PlainTextResponse("Not Found", status_code=404)
                    await response(scope, receive, send)
                elif scope["type"] == "websocket":
                    websocket_close = WebSocketClose()
                    await websocket_close(scope, receive, send)
                return
    
            scope.update(child_scope)
            await self.handle(scope, receive, send)
            # 其功能与router的__call__部分功能相似, 可以将路径与自身进行匹配
    

    Route类

    class Route(BaseRoute):
        """
        单个Route表示一个路由节点, 连接endpoint与路径
        参数:
            (1) path: 匹配路径
            (2) endpoint: 路径处理的函数
            (3) methods: 允许的HTTP方法
            (4) name: 用于反向查找(?)
            (5) include_in_schema: 未知(?)
        """
        def __init__(
                self,
                path: str,
                endpoint: typing.Callable,
                *,
                methods: typing.List[str] = None,
                name: str = None,
                include_in_schema: bool = True,
        ) -> None:
            assert path.startswith("/"), "Routed paths must start with '/'"
            self.path = path
            self.endpoint = endpoint
            self.name = get_name(endpoint) if name is None else name
            # 通过endpoint的函数名(__name__)获取,源码略
            self.include_in_schema = include_in_schema
    
            if inspect.isfunction(endpoint) or inspect.ismethod(endpoint):
                # Endpoint 是一个函数或者方法 把它看作 `func(request) -> response`.
                self.app = request_response(endpoint)
                # 将endpoint进行封装
                if methods is None:
                    methods = ["GET"]
            else:
                # Endpoint 是一个类(cbv). 把它看作 ASGI 应用.
                self.app = endpoint
    
            if methods is None:
                self.methods = None
            else:
                self.methods = set(method.upper() for method in methods)
                if "GET" in self.methods:
                    self.methods.add("HEAD")
            # 设置允许的HTTP方法
            self.path_regex, self.path_format, self.param_convertors = compile_path(path)
            # 分解路径字符串
    
    

    routing.py→request_response

    用于将endpoint封装成app
    app传入元数据,直接得到response

    def request_response(func: typing.Callable) -> ASGIApp:
        """
        获取一个函数或者协程 `func(request) -> response`,
        然后返回一个 ASGI 应用
        """
        is_coroutine = asyncio.iscoroutinefunction(func)
    
        async def app(scope: Scope, receive: Receive, send: Send) -> None:
            request = Request(scope, receive=receive, send=send)
            # 构建一个request实例
            if is_coroutine:
                response = await func(request)
            else:
                response = await run_in_threadpool(func, request)
            # 如果是协程则执行, 如果是普通函数则加入线程池
            # 这里的func, 即为我们最常写的路由对应的endpoint(请求处理函数)
            # 函数在此被执行, 得到response
            await response(scope, receive, send)
    
        return app
    

    routing.py→compile_path

    # 匹配URL路径中的params参数, 例. '{param}', 或 '{param:int}'
    PARAM_REGEX = re.compile("{([a-zA-Z_][a-zA-Z0-9_]*)(:[a-zA-Z_][a-zA-Z0-9_]*)?}")
    
    def compile_path(
            path: str,
    ) -> typing.Tuple[typing.Pattern, str, typing.Dict[str, Convertor]]:
        """
        提交一个设定的路径参数匹配字符串, 类似"/{username:str}",
        返回一个三元元祖(regex, format, {param_name:convertor}).
    
        regex:      "/(?P<username>[^/]+)"
        format:     "/{username}"
        convertors: {"username": StringConvertor()}
        """
        path_regex = "^"
        path_format = ""
    
        idx = 0
        param_convertors = {}
        for match in PARAM_REGEX.finditer(path):
            param_name, convertor_type = match.groups("str")
            # 解析出参数名, 和参数类型
            convertor_type = convertor_type.lstrip(":")
            assert (
                    convertor_type in CONVERTOR_TYPES
            ), f"Unknown path convertor '{convertor_type}'"
            # 查询是否在类型字典中
            convertor = CONVERTOR_TYPES[convertor_type]
            # 从类型字典得到对应的参数数据类型转换器
    
            path_regex += re.escape(path[idx: match.start()])
            # 解析出其中的正则
            path_regex += f"(?P<{param_name}>{convertor.regex})"
            # 正则字符串中加入分组
            path_format += path[idx: match.start()]
            path_format += "{%s}" % param_name
    
            param_convertors[param_name] = convertor
    
            idx = match.end()
    
        path_regex += re.escape(path[idx:]) + "$"
        path_format += path[idx:]
    
        return re.compile(path_regex), path_format, param_convertors
    

    回到Route类

        def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
            """
            路由节点的匹配方法, 接收scope字典, 进行解析
            返回 match, child_scope
            """
            if scope["type"] == "http":
                match = self.path_regex.match(scope["path"])
                # 将传过来的path与自身路径正则进行匹配
                if match:
                    matched_params = match.groupdict()
                    for key, value in matched_params.items():
                        matched_params[key] = self.param_convertors[key].convert(value)
                    # 调用对应的转换器
                    path_params = dict(scope.get("path_params", {}))
                    path_params.update(matched_params)
                    child_scope = {"endpoint": self.endpoint, "path_params": path_params}
                    if self.methods and scope["method"] not in self.methods:
                        # 如果请求的方法,没在自身允许的方法中则返回部分匹配
                        return Match.PARTIAL, child_scope
                    else:
                        # 否则返回全部匹配
                        return Match.FULL, child_scope
            # 未匹配
            return Match.NONE, {}
    
        def url_path_for(self, name: str, **path_params: str) -> URLPath:
            # 同url_for一样,用于反向查找
            seen_params = set(path_params.keys())
            expected_params = set(self.param_convertors.keys())
    
            if name != self.name or seen_params != expected_params:
                raise NoMatchFound()
    
            path, remaining_params = replace_params(
                self.path_format, self.param_convertors, path_params
            )
            assert not remaining_params
            return URLPath(path=path, protocol="http")
    
        async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
            """
            节点的入口
            当route匹配成功,就会调用匹配成功的handle方法
            """
            if self.methods and scope["method"] not in self.methods:
                if "app" in scope:
                    raise HTTPException(status_code=405)
                else:
                    response = PlainTextResponse("Method Not Allowed", status_code=405)
                await response(scope, receive, send)
            else:
                # 当判断属于支持的方法时,调用封装好的endpoint
                await self.app(scope, receive, send)
    
        def __eq__(self, other: typing.Any) -> bool:
            return (
                    isinstance(other, Route)
                    and self.path == other.path
                    and self.endpoint == other.endpoint
                    and self.methods == other.methods
            )
    

    routing.py余下内容将在下篇继续解读

    相关文章

      网友评论

        本文标题:Starlette 源码阅读 (二) 路由

        本文链接:https://www.haomeiwen.com/subject/imctdktx.html