美文网首页Uvicorn 源码解读
Asyncio 协议Protocol 与 传输Transport

Asyncio 协议Protocol 与 传输Transport

作者: Gascognya | 来源:发表于2020-08-20 10:09 被阅读0次

https://docs.python.org/zh-cn/3.8/library/asyncio-protocol.html

Protocol & Transport

python在asyncio库中,提供了一种简单的网络传输模型,协议与传输。

其定义:
在最顶层,传输只关心怎样传送字节内容,而协议决定传送哪些字节内容(还要在一定程度上考虑何时)。
也可以这样说:从传输的角度来看,传输是套接字(或类似的I/O终端)的抽象,而协议是应用程序的抽象。
换另一种说法,传输协议一起定义网络I/0和进程间I/O的抽象接口。
传输对象协议对象总是一对一关系:协议调用传输方法来发送数据,而传输在接收到数据时调用协议方法传递数据。

协议和传输,在socket的基础上进行了封装,是更高一层次的应用。
所以说: ASGI服务器并不是从socket基础层面实现通信,而是使用了asyncio中原生提供的一种网络通信方式。

传输Transport

Transport 类实例由如 loop.create_connection()loop.create_unix_connection()loop.create_server()loop.sendfile()等这类事件循环方法使用或返回。

Transport类位于asyncio.transports中,有例如BaseTransportWriteTransport只写,ReadTransport只读,Transport继承于前两个只写和只读的Transport

协议Protocol

asyncio提供了一组抽象基类,应该用于实现网络协议。这些类应与Transport一起使用。
抽象基础协议类的子类可以实现某些或所有方法。所有这些方法都是回调:它们在某些事件(例如,接收到某些数据)时由Transport调用。基本协议方法应由相应的传输程序调用。

两种常见的协议接口基类

位于asyncio.Protocol

class BaseProtocol:
    """
    协议接口的公共基类.

    通常用户实现的协议派生自基本协议,如协议或过程协议.

    应该直接实现BaseProtocol的唯一情况是像写管道那样的只写传输
    """

    __slots__ = ()

    def connection_made(self, transport):
        """
        在建立连接时调用.

        参数是表示管道连接的transport.
        要接收数据,请等待data_received()调用.
        当连接关闭时,调用connection_lost().
        """

    def connection_lost(self, exc):
        """
        当连接丢失或关闭时调用.

        参数是一个异常对象或None(后者表示接收到常规EOF或中止或关闭连接).
        """

    def pause_writing(self):
        """
        中断写入:

        当transport缓冲区超过高水位(high-water mark)时调用

        暂停和恢复调用是成对的 -- 当缓冲区严格超过高水位标记时,
        pause_writing()调用一次(即使后续的写操作会使缓冲区大小增加更多),
        当缓冲区大小达到低水位标记时,最终调用resume_writing()一次。

        注意:如果缓冲区大小等于高水位标记,就不会调用pause_writing()——它必须严格执行。
        相反,当缓冲区大小等于或低于低水位标记时,将调用resume_writing()。
        当任何一个标记为0时,这些结束条件对于确保事情按预期进行是重要的。

        注意:这是唯一一个没有通过EventLoop.call_soon()调用的协议回调
        如果是的话,在最需要的时候(当应用程序一直在写而不让步,直到pause_writing()被调用时),
        它将没有任何效果.
        """

    def resume_writing(self):
        """
        恢复写入:

        当transport缓冲区排放低于低水位线(low-water mark)时调用.

        详见pause_writing().
        """


class Protocol(BaseProtocol):
    """
    流协议的接口.

    用户应该实现这个接口。它们可以继承这个类,
    但不必要这样做。这里的实现什么也不做(它们不引发异常)

    当用户希望请求transport时,他们会将protocol factory传递给一个实用程序函数
    (例如,EventLoop.create_connection())。

    当成功建立连接时,使用合适的transport对象调用connection_made()。
    然后data_received()将被调用0次或更多次,使用从传输接收到的数据(字节);
    最后,connection_lost()只被调用一次,使用异常对象或无异常对象作为参数。

    调用顺序:

      start -> CM [-> DR*] [-> ER?] -> CL -> end

    * CM: connection_made()
    * DR: data_received()
    * ER: eof_received()
    * CL: connection_lost()
    """

    __slots__ = ()

    def data_received(self, data):
        """
        当接收到一些数据时调用.

        参数是一个bytes对象.
        """

    def eof_received(self):
        """
        当另一端调用write_eof()或等效函数时调用.

        如果返回一个假值(包括None),则传输将关闭自身。
        如果它返回true值,则关闭传输取决于协议.
        """

loop.create_server()

接受protocol_factory,可以调用的工厂函数,其返回一个协议Protocol实例

async def create_server(
        self, protocol_factory, host=None, port=None,
        *, family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE, sock=None, backlog=100,
        ssl=None, reuse_address=None, reuse_port=None,
        ssl_handshake_timeout=None,
        start_serving=True):
    """
    一个协同程序,它创建一个绑定到主机和端口的TCP服务器。

    返回值是一个server对象,可以用来停止服务。

    如果host是空字符串或者没有,所有的接口都被假设,
    并且多个套接字的列表将被返回(最有可能的是一个IPv4和另一个IPv6)。
    主机参数也可以是要绑定到的主机序列(例如列表)。

    family可以设置为AF_INET或AF_INET6,以强制套接字使用IPv4或IPv6。
    如果没有设置,它将从主机(默认为AF_UNSPEC)确定。

    flags是getaddrinfo()的位掩码。

    sockot可以指定,以使用一个预先存在的套接字对象。

    backlog是传递给listen()的队列连接的最大数量(默认为100)。

    ssl可以设置为SSLContext,以在接受的连接上启用ssl。

    reuse_address告诉内核在TIME_WAIT状态下重用本地套接字,而不等待其自然超时过期。
    如果未指定,将在UNIX上自动设置为True。

    reuse_port告诉内核允许这个端点绑定到与其他现有端点绑定到的相同的端口,
    只要它们在创建时都设置了这个标志。Windows上不支持此选项。

    ssl_handshake_timeout是SSL服务器在中止连接之前等待SSL握手完成的时间(以秒为单位)。默认是60s。

    start_service设置为True(默认)会导致创建的服务器立即开始接受连接。
    当设置为False时,用户应该等待server . start_service()
    或server .serve_forever()使服务器开始接受连接。
    """
    raise NotImplementedError

server对象是asyncio.base_events.Server的实例

我简单写了个小例子,使用协议和传输,制作一个C/S

Server

class ServerProtocol(Protocol):

    def connection_made(self, transport) -> None:
        # 连接建立时的方法
        print(transport._extra)
        # 看一下transport的信息
        self.transport = transport

    def data_received(self, data: bytes) -> None:
        # 接收到数据时,这里直接给C端返回去一条
        message = f"服务器收到了您的信息:{data.decode()}"
        print(message)
        self.transport.write(message.encode())


async def main():
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: ServerProtocol(),
        '127.0.0.1', 8888)
    # 之前提到过,第一个参数是
    # “接受protocol_factory,可以调用的工厂函数,其返回一个协议Protocol实例”
    # 套一层lambda,也算是一个简单的工厂函数。不可以直接传protocol实例
    # 必须这样做,因为内部要进行,Protocol = protocol_factory()
    async with server:
        await server.serve_forever()

asyncio.run(main())

Clinet

class ClientProtocol(Protocol):
    def __init__(self, on_con_lost):
        self.count = 1
        self.on_con_lost = on_con_lost

    def connection_made(self, transport: transports.Transport) -> None:
        print(transport._extra)
        self.transport = transport
        self.transport.write(f"客户端发出第{self.count}条信息".encode())

    def data_received(self, data: bytes) -> None:
        message = data.decode()
        print(f"收到信息:{message}")

        # 防止爆炸,接收到数据两秒再发新的
        sleep(2)
        self.count += 1
        self.transport.write(f"客户端发出第{self.count}条信息".encode())

    def connection_lost(self, exc):
        print('服务器连接断开')
        self.on_con_lost.set_result(True)


async def main():
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    transport, protocol = await loop.create_connection(
        lambda: ClientProtocol(on_con_lost),
        '127.0.0.1', 8888)
    # 客户端用的是loop.create_connection
    try:
        await on_con_lost
    finally:
        transport.close()

asyncio.run(main())
Server
Client
我们打印下C/S两端transport的extra数据

为了方便观看调整了下key顺序

server端
{
    'peername': ('127.0.0.1', 53253), 
    'sockname': ('127.0.0.1', 8888),
    'socket': <asyncio.TransportSocket 
                fd=476, 
                family=AddressFamily.AF_INET, 
                type=SocketKind.SOCK_STREAM, 
                proto=0, 
                laddr=('127.0.0.1', 8888), 
                raddr=('127.0.0.1', 53253)
                >
}
client端
{
    'peername': ('127.0.0.1', 8888),
    'sockname': ('127.0.0.1', 53253),
    'socket': <asyncio.TransportSocket
                fd=332,
                family=AddressFamily.AF_INET,
                type=SocketKind.SOCK_STREAM,
                proto=6,
                laddr=('127.0.0.1', 53253),
                raddr=('127.0.0.1', 8888)
                >
}

可以明确看到,使用了socket,说明socket的建立,已经是封装到内部的。
s端和c端的socket是完全对应的。

关于uvicorn

而H11是一个实现http协议
uvicorn用了HTTP协议库做了相应的Protocol。交由asyncio提供的网络应用服务处理

相关文章

网友评论

    本文标题:Asyncio 协议Protocol 与 传输Transport

    本文链接:https://www.haomeiwen.com/subject/wyarjktx.html