美文网首页FastAPI 解读 by Gascognya
FastAPI 依赖注入详解:处理依赖树

FastAPI 依赖注入详解:处理依赖树

作者: Gascognya | 来源:发表于2020-10-26 20:58 被阅读0次
        async def app(request: Request) -> Response:
    
            ......
    
            solved_result = await solve_dependencies(
                request=request,
                dependant=dependant,
                body=body,
                dependency_overrides_provider=dependency_overrides_provider,
            )
    
            values, errors, background_tasks, sub_response, _ = solved_result
    
            if errors:
                raise RequestValidationError(errors, body=body)
            else:
                raw_response = await run_endpoint_function(
                    dependant=dependant, values=values, is_coroutine=is_coroutine
                )
    
            ......
    

    这里是endpoint的前一步,request首先要经过solve_dependencies()来与endpoint的依赖树进行匹配,这相当于API的门卫一般的存在。
    匹配的结果包含在solve过程中的结果,错误,以及其他若干信息。有了详细的信息,我们便能精确的定位到问题的所在。

    async def solve_dependencies(
            *,
            request: Union[Request, WebSocket],
            dependant: Dependant,
            body: Optional[Union[Dict[str, Any], FormData]] = None,
            background_tasks: Optional[BackgroundTasks] = None,
            response: Optional[Response] = None,
            dependency_overrides_provider: Optional[Any] = None,
            dependency_cache: Optional[Dict[Tuple[Callable, Tuple[str]], Any]] = None,
    ) -> Tuple[
        Dict[str, Any],
        List[ErrorWrapper],
        Optional[BackgroundTasks],
        Response,
        Dict[Tuple[Callable, Tuple[str]], Any],
    ]:
        """
    
        :param request: 请求报文
        :param dependant: endpoint对应的依赖树
        :param body: 请求体
        :param background_tasks: 后台任务
        :param response: 子依赖
        :param dependency_overrides_provider: app中设置的依赖替代项
        :param dependency_cache: 已完成的依赖
        :return:
        """
        values: Dict[str, Any] = {}
        errors: List[ErrorWrapper] = []
        response = response or Response(
            content=None,
            status_code=None,  # type: ignore
            headers=None,
            media_type=None,
            background=None,
        )
        dependency_cache = dependency_cache or {}
    

    开始解依赖树

        for sub_dependant in dependant.dependencies:
            sub_dependant.call = cast(Callable, sub_dependant.call)
            sub_dependant.cache_key = cast(
                Tuple[Callable, Tuple[str]], sub_dependant.cache_key
            )
            # cast的作用是标注类型,方便提示
    
            call = sub_dependant.call
            use_sub_dependant = sub_dependant
            # 分别拿到依赖内容和依赖项
    
            if (
                    dependency_overrides_provider
                    and dependency_overrides_provider.dependency_overrides
            ):
                # 依赖重写时
                original_call = sub_dependant.call
                call = getattr(
                    dependency_overrides_provider, "dependency_overrides", {}
                ).get(original_call, original_call)
                # 找到对应的重写
                use_path: str = sub_dependant.path  # type: ignore
                use_sub_dependant = get_dependant(
                    path=use_path,
                    call=call,
                    name=sub_dependant.name,
                    security_scopes=sub_dependant.security_scopes,
                )
                # 重新生成依赖
                use_sub_dependant.security_scopes = sub_dependant.security_scopes
    
            solved_result = await solve_dependencies(
                request=request,
                dependant=use_sub_dependant,
                body=body,
                background_tasks=background_tasks,
                response=response,
                dependency_overrides_provider=dependency_overrides_provider,
                dependency_cache=dependency_cache,
            )
            # 获得依赖项的子依赖的结果集
    
            (
                sub_values,
                sub_errors,
                background_tasks,
                _,  # 子依赖项返回与我们相同的响应
                sub_dependency_cache,
            ) = solved_result
            # 拿到结果和错误
    

    上面这部分负责解决子依赖,拿到最后的结果集,然后用子依赖的结果集来解决自身

            dependency_cache.update(sub_dependency_cache)
            # 将子依赖已解决的内容注册
            if sub_errors:
                errors.extend(sub_errors)
                continue
    
            if sub_dependant.use_cache and sub_dependant.cache_key in dependency_cache:
                # 如果设置了使用缓存,且该依赖内容已被子依赖解决过
                solved = dependency_cache[sub_dependant.cache_key]
                # 直接抄答案
    
            # 否则照常执行
            elif is_gen_callable(call) or is_async_gen_callable(call):
                stack = request.scope.get("fastapi_astack")
                if stack is None:
                    raise RuntimeError(
                        async_contextmanager_dependencies_error
                    )  # pragma: no cover
                solved = await solve_generator(
                    call=call, stack=stack, sub_values=sub_values
                )
                # 用子依赖得到的结果集,作为参数,传入到依赖内容中。
    
            elif is_coroutine_callable(call):
                solved = await call(**sub_values)
            else:
                solved = await run_in_threadpool(call, **sub_values)
    
            # 对于同步异步不同的执行方式
    
            if sub_dependant.name is not None:
                values[sub_dependant.name] = solved
                # 收集结果
            if sub_dependant.cache_key not in dependency_cache:
                dependency_cache[sub_dependant.cache_key] = solved
                # 将结果添加到结果集
    

    解决该节点的解决每一个依赖项

        # 依赖项不再有子依赖时,会直接跳过上面的for循环
        path_values, path_errors = request_params_to_args(
            dependant.path_params, request.path_params
        )
        query_values, query_errors = request_params_to_args(
            dependant.query_params, request.query_params
        )
        header_values, header_errors = request_params_to_args(
            dependant.header_params, request.headers
        )
        cookie_values, cookie_errors = request_params_to_args(
            dependant.cookie_params, request.cookies
        )
        # 生成依赖树时,我们将需要的参数,保存到了path_params,query_params等地方。
        # 现在就是从Request中提取它们的好时机
        # 当然这个过程中也会产生错误,我们会收集这些错误
        values.update(path_values)
        values.update(query_values)
        values.update(header_values)
        values.update(cookie_values)
        errors += path_errors + query_errors + header_errors + cookie_errors
        # 合并现有错误
    

    解决参数依赖,分别遍历依赖的个参数需求列表,与request所能提供的做匹配。
    我们来看一下匹配的函数

    def request_params_to_args(
            required_params: Sequence[ModelField],
            received_params: Union[Mapping[str, Any], QueryParams, Headers],
    ) -> Tuple[Dict[str, Any], List[ErrorWrapper]]:
        values = {}
        errors = []
        for field in required_params:
            if is_scalar_sequence_field(field) and isinstance(
                    received_params, (QueryParams, Headers)
            ):
                value = received_params.getlist(field.alias) or field.default
            else:
                value = received_params.get(field.alias)
            # 分别对标准序列参数和其他参数的情况进行处理,拿到value
            # 这里未处理默认值的情况下
    
            field_info = field.field_info
            assert isinstance(
                field_info, params.Param
            ), "Params must be subclasses of Param"
            if value is None:
                if field.required:
                    errors.append(
                        ErrorWrapper(
                            MissingError(), loc=(field_info.in_.value, field.alias)
                        )
                    )
                    # 必须则引发错误
                else:
                    values[field.name] = deepcopy(field.default)
                    # 否则使用默认值
                continue
            v_, errors_ = field.validate(
                value, values, loc=(field_info.in_.value, field.alias)
            )
            if isinstance(errors_, ErrorWrapper):
                errors.append(errors_)
            elif isinstance(errors_, list):
                errors.extend(errors_)
            else:
                values[field.name] = v_
        return values, errors
    
    回到solve_dependencies
        if dependant.body_params:
            # 如果有user_info(user: User)这样的参数,User为Model。
            # 其会保存到body_params中,现在是处理它们的时候
            (
                body_values,
                body_errors,
            ) = await request_body_to_args(  # body_params checked above
                required_params=dependant.body_params, received_body=body
            )
            # body传入进去,进行匹配,得到结果
            values.update(body_values)
            errors.extend(body_errors)
            # 整合结果
        if dependant.http_connection_param_name:
            values[dependant.http_connection_param_name] = request
        if dependant.request_param_name and isinstance(request, Request):
            values[dependant.request_param_name] = request
        elif dependant.websocket_param_name and isinstance(request, WebSocket):
            values[dependant.websocket_param_name] = request
        # 这三者是解决需要特定参数的时候,主要是指Request或WebSocket这样的参数
    
        if dependant.background_tasks_param_name:
            if background_tasks is None:
                background_tasks = BackgroundTasks()
            values[dependant.background_tasks_param_name] = background_tasks
        # 后台任务
        if dependant.response_param_name:
            values[dependant.response_param_name] = response
        # 如果需要操作Response报文,这里会提供sub response,其内容最后会整合到response中
    
        if dependant.security_scopes_param_name:
            values[dependant.security_scopes_param_name] = SecurityScopes(
                scopes=dependant.security_scopes
            )
        # 拿到安全域
        return values, errors, background_tasks, response, dependency_cache
    

    solve_dependencies本身也是递归函数,这和get_dependant是相辅相成的。

    相关文章

      网友评论

        本文标题:FastAPI 依赖注入详解:处理依赖树

        本文链接:https://www.haomeiwen.com/subject/nbgamktx.html