https://docs.python.org/zh-cn/3.8/library/asyncio-protocol.html
Protocol & Transport
python在asyncio库中,提供了一种简单的网络传输模型,协议与传输。
其定义:
在最顶层,传输只关心怎样传送字节内容,而协议决定传送哪些字节内容(还要在一定程度上考虑何时)。
也可以这样说:从传输的角度来看,传输是套接字(或类似的I/O终端)的抽象,而协议是应用程序的抽象。
换另一种说法,传输和协议一起定义网络I/0和进程间I/O的抽象接口。
传输对象和协议对象总是一对一关系:协议调用传输方法来发送数据,而传输在接收到数据时调用协议方法传递数据。
协议和传输,在socket的基础上进行了封装,是更高一层次的应用。
所以说: ASGI服务器并不是从socket基础层面实现通信,而是使用了asyncio中原生提供的一种网络通信方式。
传输Transport
Transport 类实例由如
loop.create_connection()
、loop.create_unix_connection()
、loop.create_server()
、loop.sendfile()
等这类事件循环方法使用或返回。
Transport类位于asyncio.transports
中,有例如BaseTransport
,WriteTransport
只写,ReadTransport
只读,Transport
继承于前两个只写和只读的Transport
协议Protocol
asyncio提供了一组抽象基类,应该用于实现网络协议。这些类应与Transport一起使用。
抽象基础协议类的子类可以实现某些或所有方法。所有这些方法都是回调:它们在某些事件(例如,接收到某些数据)时由Transport调用。基本协议方法应由相应的传输程序调用。
两种常见的协议接口基类
位于asyncio.Protocol
class BaseProtocol:
"""
协议接口的公共基类.
通常用户实现的协议派生自基本协议,如协议或过程协议.
应该直接实现BaseProtocol的唯一情况是像写管道那样的只写传输
"""
__slots__ = ()
def connection_made(self, transport):
"""
在建立连接时调用.
参数是表示管道连接的transport.
要接收数据,请等待data_received()调用.
当连接关闭时,调用connection_lost().
"""
def connection_lost(self, exc):
"""
当连接丢失或关闭时调用.
参数是一个异常对象或None(后者表示接收到常规EOF或中止或关闭连接).
"""
def pause_writing(self):
"""
中断写入:
当transport缓冲区超过高水位(high-water mark)时调用
暂停和恢复调用是成对的 -- 当缓冲区严格超过高水位标记时,
pause_writing()调用一次(即使后续的写操作会使缓冲区大小增加更多),
当缓冲区大小达到低水位标记时,最终调用resume_writing()一次。
注意:如果缓冲区大小等于高水位标记,就不会调用pause_writing()——它必须严格执行。
相反,当缓冲区大小等于或低于低水位标记时,将调用resume_writing()。
当任何一个标记为0时,这些结束条件对于确保事情按预期进行是重要的。
注意:这是唯一一个没有通过EventLoop.call_soon()调用的协议回调
如果是的话,在最需要的时候(当应用程序一直在写而不让步,直到pause_writing()被调用时),
它将没有任何效果.
"""
def resume_writing(self):
"""
恢复写入:
当transport缓冲区排放低于低水位线(low-water mark)时调用.
详见pause_writing().
"""
class Protocol(BaseProtocol):
"""
流协议的接口.
用户应该实现这个接口。它们可以继承这个类,
但不必要这样做。这里的实现什么也不做(它们不引发异常)
当用户希望请求transport时,他们会将protocol factory传递给一个实用程序函数
(例如,EventLoop.create_connection())。
当成功建立连接时,使用合适的transport对象调用connection_made()。
然后data_received()将被调用0次或更多次,使用从传输接收到的数据(字节);
最后,connection_lost()只被调用一次,使用异常对象或无异常对象作为参数。
调用顺序:
start -> CM [-> DR*] [-> ER?] -> CL -> end
* CM: connection_made()
* DR: data_received()
* ER: eof_received()
* CL: connection_lost()
"""
__slots__ = ()
def data_received(self, data):
"""
当接收到一些数据时调用.
参数是一个bytes对象.
"""
def eof_received(self):
"""
当另一端调用write_eof()或等效函数时调用.
如果返回一个假值(包括None),则传输将关闭自身。
如果它返回true值,则关闭传输取决于协议.
"""
loop.create_server()
接受protocol_factory,可以调用的工厂函数,其返回一个协议Protocol实例
async def create_server(
self, protocol_factory, host=None, port=None,
*, family=socket.AF_UNSPEC,
flags=socket.AI_PASSIVE, sock=None, backlog=100,
ssl=None, reuse_address=None, reuse_port=None,
ssl_handshake_timeout=None,
start_serving=True):
"""
一个协同程序,它创建一个绑定到主机和端口的TCP服务器。
返回值是一个server对象,可以用来停止服务。
如果host是空字符串或者没有,所有的接口都被假设,
并且多个套接字的列表将被返回(最有可能的是一个IPv4和另一个IPv6)。
主机参数也可以是要绑定到的主机序列(例如列表)。
family可以设置为AF_INET或AF_INET6,以强制套接字使用IPv4或IPv6。
如果没有设置,它将从主机(默认为AF_UNSPEC)确定。
flags是getaddrinfo()的位掩码。
sockot可以指定,以使用一个预先存在的套接字对象。
backlog是传递给listen()的队列连接的最大数量(默认为100)。
ssl可以设置为SSLContext,以在接受的连接上启用ssl。
reuse_address告诉内核在TIME_WAIT状态下重用本地套接字,而不等待其自然超时过期。
如果未指定,将在UNIX上自动设置为True。
reuse_port告诉内核允许这个端点绑定到与其他现有端点绑定到的相同的端口,
只要它们在创建时都设置了这个标志。Windows上不支持此选项。
ssl_handshake_timeout是SSL服务器在中止连接之前等待SSL握手完成的时间(以秒为单位)。默认是60s。
start_service设置为True(默认)会导致创建的服务器立即开始接受连接。
当设置为False时,用户应该等待server . start_service()
或server .serve_forever()使服务器开始接受连接。
"""
raise NotImplementedError
server对象是asyncio.base_events.Server
的实例
我简单写了个小例子,使用协议和传输,制作一个C/S
Server
class ServerProtocol(Protocol):
def connection_made(self, transport) -> None:
# 连接建立时的方法
print(transport._extra)
# 看一下transport的信息
self.transport = transport
def data_received(self, data: bytes) -> None:
# 接收到数据时,这里直接给C端返回去一条
message = f"服务器收到了您的信息:{data.decode()}"
print(message)
self.transport.write(message.encode())
async def main():
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: ServerProtocol(),
'127.0.0.1', 8888)
# 之前提到过,第一个参数是
# “接受protocol_factory,可以调用的工厂函数,其返回一个协议Protocol实例”
# 套一层lambda,也算是一个简单的工厂函数。不可以直接传protocol实例
# 必须这样做,因为内部要进行,Protocol = protocol_factory()
async with server:
await server.serve_forever()
asyncio.run(main())
Clinet
class ClientProtocol(Protocol):
def __init__(self, on_con_lost):
self.count = 1
self.on_con_lost = on_con_lost
def connection_made(self, transport: transports.Transport) -> None:
print(transport._extra)
self.transport = transport
self.transport.write(f"客户端发出第{self.count}条信息".encode())
def data_received(self, data: bytes) -> None:
message = data.decode()
print(f"收到信息:{message}")
# 防止爆炸,接收到数据两秒再发新的
sleep(2)
self.count += 1
self.transport.write(f"客户端发出第{self.count}条信息".encode())
def connection_lost(self, exc):
print('服务器连接断开')
self.on_con_lost.set_result(True)
async def main():
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
transport, protocol = await loop.create_connection(
lambda: ClientProtocol(on_con_lost),
'127.0.0.1', 8888)
# 客户端用的是loop.create_connection
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
Server
Client
我们打印下C/S两端transport的extra数据
为了方便观看调整了下key顺序
server端
{
'peername': ('127.0.0.1', 53253),
'sockname': ('127.0.0.1', 8888),
'socket': <asyncio.TransportSocket
fd=476,
family=AddressFamily.AF_INET,
type=SocketKind.SOCK_STREAM,
proto=0,
laddr=('127.0.0.1', 8888),
raddr=('127.0.0.1', 53253)
>
}
client端
{
'peername': ('127.0.0.1', 8888),
'sockname': ('127.0.0.1', 53253),
'socket': <asyncio.TransportSocket
fd=332,
family=AddressFamily.AF_INET,
type=SocketKind.SOCK_STREAM,
proto=6,
laddr=('127.0.0.1', 53253),
raddr=('127.0.0.1', 8888)
>
}
可以明确看到,使用了socket,说明socket的建立,已经是封装到内部的。
s端和c端的socket是完全对应的。
关于uvicorn
而H11是一个实现http协议库
uvicorn用了HTTP协议库做了相应的Protocol。交由asyncio提供的网络应用服务处理
网友评论