美文网首页Uvicorn 源码解读
Asyncio 协议Protocol 与 传输Transport

Asyncio 协议Protocol 与 传输Transport

作者: Gascognya | 来源:发表于2020-08-20 10:09 被阅读0次

    https://docs.python.org/zh-cn/3.8/library/asyncio-protocol.html

    Protocol & Transport

    python在asyncio库中,提供了一种简单的网络传输模型,协议与传输。

    其定义:
    在最顶层,传输只关心怎样传送字节内容,而协议决定传送哪些字节内容(还要在一定程度上考虑何时)。
    也可以这样说:从传输的角度来看,传输是套接字(或类似的I/O终端)的抽象,而协议是应用程序的抽象。
    换另一种说法,传输协议一起定义网络I/0和进程间I/O的抽象接口。
    传输对象协议对象总是一对一关系:协议调用传输方法来发送数据,而传输在接收到数据时调用协议方法传递数据。

    协议和传输,在socket的基础上进行了封装,是更高一层次的应用。
    所以说: ASGI服务器并不是从socket基础层面实现通信,而是使用了asyncio中原生提供的一种网络通信方式。

    传输Transport

    Transport 类实例由如 loop.create_connection()loop.create_unix_connection()loop.create_server()loop.sendfile()等这类事件循环方法使用或返回。

    Transport类位于asyncio.transports中,有例如BaseTransportWriteTransport只写,ReadTransport只读,Transport继承于前两个只写和只读的Transport

    协议Protocol

    asyncio提供了一组抽象基类,应该用于实现网络协议。这些类应与Transport一起使用。
    抽象基础协议类的子类可以实现某些或所有方法。所有这些方法都是回调:它们在某些事件(例如,接收到某些数据)时由Transport调用。基本协议方法应由相应的传输程序调用。

    两种常见的协议接口基类

    位于asyncio.Protocol

    class BaseProtocol:
        """
        协议接口的公共基类.
    
        通常用户实现的协议派生自基本协议,如协议或过程协议.
    
        应该直接实现BaseProtocol的唯一情况是像写管道那样的只写传输
        """
    
        __slots__ = ()
    
        def connection_made(self, transport):
            """
            在建立连接时调用.
    
            参数是表示管道连接的transport.
            要接收数据,请等待data_received()调用.
            当连接关闭时,调用connection_lost().
            """
    
        def connection_lost(self, exc):
            """
            当连接丢失或关闭时调用.
    
            参数是一个异常对象或None(后者表示接收到常规EOF或中止或关闭连接).
            """
    
        def pause_writing(self):
            """
            中断写入:
    
            当transport缓冲区超过高水位(high-water mark)时调用
    
            暂停和恢复调用是成对的 -- 当缓冲区严格超过高水位标记时,
            pause_writing()调用一次(即使后续的写操作会使缓冲区大小增加更多),
            当缓冲区大小达到低水位标记时,最终调用resume_writing()一次。
    
            注意:如果缓冲区大小等于高水位标记,就不会调用pause_writing()——它必须严格执行。
            相反,当缓冲区大小等于或低于低水位标记时,将调用resume_writing()。
            当任何一个标记为0时,这些结束条件对于确保事情按预期进行是重要的。
    
            注意:这是唯一一个没有通过EventLoop.call_soon()调用的协议回调
            如果是的话,在最需要的时候(当应用程序一直在写而不让步,直到pause_writing()被调用时),
            它将没有任何效果.
            """
    
        def resume_writing(self):
            """
            恢复写入:
    
            当transport缓冲区排放低于低水位线(low-water mark)时调用.
    
            详见pause_writing().
            """
    
    
    class Protocol(BaseProtocol):
        """
        流协议的接口.
    
        用户应该实现这个接口。它们可以继承这个类,
        但不必要这样做。这里的实现什么也不做(它们不引发异常)
    
        当用户希望请求transport时,他们会将protocol factory传递给一个实用程序函数
        (例如,EventLoop.create_connection())。
    
        当成功建立连接时,使用合适的transport对象调用connection_made()。
        然后data_received()将被调用0次或更多次,使用从传输接收到的数据(字节);
        最后,connection_lost()只被调用一次,使用异常对象或无异常对象作为参数。
    
        调用顺序:
    
          start -> CM [-> DR*] [-> ER?] -> CL -> end
    
        * CM: connection_made()
        * DR: data_received()
        * ER: eof_received()
        * CL: connection_lost()
        """
    
        __slots__ = ()
    
        def data_received(self, data):
            """
            当接收到一些数据时调用.
    
            参数是一个bytes对象.
            """
    
        def eof_received(self):
            """
            当另一端调用write_eof()或等效函数时调用.
    
            如果返回一个假值(包括None),则传输将关闭自身。
            如果它返回true值,则关闭传输取决于协议.
            """
    

    loop.create_server()

    接受protocol_factory,可以调用的工厂函数,其返回一个协议Protocol实例

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """
        一个协同程序,它创建一个绑定到主机和端口的TCP服务器。
    
        返回值是一个server对象,可以用来停止服务。
    
        如果host是空字符串或者没有,所有的接口都被假设,
        并且多个套接字的列表将被返回(最有可能的是一个IPv4和另一个IPv6)。
        主机参数也可以是要绑定到的主机序列(例如列表)。
    
        family可以设置为AF_INET或AF_INET6,以强制套接字使用IPv4或IPv6。
        如果没有设置,它将从主机(默认为AF_UNSPEC)确定。
    
        flags是getaddrinfo()的位掩码。
    
        sockot可以指定,以使用一个预先存在的套接字对象。
    
        backlog是传递给listen()的队列连接的最大数量(默认为100)。
    
        ssl可以设置为SSLContext,以在接受的连接上启用ssl。
    
        reuse_address告诉内核在TIME_WAIT状态下重用本地套接字,而不等待其自然超时过期。
        如果未指定,将在UNIX上自动设置为True。
    
        reuse_port告诉内核允许这个端点绑定到与其他现有端点绑定到的相同的端口,
        只要它们在创建时都设置了这个标志。Windows上不支持此选项。
    
        ssl_handshake_timeout是SSL服务器在中止连接之前等待SSL握手完成的时间(以秒为单位)。默认是60s。
    
        start_service设置为True(默认)会导致创建的服务器立即开始接受连接。
        当设置为False时,用户应该等待server . start_service()
        或server .serve_forever()使服务器开始接受连接。
        """
        raise NotImplementedError
    

    server对象是asyncio.base_events.Server的实例

    我简单写了个小例子,使用协议和传输,制作一个C/S

    Server

    class ServerProtocol(Protocol):
    
        def connection_made(self, transport) -> None:
            # 连接建立时的方法
            print(transport._extra)
            # 看一下transport的信息
            self.transport = transport
    
        def data_received(self, data: bytes) -> None:
            # 接收到数据时,这里直接给C端返回去一条
            message = f"服务器收到了您的信息:{data.decode()}"
            print(message)
            self.transport.write(message.encode())
    
    
    async def main():
        loop = asyncio.get_running_loop()
    
        server = await loop.create_server(
            lambda: ServerProtocol(),
            '127.0.0.1', 8888)
        # 之前提到过,第一个参数是
        # “接受protocol_factory,可以调用的工厂函数,其返回一个协议Protocol实例”
        # 套一层lambda,也算是一个简单的工厂函数。不可以直接传protocol实例
        # 必须这样做,因为内部要进行,Protocol = protocol_factory()
        async with server:
            await server.serve_forever()
    
    asyncio.run(main())
    
    

    Clinet

    class ClientProtocol(Protocol):
        def __init__(self, on_con_lost):
            self.count = 1
            self.on_con_lost = on_con_lost
    
        def connection_made(self, transport: transports.Transport) -> None:
            print(transport._extra)
            self.transport = transport
            self.transport.write(f"客户端发出第{self.count}条信息".encode())
    
        def data_received(self, data: bytes) -> None:
            message = data.decode()
            print(f"收到信息:{message}")
    
            # 防止爆炸,接收到数据两秒再发新的
            sleep(2)
            self.count += 1
            self.transport.write(f"客户端发出第{self.count}条信息".encode())
    
        def connection_lost(self, exc):
            print('服务器连接断开')
            self.on_con_lost.set_result(True)
    
    
    async def main():
        loop = asyncio.get_running_loop()
        on_con_lost = loop.create_future()
    
        transport, protocol = await loop.create_connection(
            lambda: ClientProtocol(on_con_lost),
            '127.0.0.1', 8888)
        # 客户端用的是loop.create_connection
        try:
            await on_con_lost
        finally:
            transport.close()
    
    asyncio.run(main())
    
    Server
    Client
    我们打印下C/S两端transport的extra数据

    为了方便观看调整了下key顺序

    server端
    {
        'peername': ('127.0.0.1', 53253), 
        'sockname': ('127.0.0.1', 8888),
        'socket': <asyncio.TransportSocket 
                    fd=476, 
                    family=AddressFamily.AF_INET, 
                    type=SocketKind.SOCK_STREAM, 
                    proto=0, 
                    laddr=('127.0.0.1', 8888), 
                    raddr=('127.0.0.1', 53253)
                    >
    }
    
    client端
    {
        'peername': ('127.0.0.1', 8888),
        'sockname': ('127.0.0.1', 53253),
        'socket': <asyncio.TransportSocket
                    fd=332,
                    family=AddressFamily.AF_INET,
                    type=SocketKind.SOCK_STREAM,
                    proto=6,
                    laddr=('127.0.0.1', 53253),
                    raddr=('127.0.0.1', 8888)
                    >
    }
    

    可以明确看到,使用了socket,说明socket的建立,已经是封装到内部的。
    s端和c端的socket是完全对应的。

    关于uvicorn

    而H11是一个实现http协议
    uvicorn用了HTTP协议库做了相应的Protocol。交由asyncio提供的网络应用服务处理

    相关文章

      网友评论

        本文标题:Asyncio 协议Protocol 与 传输Transport

        本文链接:https://www.haomeiwen.com/subject/wyarjktx.html