美文网首页
Python Web——Flask源码解析

Python Web——Flask源码解析

作者: 睡不醒的大橘 | 来源:发表于2020-09-29 18:21 被阅读0次

    相关系列

    WSGI协议
    Gunicorn 源码解析

    最简单的Flask Application

    from flask import Flask
    app = Flask(__name__)
    
    @app.route('/')
    def hello_world():
        return 'Hello, World!'
        
    if __name__ == '__main__':
       app.run()
    
    1. 导入 Flask 类。该类的实例是我们的WSGI应用程序。
    2. Flask构造函数使用当前模块(__name __)的名称作为参数。
    3. route() 装饰器来告诉 Flask 触发函数的 URL 。
    4. 函数名称被用于生成相关联的 URL 。函数最后返回需要在用户浏览器中显示的信息。
    5. Flask类的run()方法在本地开发服务器上运行应用程序。这部分是Flask 内置的wsgi server,但它不能同时是多进程和多线程的,也缺乏一些调度和监控机制,因此仅在开发时使用,生产环境中通常会使用Gunicorn,uwsgi等更强大的wsgi server。

    @app.route

    @app.route 为一个函数注册为一个URL

    # flask app.py Flask
    def route(self, rule, **options):
        """A decorator that is used to register a view function for a
        given URL rule. """
    
        def decorator(f):
            endpoint = options.pop("endpoint", None)
            self.add_url_rule(rule, endpoint, f, **options)
            return f
    
        return decorator
    
    # flask app.py Flask
    @setupmethod
    def add_url_rule(
        self,
        rule,
        endpoint=None,
        view_func=None,
        provide_automatic_options=None,
        **options
    ):
        """Connects a URL rule.  Works exactly like the :meth:`route`
        decorator.  If a view_func is provided it will be registered with the
        endpoint."""
        if endpoint is None:
            endpoint = _endpoint_from_view_func(view_func)
        options["endpoint"] = endpoint
        methods = options.pop("methods", None)
    
        # if the methods are not given and the view_func object knows its
        # methods we can use that instead.  If neither exists, we go with
        # a tuple of only ``GET`` as default.
        if methods is None:
            methods = getattr(view_func, "methods", None) or ("GET",)
        if isinstance(methods, string_types):
            raise TypeError(
                "Allowed methods have to be iterables of strings, "
                'for example: @app.route(..., methods=["POST"])'
            )
        methods = set(item.upper() for item in methods)
    
        # Methods that should always be added
        required_methods = set(getattr(view_func, "required_methods", ()))
    
        # starting with Flask 0.8 the view_func object can disable and
        # force-enable the automatic options handling.
        if provide_automatic_options is None:
            provide_automatic_options = getattr(
                view_func, "provide_automatic_options", None
            )
    
        if provide_automatic_options is None:
            if "OPTIONS" not in methods:
                provide_automatic_options = True
                required_methods.add("OPTIONS")
            else:
                provide_automatic_options = False
    
        # Add the required methods now.
        methods |= required_methods
    
        rule = self.url_rule_class(rule, methods=methods, **options)
        rule.provide_automatic_options = provide_automatic_options
    
        self.url_map.add(rule)
        if view_func is not None:
            old_func = self.view_functions.get(endpoint)
            if old_func is not None and old_func != view_func:
                raise AssertionError(
                    "View function mapping is overwriting an "
                    "existing endpoint function: %s" % endpoint
                )
            self.view_functions[endpoint] = view_func
    

    其中@setupmethod装饰器会检查被装饰的函数是否在第一个request之前运行,如果不是且在debug模式,则会抛错:

    def setupmethod(f):
        """Wraps a method so that it performs a check in debug mode if the
        first request was already handled.
        """
    
        def wrapper_func(self, *args, **kwargs):
            if self.debug and self._got_first_request:
                raise AssertionError(
                    "A setup function was called after the "
                    "first request was handled.  This usually indicates a bug "
                    "in the application where a module was not imported "
                    "and decorators or other functionality was called too late.\n"
                    "To fix this make sure to import all your view modules, "
                    "database models and everything related at a central place "
                    "before the application starts serving requests."
                )
            return f(self, *args, **kwargs)
    
        return update_wrapper(wrapper_func, f)
    

    请求处理流程

    1. 回调函数
    • WSGI协议 一节介绍过,WSGI Application需要是一个可调用对象。Flask object实现了call方法,当有请求到来时,这个方法将会被调用:
    #flask\app.py Flask
    def __call__(self, environ, start_response):
        """The WSGI server calls the Flask application object as the
        WSGI application. This calls :meth:`wsgi_app` which can be
        wrapped to applying middleware."""
        return self.wsgi_app(environ, start_response)
    
    def wsgi_app(self, environ, start_response):
    #flask\app.py
        """The actual WSGI application."""
        #把请求相关信息传入,得到一个RequestContext对象,详见 2
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                # 将ctx push进_request_ctx_stack 中
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            ctx.auto_pop(error)
    
    1. RequestContext
    #flask\app.py
    def request_context(self, environ):
        return RequestContext(self, environ)
    
    #flask\ctx.py
    def __init__(self, app, environ, request=None, session=None):
        self.app = app
        if request is None:
            request = app.request_class(environ)
        self.request = request
        self.url_adapter = None
        try:
            self.url_adapter = app.create_url_adapter(self.request)
        except HTTPException as e:
            self.request.routing_exception = e
        self.flashes = None
        # 设置session
        self.session = session
        self._implicit_app_ctx_stack = []
        self.preserved = False
        self._preserved_exc = None
        self._after_request_functions = []
    
    #flask\app.py
    def create_url_adapter(self, request):
        """Creates a URL adapter for the given request. The URL adapter
        is created at a point where the request context is not yet set
        up so the request is passed explicitly.
        """
        if request is not None:
            # If subdomain matching is disabled (the default), use the
            # default subdomain in all cases. This should be the default
            # in Werkzeug but it currently does not have that feature.
            subdomain = (
                (self.url_map.default_subdomain or None)
                if not self.subdomain_matching
                else None
            )
            return self.url_map.bind_to_environ(
                request.environ,
                server_name=self.config["SERVER_NAME"],
                subdomain=subdomain,
            )
        # We need at the very least the server name to be set for this
        # to work.
        if self.config["SERVER_NAME"] is not None:
            return self.url_map.bind(
                self.config["SERVER_NAME"],
                script_name=self.config["APPLICATION_ROOT"],
                url_scheme=self.config["PREFERRED_URL_SCHEME"],
            )
    

    其中url_map指向一个Map类,Map类存储了所有URL规则和一些配置参数。它的bind()作用是添加Server name等对所有规则生效的信息,返回MapAdapter:

    # werkzeug\routing.py
    def bind_to_environ(self, environ, server_name=None, subdomain=None):
        environ = _get_environ(environ)
        wsgi_server_name = get_host(environ).lower()
        scheme = environ["wsgi.url_scheme"]
    
        if server_name is None:
            server_name = wsgi_server_name
        else:
            server_name = server_name.lower()
    
            # strip standard port to match get_host()
            if scheme == "http" and server_name.endswith(":80"):
                server_name = server_name[:-3]
            elif scheme == "https" and server_name.endswith(":443"):
                server_name = server_name[:-4]
    
        if subdomain is None and not self.host_matching:
            cur_server_name = wsgi_server_name.split(".")
            real_server_name = server_name.split(".")
            offset = -len(real_server_name)
    
            if cur_server_name[offset:] != real_server_name:
                warnings.warn(
                    "Current server name '{}' doesn't match configured"
                    " server name '{}'".format(wsgi_server_name, server_name),
                    stacklevel=2,
                )
                subdomain = "<invalid>"
            else:
                subdomain = ".".join(filter(None, cur_server_name[:offset]))
    
        def _get_wsgi_string(name):
            val = environ.get(name)
            if val is not None:
                return wsgi_decoding_dance(val, self.charset)
    
        script_name = _get_wsgi_string("SCRIPT_NAME")
        path_info = _get_wsgi_string("PATH_INFO")
        query_args = _get_wsgi_string("QUERY_STRING")
        return Map.bind(
            self,
            server_name,
            script_name,
            subdomain,
            scheme,
            environ["REQUEST_METHOD"],
            path_info,
            query_args=query_args,
        )
    
    # werkzeug\routing.py
    def bind(
        self,
        server_name,
        script_name=None,
        subdomain=None,
        url_scheme="http",
        default_method="GET",
        path_info=None,
        query_args=None,
    ):
        """Return a new :class:`MapAdapter` with the details specified to the"""
        server_name = server_name.lower()
        if self.host_matching:
            if subdomain is not None:
                raise RuntimeError("host matching enabled and a subdomain was provided")
        elif subdomain is None:
            subdomain = self.default_subdomain
        if script_name is None:
            script_name = "/"
        if path_info is None:
            path_info = "/"
        try:
            server_name = _encode_idna(server_name)
        except UnicodeError:
            raise BadHost()
        return MapAdapter(
            self,
            server_name,
            script_name,
            subdomain,
            url_scheme,
            path_info,
            default_method,
            query_args,
        )
    

    MapAdapter主要有match和build两个方法,match根据输入的URL匹配endpoint;build根据传入的endpoint和参数构造URL。下一小节将会对其进行详细介绍。

    1. ctx.push()
      得到ctx后,回调函数调用了ctx.push():
    # flask\ctx.py
    def push(self):
        """Binds the request context to the current context."""
        top = _request_ctx_stack.top
        if top is not None and top.preserved:
            top.pop(top._preserved_exc)
    
        # Before we push the request context we have to ensure that there
        # is an application context.
        # 到_app_ctx_stack取栈顶数据,如果未取到或者取到的不是当前的app,
        # 就调用app.app_context()方法,
        # 就是新实例一个上下文app_ctx对象,再执行app_ctx.push()方法
        app_ctx = _app_ctx_stack.top
        if app_ctx is None or app_ctx.app != self.app:
            app_ctx = self.app.app_context()
            app_ctx.push()
            self._implicit_app_ctx_stack.append(app_ctx)
        else:
            self._implicit_app_ctx_stack.append(None)
    
        if hasattr(sys, "exc_clear"):
            sys.exc_clear()
        # push ctx到_request_ctx_stack
        _request_ctx_stack.push(self)
    
        # Open the session at the moment that the request context is available.
        # This allows a custom open_session method to use the request context.
        # Only open a new session if this is the first time the request was
        # pushed, otherwise stream_with_context loses the session.
        # 开启一个session
        if self.session is None:
            session_interface = self.app.session_interface
            self.session = session_interface.open_session(self.app, self.request)
    
            if self.session is None:
                self.session = session_interface.make_null_session(self.app)
    
        if self.url_adapter is not None:
            # 匹配 request
            self.match_request()
    

    _request_ctx_stack, _app_ctx_stack在项目启动时就会被创建。

    # flask/globals.py
    # context locals
    _request_ctx_stack = LocalStack()
    _app_ctx_stack = LocalStack()
    current_app = LocalProxy(_find_app)
    request = LocalProxy(partial(_lookup_req_object, "request"))
    session = LocalProxy(partial(_lookup_req_object, "session"))
    g = LocalProxy(partial(_lookup_app_object, "g"))
    
    # flask/session.py SecureCookieSessionInterface
    def open_session(self, app, request):
        s = self.get_signing_serializer(app)
        if s is None:
            return None
        val = request.cookies.get(app.session_cookie_name)
        if not val:
            return self.session_class()
        max_age = total_seconds(app.permanent_session_lifetime)
        try:
            data = s.loads(val, max_age=max_age)
            return self.session_class(data)
        except BadSignature:
            return self.session_class()
    

    请求第一次来时,request.cookies为空,即返回self.session_class():

    session_class = SecureCookieSession
    

    请求第二次到来时,当cookie不为空,且依然在有效期内,则通过与写入时同样的签名算法将cookie中的值解密出来并写入字典并返回中。

    match_request对request进行路由规则匹配

    # flask/ctx.py
    def match_request(self):
        """Can be overridden by a subclass to hook into the matching
        of the request.
        """
        try:
            result = self.url_adapter.match(return_rule=True)
            self.request.url_rule, self.request.view_args = result
        except HTTPException as e:
            self.request.routing_exception = e
    
    # werkzeug\routing.py MapAdapter
    def match(
        self,
        path_info=None,
        method=None,
        return_rule=False,
        query_args=None,
        websocket=None,
    ):
        self.map.update()
        if path_info is None:
            path_info = self.path_info
        else:
            path_info = to_unicode(path_info, self.map.charset)
        if query_args is None:
            query_args = self.query_args
        method = (method or self.default_method).upper()
    
        if websocket is None:
            websocket = self.websocket
    
        require_redirect = False
    
        path = u"%s|%s" % (
            self.map.host_matching and self.server_name or self.subdomain,
            path_info and "/%s" % path_info.lstrip("/"),
        )
    
        have_match_for = set()
        websocket_mismatch = False
    
        # 与 MapAdapter里的每一个rule进行匹配,直到匹配上
        for rule in self.map._rules:
            try:
                rv = rule.match(path, method)
            except RequestPath as e:
                raise RequestRedirect(
                    self.make_redirect_url(
                        url_quote(e.path_info, self.map.charset, safe="/:|+"),
                        query_args,
                    )
                )
            except RequestAliasRedirect as e:
                raise RequestRedirect(
                    self.make_alias_redirect_url(
                        path, rule.endpoint, e.matched_values, method, query_args
                    )
                )
            if rv is None:
                continue
            if rule.methods is not None and method not in rule.methods:
                have_match_for.update(rule.methods)
                continue
    
            if rule.websocket != websocket:
                websocket_mismatch = True
                continue
    
            if self.map.redirect_defaults:
                redirect_url = self.get_default_redirect(rule, method, rv, query_args)
                if redirect_url is not None:
                    raise RequestRedirect(redirect_url)
    
            if rule.redirect_to is not None:
                if isinstance(rule.redirect_to, string_types):
    
                    def _handle_match(match):
                        value = rv[match.group(1)]
                        return rule._converters[match.group(1)].to_url(value)
    
                    redirect_url = _simple_rule_re.sub(_handle_match, rule.redirect_to)
                else:
                    redirect_url = rule.redirect_to(self, **rv)
                raise RequestRedirect(
                    str(
                        url_join(
                            "%s://%s%s%s"
                            % (
                                self.url_scheme or "http",
                                self.subdomain + "." if self.subdomain else "",
                                self.server_name,
                                self.script_name,
                            ),
                            redirect_url,
                        )
                    )
                )
    
            if require_redirect:
                raise RequestRedirect(
                    self.make_redirect_url(
                        url_quote(path_info, self.map.charset, safe="/:|+"), query_args
                    )
                )
    
            if return_rule:
                return rule, rv
            else:
                return rule.endpoint, rv
    
        if have_match_for:
            raise MethodNotAllowed(valid_methods=list(have_match_for))
    
        if websocket_mismatch:
            raise WebsocketMismatch()
    
        raise NotFound()
    

    它调用Rule的match(), 与 MapAdapter里的每一个rule进行匹配,直到匹配上

    # werkzeug\routing.py Rule
    def match(self, path, method=None):
        """Check if the rule matches a given path. Path is a string in the
        form ``"subdomain|/path"`` and is assembled by the map.  If
        the map is doing host matching the subdomain part will be the host
        instead.
    
        If the rule matches a dict with the converted values is returned,
        otherwise the return value is `None`.
    
        :internal:
        """
        if not self.build_only:
            require_redirect = False
    
            m = self._regex.search(path)
            if m is not None:
                groups = m.groupdict()
                # we have a folder like part of the url without a trailing
                # slash and strict slashes enabled. raise an exception that
                # tells the map to redirect to the same url but with a
                # trailing slash
                if (
                    self.strict_slashes
                    and not self.is_leaf
                    and not groups.pop("__suffix__")
                    and (
                        method is None or self.methods is None or method in self.methods
                    )
                ):
                    path += "/"
                    require_redirect = True
                # if we are not in strict slashes mode we have to remove
                # a __suffix__
                elif not self.strict_slashes:
                    del groups["__suffix__"]
    
                result = {}
                for name, value in iteritems(groups):
                    try:
                        value = self._converters[name].to_python(value)
                    except ValidationError:
                        return
                    result[str(name)] = value
                if self.defaults:
                    result.update(self.defaults)
    
                if self.merge_slashes:
                    new_path = "|".join(self.build(result, False))
                    if path.endswith("/") and not new_path.endswith("/"):
                        new_path += "/"
                    if new_path.count("/") < path.count("/"):
                        path = new_path
                        require_redirect = True
    
                if require_redirect:
                    path = path.split("|", 1)[1]
                    raise RequestPath(path)
    
                if self.alias and self.map.redirect_defaults:
                    raise RequestAliasRedirect(result)
    
                return result
    
    1. full_dispatch_request()
    # flask\app.py
    def full_dispatch_request(self):
        """Dispatches the request and on top of that performs request
        pre and postprocessing as well as HTTP exception catching and
        error handling.
        """
        self.try_trigger_before_first_request_functions()
        try:
            # 发送请求开始的信号
            request_started.send(self)
            rv = self.preprocess_request()
            if rv is None:
                # 执行所匹配的URL对应的函数
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        return self.finalize_request(rv)
    
    # flask\app.py
    def dispatch_request(self):
        """Does the request dispatching.  Matches the URL and returns the
        return value of the view or error handler.  This does not have to
        be a response object.  In order to convert the return value to a
        proper response object, call :func:`make_response`.
    
        """
        # 获取_request_ctx_stack栈顶的一个request
        req = _request_ctx_stack.top.request
        if req.routing_exception is not None:
            self.raise_routing_exception(req)
        # 得到请求所匹配的路由规则
        rule = req.url_rule
        # if we provide automatic options for this URL and the
        # request came with the OPTIONS method, reply automatically
        if (
            getattr(rule, "provide_automatic_options", False)
            and req.method == "OPTIONS"
        ):
            return self.make_default_options_response()
        # otherwise dispatch to the handler for that endpoint
        # 执行规则所对应的函数
        return self.view_functions[rule.endpoint](**req.view_args)
    
    # flask\app.py Flask
    def finalize_request(self, rv, from_error_handler=False):
        # 对返回值进行设置响应头等数据
        response = self.make_response(rv)
        try:
            response = self.process_response(response)
            # 发送请求结束的信号
            request_finished.send(self, response=response)
        except Exception:
            if not from_error_handler:
                raise
            self.logger.exception(
                "Request finalizing failed with an error while handling an error"
            )
        return response
    
    # flask\app.py Flask
    def process_response(self, response):
        """Can be overridden in order to modify the response object
        before it's sent to the WSGI server.  By default this will
        call all the :meth:`after_request` decorated functions.
        """
        ctx = _request_ctx_stack.top
        bp = ctx.request.blueprint
        funcs = ctx._after_request_functions
        if bp is not None and bp in self.after_request_funcs:
            funcs = chain(funcs, reversed(self.after_request_funcs[bp]))
        if None in self.after_request_funcs:
            funcs = chain(funcs, reversed(self.after_request_funcs[None]))
        for handler in funcs:
            response = handler(response)
        if not self.session_interface.is_null_session(ctx.session):
            self.session_interface.save_session(self, ctx.session, response)
        return response
    
    # flask/session.py SecureCookieSessionInterface
    def save_session(self, app, session, response):
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)
    
        # If the session is modified to be empty, remove the cookie.
        # If the session is empty, return without setting the cookie.
        if not session:
            if session.modified:
                response.delete_cookie(
                    app.session_cookie_name, domain=domain, path=path
                )
    
            return
    
        # Add a "Vary: Cookie" header if the session was accessed at all.
        if session.accessed:
            response.vary.add("Cookie")
    
        if not self.should_set_cookie(app, session):
            return
    
        httponly = self.get_cookie_httponly(app)
        secure = self.get_cookie_secure(app)
        samesite = self.get_cookie_samesite(app)
        expires = self.get_expiration_time(app, session)
        val = self.get_signing_serializer(app).dumps(dict(session))
        response.set_cookie(
            app.session_cookie_name,
            val,
            expires=expires,
            httponly=httponly,
            domain=domain,
            path=path,
            secure=secure,
            samesite=samesite,
        )
    

    save_session()主要就是将session写入response.set_cookie中。这样便完成session的写入response工作,并由response返回至客户端。

    1. 请求结束时会执ctx.auto_pop()函数,将与对应请求相关的request,session清除,session生命周期便结束:
    # flask/ctx RequestContext
    def auto_pop(self, exc):
        if self.request.environ.get("flask._preserve_context") or (
            exc is not None and self.app.preserve_context_on_exception
        ):
            self.preserved = True
            self._preserved_exc = exc
        else:
            self.pop(exc)
    
        def pop(self, exc=_sentinel):
            """Pops the request context and unbinds it by doing that.  This will
            also trigger the execution of functions registered by the
            :meth:`~flask.Flask.teardown_request` decorator.
            """
            app_ctx = self._implicit_app_ctx_stack.pop()
    
            try:
                clear_request = False
                if not self._implicit_app_ctx_stack:
                    self.preserved = False
                    self._preserved_exc = None
                    if exc is _sentinel:
                        exc = sys.exc_info()[1]
                    self.app.do_teardown_request(exc)
    
                    if hasattr(sys, "exc_clear"):
                        sys.exc_clear()
    
                    request_close = getattr(self.request, "close", None)
                    if request_close is not None:
                        request_close()
                    clear_request = True
            finally:
                # 从_request_ctx_stack pop出request
                rv = _request_ctx_stack.pop()
    
                # get rid of circular dependencies at the end of the request
                # so that we don't require the GC to be active.
                if clear_request:
                    rv.request.environ["werkzeug.request"] = None
    
                # Get rid of the app as well if necessary.
                if app_ctx is not None:
                    app_ctx.pop(exc)
    
                assert rv is self, "Popped wrong request context. (%r instead of %r)" % (
                    rv,
                    self,
                )
    

    相关文章

      网友评论

          本文标题:Python Web——Flask源码解析

          本文链接:https://www.haomeiwen.com/subject/xtoeuktx.html