本篇开始阅读Starlette的routing.py源码
routing.py结构从Router类开始
class Router:
"""
参数:
(1) routes: 路由列表
(2) redirect_slashes: 重定向斜杠
(3) default: 处理无法匹配项的最基础App
(4) on_start: 启动事件列表
(5) on_shutdown: 结束事件列表
(6) lifespan: 上述两者的合并
"""
def __init__(
self,
routes: typing.Sequence[BaseRoute] = None,
redirect_slashes: bool = True,
default: ASGIApp = None,
on_startup: typing.Sequence[typing.Callable] = None,
on_shutdown: typing.Sequence[typing.Callable] = None,
lifespan: typing.Callable[[typing.Any], typing.AsyncGenerator] = None,
) -> None:
self.routes = [] if routes is None else list(routes)
self.redirect_slashes = redirect_slashes
self.default = self.not_found if default is None else default
self.on_startup = [] if on_startup is None else list(on_startup)
self.on_shutdown = [] if on_shutdown is None else list(on_shutdown)
async def default_lifespan(app: typing.Any) -> typing.AsyncGenerator:
await self.startup()
yield
await self.shutdown()
# 将on_start和on_shutdown合并成一个生成器, 赋值给lifespan, 如果lifespan不存在
# 运行所有启动事件 → yield → 运行所有结束事件
self.lifespan_context = default_lifespan if lifespan is None else lifespan
async def not_found(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] == "websocket":
websocket_close = WebSocketClose()
await websocket_close(scope, receive, send)
return
# 如果我们在starlette应用中运行, 且抛出了一个错误
# 那么可配置的异常处理程序, 便可处理返回的response并触发一场
# 对于简单的ASGI程序来说, 只仅仅返回response即可
if "app" in scope:
raise HTTPException(status_code=404)
else:
response = PlainTextResponse("Not Found", status_code=404)
await response(scope, receive, send)
def url_path_for(self, name: str, **path_params: str) -> URLPath:
for route in self.routes:
try:
return route.url_path_for(name, **path_params)
except NoMatchFound:
pass
raise NoMatchFound()
启动时间与结束事件
startup()与shutdown()为上文self.lifespan_context = default_lifespan
中
def default_lifespan
的定义
async def startup(self) -> None:
"""
运行所有 启动事件
"""
for handler in self.on_startup:
if asyncio.iscoroutinefunction(handler):
await handler()
else:
handler()
async def shutdown(self) -> None:
"""
运行所有 结束事件
"""
for handler in self.on_shutdown:
if asyncio.iscoroutinefunction(handler):
await handler()
else:
handler()
async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
处理 ASGI 生命周期(lifespan)消息, 它允许我们管理启动事件和结束事件
"""
first = True
app = scope.get("app")
message = await receive()
try:
if inspect.isasyncgenfunction(self.lifespan_context): # 判断是否为协程
async for item in self.lifespan_context(app):
# 第一次触发 await self.startup()
# 第二次触发 await self.shutdown()
assert first, "Lifespan context yielded multiple times."
# 第一次触发前为True, 发送启动事件完成信息
# 第二次触发前为false, 不再发送该消息, 异常被捕获
first = False
await send({"type": "lifespan.startup.complete"})
message = await receive()
# 发送生命周期启动事件完成信息, 等待回应
else:
for item in self.lifespan_context(app): # type: ignore
assert first, "Lifespan context yielded multiple times."
first = False
await send({"type": "lifespan.startup.complete"})
message = await receive()
except BaseException:
# 捕获错误
if first:
# 如果first为true, 说明为第一次完成前发生的意外错误
exc_text = traceback.format_exc()
await send({"type": "lifespan.startup.failed", "message": exc_text})
raise
else:
# 否则为人为错误, 发送结束时间完成信息
await send({"type": "lifespan.shutdown.complete"})
路由主入口
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
Router类的主入口
"""
assert scope["type"] in ("http", "websocket", "lifespan")
if "router" not in scope:
scope["router"] = self
if scope["type"] == "lifespan":
await self.lifespan(scope, receive, send)
return
partial = None
for route in self.routes:
# 确认是否有路由和传入的范围匹配
# 如果发现, 则移交给匹配的路由
match, child_scope = route.matches(scope)
if match == Match.FULL:
scope.update(child_scope)
await route.handle(scope, receive, send)
# 如果完全匹配, 则直接交给路由节点的函数处理
return
elif match == Match.PARTIAL and partial is None:
partial = route
partial_scope = child_scope
if partial is not None:
# 处理部分匹配, 在这种情况下, endpoint能够处理请求, 但不是首选.
# 我们特意使用这个partial来处理 "405 Method Not Allowed"
scope.update(partial_scope)
await partial.handle(scope, receive, send)
return
# 未匹配的情况, 判断重定向
if scope["type"] == "http" and self.redirect_slashes and scope["path"] != "/":
# 如果类型为http 且 自身存在重定向符 且 路径不为根路径
redirect_scope = dict(scope)
if scope["path"].endswith("/"):
redirect_scope["path"] = redirect_scope["path"].rstrip("/")
else:
redirect_scope["path"] = redirect_scope["path"] + "/"
# 路径如果包含/则去掉, 不包含则加上
for route in self.routes:
match, child_scope = route.matches(redirect_scope)
if match != Match.NONE:
# 再次进行匹配, 如果结果不为空, 则发送重定向response
redirect_url = URL(scope=redirect_scope)
response = RedirectResponse(url=str(redirect_url))
await response(scope, receive, send)
return
# 完全未匹配情况, 调用自身的default
# 在定义时, self.default = self.not_found, 来处理404信息
await self.default(scope, receive, send)
routing.py→Match
class Match(Enum):
"""
一个匹配程度的枚举类
分别用0,1,2来表示
"""
NONE = 0
PARTIAL = 1
FULL = 2
# The following usages are now discouraged in favour of configuration during Router.__init__(...)
以此往下同Starlette类, 都是操作自身属性的封装方法
def mount(self, path: str, app: ASGIApp, name: str = None) -> None:
route = Mount(path, app=app, name=name)
self.routes.append(route)
def host(self, host: str, app: ASGIApp, name: str = None) -> None:
route = Host(host, app=app, name=name)
self.routes.append(route)
def add_route(): ...
def add_websocket_route(): ...
def route(): ...
def websocket_route(): ...
def add_event_handler(): ...
def on_event(): ...
前两个方法说明, 除了Route对象, 可以向路由中加载Mount对象以及Host对象, 他们都继承于BaseRoute类
BaseRoute类
class BaseRoute:
def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
raise NotImplementedError() # pragma: no cover
def url_path_for(self, name: str, **path_params: str) -> URLPath:
raise NotImplementedError() # pragma: no cover
async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
raise NotImplementedError() # pragma: no cover
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
一个 Route 可以作为一个独立的ASGI应用程序单独使用
这是一种人为的情况,因为它们几乎总是在一个 Router 中使用
但可能对一些工具和最小的应用程序有用
"""
match, child_scope = self.matches(scope)
if match == Match.NONE:
if scope["type"] == "http":
response = PlainTextResponse("Not Found", status_code=404)
await response(scope, receive, send)
elif scope["type"] == "websocket":
websocket_close = WebSocketClose()
await websocket_close(scope, receive, send)
return
scope.update(child_scope)
await self.handle(scope, receive, send)
# 其功能与router的__call__部分功能相似, 可以将路径与自身进行匹配
Route类
class Route(BaseRoute):
"""
单个Route表示一个路由节点, 连接endpoint与路径
参数:
(1) path: 匹配路径
(2) endpoint: 路径处理的函数
(3) methods: 允许的HTTP方法
(4) name: 用于反向查找(?)
(5) include_in_schema: 未知(?)
"""
def __init__(
self,
path: str,
endpoint: typing.Callable,
*,
methods: typing.List[str] = None,
name: str = None,
include_in_schema: bool = True,
) -> None:
assert path.startswith("/"), "Routed paths must start with '/'"
self.path = path
self.endpoint = endpoint
self.name = get_name(endpoint) if name is None else name
# 通过endpoint的函数名(__name__)获取,源码略
self.include_in_schema = include_in_schema
if inspect.isfunction(endpoint) or inspect.ismethod(endpoint):
# Endpoint 是一个函数或者方法 把它看作 `func(request) -> response`.
self.app = request_response(endpoint)
# 将endpoint进行封装
if methods is None:
methods = ["GET"]
else:
# Endpoint 是一个类(cbv). 把它看作 ASGI 应用.
self.app = endpoint
if methods is None:
self.methods = None
else:
self.methods = set(method.upper() for method in methods)
if "GET" in self.methods:
self.methods.add("HEAD")
# 设置允许的HTTP方法
self.path_regex, self.path_format, self.param_convertors = compile_path(path)
# 分解路径字符串
routing.py→request_response
用于将endpoint封装成app
app传入元数据,直接得到response
def request_response(func: typing.Callable) -> ASGIApp:
"""
获取一个函数或者协程 `func(request) -> response`,
然后返回一个 ASGI 应用
"""
is_coroutine = asyncio.iscoroutinefunction(func)
async def app(scope: Scope, receive: Receive, send: Send) -> None:
request = Request(scope, receive=receive, send=send)
# 构建一个request实例
if is_coroutine:
response = await func(request)
else:
response = await run_in_threadpool(func, request)
# 如果是协程则执行, 如果是普通函数则加入线程池
# 这里的func, 即为我们最常写的路由对应的endpoint(请求处理函数)
# 函数在此被执行, 得到response
await response(scope, receive, send)
return app
routing.py→compile_path
# 匹配URL路径中的params参数, 例. '{param}', 或 '{param:int}'
PARAM_REGEX = re.compile("{([a-zA-Z_][a-zA-Z0-9_]*)(:[a-zA-Z_][a-zA-Z0-9_]*)?}")
def compile_path(
path: str,
) -> typing.Tuple[typing.Pattern, str, typing.Dict[str, Convertor]]:
"""
提交一个设定的路径参数匹配字符串, 类似"/{username:str}",
返回一个三元元祖(regex, format, {param_name:convertor}).
regex: "/(?P<username>[^/]+)"
format: "/{username}"
convertors: {"username": StringConvertor()}
"""
path_regex = "^"
path_format = ""
idx = 0
param_convertors = {}
for match in PARAM_REGEX.finditer(path):
param_name, convertor_type = match.groups("str")
# 解析出参数名, 和参数类型
convertor_type = convertor_type.lstrip(":")
assert (
convertor_type in CONVERTOR_TYPES
), f"Unknown path convertor '{convertor_type}'"
# 查询是否在类型字典中
convertor = CONVERTOR_TYPES[convertor_type]
# 从类型字典得到对应的参数数据类型转换器
path_regex += re.escape(path[idx: match.start()])
# 解析出其中的正则
path_regex += f"(?P<{param_name}>{convertor.regex})"
# 正则字符串中加入分组
path_format += path[idx: match.start()]
path_format += "{%s}" % param_name
param_convertors[param_name] = convertor
idx = match.end()
path_regex += re.escape(path[idx:]) + "$"
path_format += path[idx:]
return re.compile(path_regex), path_format, param_convertors
回到Route类
def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
"""
路由节点的匹配方法, 接收scope字典, 进行解析
返回 match, child_scope
"""
if scope["type"] == "http":
match = self.path_regex.match(scope["path"])
# 将传过来的path与自身路径正则进行匹配
if match:
matched_params = match.groupdict()
for key, value in matched_params.items():
matched_params[key] = self.param_convertors[key].convert(value)
# 调用对应的转换器
path_params = dict(scope.get("path_params", {}))
path_params.update(matched_params)
child_scope = {"endpoint": self.endpoint, "path_params": path_params}
if self.methods and scope["method"] not in self.methods:
# 如果请求的方法,没在自身允许的方法中则返回部分匹配
return Match.PARTIAL, child_scope
else:
# 否则返回全部匹配
return Match.FULL, child_scope
# 未匹配
return Match.NONE, {}
def url_path_for(self, name: str, **path_params: str) -> URLPath:
# 同url_for一样,用于反向查找
seen_params = set(path_params.keys())
expected_params = set(self.param_convertors.keys())
if name != self.name or seen_params != expected_params:
raise NoMatchFound()
path, remaining_params = replace_params(
self.path_format, self.param_convertors, path_params
)
assert not remaining_params
return URLPath(path=path, protocol="http")
async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
节点的入口
当route匹配成功,就会调用匹配成功的handle方法
"""
if self.methods and scope["method"] not in self.methods:
if "app" in scope:
raise HTTPException(status_code=405)
else:
response = PlainTextResponse("Method Not Allowed", status_code=405)
await response(scope, receive, send)
else:
# 当判断属于支持的方法时,调用封装好的endpoint
await self.app(scope, receive, send)
def __eq__(self, other: typing.Any) -> bool:
return (
isinstance(other, Route)
and self.path == other.path
and self.endpoint == other.endpoint
and self.methods == other.methods
)
routing.py余下内容将在下篇继续解读
网友评论