Uvicorn 基础工作原理

作者: Gascognya | 来源:发表于2020-08-21 14:36 被阅读0次

    Uvicorn运行时,分为两大部分。

    • 一个是uvicorn本身的循环,用于服务器自身状态的维护,不参与通信。
    • 另一个是由uvicorn建立的Asyncio的服务器,提交由h11或httptools协议库实现的Protocol。

    在上一章 Asyncio 协议Protocol 与 传输Transport 中我们提到了关于Asyncio提供的网络服务器。

    服务器启动

    main.py → run()是uvicorn的启动入口,他可以从uvicorn.run调用启动,也可以从命令行启动。
    之前的一篇 Uvicorn 基本了解介绍了关于如何启动Uvicorn。
    main.py → run()会创建若干个class Server的实例,通常为一个,代表Uvicorn服务器自身。

    Server会进行初始化,例如构建前半的中间件堆栈(starlette中的我们可以称之为后半)。初始化完成后,服务器会在main_loop()中不断循环来维护自身状态。
    在初始化阶段,Server会构建Asyncio的网络服务器。
    流程如下:

    config.py → Config类 → load
    if isinstance(self.http, str):
        # 设置HTTP协议实现。httptools实现提供了更高的性能,但与PyPy不兼容,并且需要在Windows上进行编译。
        # 选项: “ auto”,“ h11”,“ httptools”。 默认值: 'auto'。
        self.http_protocol_class = import_from_string(HTTP_PROTOCOLS[self.http])
    
    config.py → HTTP_PROTOCOLS
    HTTP_PROTOCOLS = {
        "auto": "uvicorn.protocols.http.auto:AutoHTTPProtocol",
        "h11": "uvicorn.protocols.http.h11_impl:H11Protocol",
        "httptools": "uvicorn.protocols.http.httptools_impl:HttpToolsProtocol",
    }
    
    main.py → Server类 → startup
    create_protocol = functools.partial(
                config.http_protocol_class, config=config, server_state=self.server_state
            )
    
    server = await loop.create_server(
        create_protocol,
        host=config.host,
        port=config.port,
        ssl=config.ssl,
        backlog=config.backlog,
    )
    

    到此为止,两大部分就都构建完成了,进入运行时状态

    H11Protocol

    class H11Protocol(asyncio.Protocol)是由uvicorn提供的一个基于H11协议库实现的Protocol,它是整个uvicorn通信的核心(如果选用httptools库就是HttpToolsProtocol)

    H11Protocol
    可以看到他实现了Protocol的接口,socket接收到的http报文会被传递至data_received(),这样,数据的传入就完成了。
        def data_received(self, data):
            if self.timeout_keep_alive_task is not None:
                self.timeout_keep_alive_task.cancel()
                self.timeout_keep_alive_task = None
    
            self.conn.receive_data(data)
            # 引用: self.conn = h11.Connection(h11.SERVER)
            # 调用h11,此步仅将数据加入接受缓冲区
            
            self.handle_events()
            # 开始处理
    

    可以看到,接收到数据,首先是将数据加入到缓冲区。然后再进行处理。

    def handle_events(self):
                    event = self.conn.next_event()
    
                    ......
    
                    self.scope = {
                        "type": "http",
                        "asgi": {
                            "version": self.config.asgi_version,
                            "spec_version": "2.1",
                        },
                        "http_version": event.http_version.decode("ascii"),
                        "server": self.server,
                        "client": self.client,
                        "scheme": self.scheme,
                        "method": event.method.decode("ascii"),
                        "root_path": self.root_path,
                        "path": unquote(raw_path.decode("ascii")),
                        "raw_path": raw_path,
                        "query_string": query_string,
                        "headers": self.headers,
                    }
    
                    ......
    
                    self.cycle = RequestResponseCycle(
                        scope=self.scope,
                        conn=self.conn,
                        transport=self.transport,
                        flow=self.flow,
                        logger=self.logger,
                        access_logger=self.access_logger,
                        access_log=self.access_log,
                        default_headers=self.default_headers,
                        message_event=self.message_event,
                        on_response=self.on_response_complete,
                    )
                    task = self.loop.create_task(self.cycle.run_asgi(app))
    

    RequestResponseCycle

    RequestResponseCycle

    我们可以看到其具有sendreceive两个方法。让人感到很熟悉。

        async def run_asgi(self, app):
                result = await app(self.scope, self.receive, self.send)
                ......
    

    这里的app,是指uvicorn构建的中间件堆栈,我们切回Config

        def load(self):
    
            ......
    
            # 开始中间件堆栈,和starlette中的构建方法完全一致
            # 最内层为wsgi/asgi中间件,其app属性指向传进来的app(starlette, fastapi等实例)
            # 实际上就是将两段app链相连接
            if self.interface == "wsgi":
                self.loaded_app = WSGIMiddleware(self.loaded_app)
                self.ws_protocol_class = None
            elif self.interface == "asgi2":
                self.loaded_app = ASGI2Middleware(self.loaded_app)
            if self.debug:
                self.loaded_app = DebugMiddleware(self.loaded_app)
            if logger.level <= TRACE_LOG_LEVEL:
                self.loaded_app = MessageLoggerMiddleware(self.loaded_app)
            if self.proxy_headers:
                self.loaded_app = ProxyHeadersMiddleware(
                    self.loaded_app, trusted_hosts=self.forwarded_allow_ips
                )
    

    起初这个loaded_appstarlette的实例。
    最终构成了ProxyHeadersMiddleware为起点,endpointApp为终点的App链(中间件堆栈包含于其中)
    Uvicorn中间件堆栈Starlette实例Starlette中间件堆栈Routerendpoint/cbv
    这就是整个服务器的全貌
    我们来看一send和receive方法

    Send

        # ASGI 接口
        async def send(self, message):
            message_type = message["type"]
    
            if self.flow.write_paused and not self.disconnected:
                await self.flow.drain()
    
            if self.disconnected:
                return
    
            # send函数,分为两个阶段,三次发送
            # 第一次发送type = http.response.start,发送request头
            # 第二次发送type = http.response.body,发送body
            # 然后查看是否还要more body,没有则发送b""
            if not self.response_started:
                # 发送response 状态行和 headers
                if message_type != "http.response.start":
                    msg = "Expected ASGI message 'http.response.start', but got '%s'."
                    raise RuntimeError(msg % message_type)
    
                self.response_started = True
                self.waiting_for_100_continue = False
    
                status_code = message["status"]
                headers = self.default_headers + message.get("headers", [])
    
                if self.access_log:
                    self.access_logger.info(
                        '%s - "%s %s HTTP/%s" %d',
                        get_client_addr(self.scope),
                        self.scope["method"],
                        get_path_with_query_string(self.scope),
                        self.scope["http_version"],
                        status_code,
                        extra={"status_code": status_code, "scope": self.scope},
                    )
    
                # 写入response状态行和 headers
                reason = STATUS_PHRASES[status_code]
                event = h11.Response(
                    status_code=status_code, headers=headers, reason=reason
                )
                output = self.conn.send(event)
                self.transport.write(output)
    
            elif not self.response_complete:
                # 发送 response body
                if message_type != "http.response.body":
                    msg = "Expected ASGI message 'http.response.body', but got '%s'."
                    raise RuntimeError(msg % message_type)
    
                body = message.get("body", b"")
                more_body = message.get("more_body", False)
    
                # 写入 response body
                if self.scope["method"] == "HEAD":
                    event = h11.Data(data=b"")
                else:
                    event = h11.Data(data=body)
                output = self.conn.send(event)
                self.transport.write(output)
    
                # 处理response完成
                if not more_body:
                    # body已全部发送完毕
                    self.response_complete = True
                    event = h11.EndOfMessage()
                    output = self.conn.send(event)
                    self.transport.write(output)
                    # 第三次发送,发送b""表示结束
            else:
                # Response 已经发送
                msg = "Unexpected ASGI message '%s' sent, after response already completed."
                raise RuntimeError(msg % message_type)
    
            if self.response_complete:
                if self.conn.our_state is h11.MUST_CLOSE or not self.keep_alive:
                    event = h11.ConnectionClosed()
                    self.conn.send(event)
                    self.transport.close()
                self.on_response()
    

    send中,包含了大量的服务器状态切换,其逻辑极为复杂。尤其是关于服务器的状态机方面,让人头昏眼花。
    在发送的时候,报文分至少三次发送:

    • 第一次,发送Headers,type为http.response.start(在starlette中见过),发送完毕后self.response_startedTrue
    • 第二次,发送Body,此过程可能持续多次。type为http.response.body,如果没有更多的body数据了,self.response_completeTrue
    • 第三次,按照HTTP协议,发送b' '空字节,表示发送完毕。整个发送过程完成。

    receive

        async def receive(self):
            if self.waiting_for_100_continue and not self.transport.is_closing():
                event = h11.InformationalResponse(
                    status_code=100, headers=[], reason="Continue"
                )
                output = self.conn.send(event)
                self.transport.write(output)
                self.waiting_for_100_continue = False
    
            if not self.disconnected and not self.response_complete:
                self.flow.resume_reading()
                await self.message_event.wait()
                self.message_event.clear()
    
            if self.disconnected or self.response_complete:
                message = {"type": "http.disconnect"}
            else:
                message = {
                    "type": "http.request",
                    "body": self.body,
                    "more_body": self.more_body,
                }
                self.body = b""
    
            return message
    

    receive的使用比较少,在传统http请求中基本不需要,主要是用于websocket,在此不多做介绍。

    到此为止,在starlette中看到的send,receive和scope三者的来源,我们已经搞清了。

    关于Uvicorn的更深一步探索,我们将将在未来进行。目前的准备还不够充分,贸然进行容易出现错误。

    相关文章

      网友评论

        本文标题:Uvicorn 基础工作原理

        本文链接:https://www.haomeiwen.com/subject/wmksjktx.html