简明网络I/O与并发 --- I/O
简明网络I/O与并发 --- 并发
计算机的基本组成其实很简单,处理器,存储器加上输入输出设备,就能构成计算机。大至超级计算机,小到手机等都是一样的模型。计算的本质就是从输入设备读取数据处理然后输出。可以理解理解计算机做的事情就是IO和计算。
在网络发明之前,计算机从存储设备中读取数据,进程通过内存的通道进行通信。互联网诞生之后,越来越多计算机通过互联网连接,将数据传输到世界各地。计算机之间可以通信,本质上也是计算机进程相互通信。为了方便不同终端进行通信,网络协议栈抽象出socket层,通过对socket文件描述符的操作来进行网络IO。当然,不同的应用场景,衍生出了不同的网络模型。
本文的描述发起IO进程,也可以描述为发起IO的线程
一次网络响应
互联网应用中,多数架构是CS模式,即client发出请求,server接受请求,处理之后返回响应。这样的一次交互,伴随着client和server的IO操作。对于常见的爬虫,client将尽可能提升其并发发送请求IO的能力,对于角色类似被爬虫对象那些后端server,也需要尽可能提升其并发处理多client请求的能力。
例如有个用户,发送了一个请求,请求了服务器上的一个文件,这样的一个交互过程如图:
server-client.jpg左边的图使用了 python3 -m http.server
创建了一个监听8000的server,后端的client请求了服务器path为/hello.txt
的文件。具体是如何实现的呢?当server开启服务之后,就会监听来自8000端口请求,client把请求发送给server,server再从自己的磁盘上读取 hello.txt 文件,然后返回给客户端。这样一次简单的交互,涉及了网络IO和磁盘文件的IO。大致流程如下图:
上图只表述了server处理响应的过程:
- server的进程发起Read系统调用,内核随即从硬件Disk读取数据到内核缓冲区(kernel buf)
- 内核再把缓冲区的数据copy到应用程序进程的缓冲区,应用程序就可以对数据进行修改。
- 应用进程将数据通过系统调用Send发送到socket缓冲区,每个socket文件都在内核维护了一个发送/接受缓冲区。
- 最后再把socket发送缓冲区的数据copy到NIC网卡中,通过协议栈发送到对端的网卡中。
- 对端的网卡接收数据中,client也会发起一个Recv的系统调用,然后内核会从网卡中读取数据,然后copy到应用程序的缓冲区。
整个过程,数据在三个主要层次流动,即硬件,内核,应用。在流动的过程中,从一个层流向另一个层即为IO操作。
DMA Direct Memory Access,直接内存访问方式,即现在的计算机硬件设备,可以独立地直接读写系统内存,而不需CPU完全介入处理。也就是数据从DISK或者NIC从把数据copy到内核buf,不需要计算机cpu的参与,而是通过设备上的芯片(cpu)参与。对于内核来说,这样的数据读取过程中,cpu可以做别的事情。
一次I/O过程
通过上面的数据流动,可以看到IO的基本方式,那么什么是IO呢?通常现代的程序软件都运行在内存里,内存又分为用户态和内核态,后者隶属于操作系统。所谓的IO,就是将硬件(磁盘、网卡)的数据读取到程序的内存中。
因为应用程序很少可以直接和硬件交互,因此操作系统作为两者的桥梁。通常操作系统在对接两端(应用程序与硬件)时,自身有一个内核buf,用于数据的copy中转。
basic-io.jpg应用的读IO操作,即将网卡的数据,copy到应用的进程buf,中途会经过内核的buf。
- 应用进行发起read系统调用。
- 内核接受应用的请求,如果内核buf有数据,则把数据copy到应用buf中,调用结束。
- 如果内核buf中没有数据,会向io模块发送请求,io模块和硬件交互。
4.当NIC接收到协议栈的数据后, NIC 会通过DMA 技术将数据copy到内核 buf 中 - 内核将内核buf的数据copy到应用的buf中,调用结束。
一般网络IO分为两个阶段,等待数据阶段和拷贝数据阶段。前者是指数据通过协议栈发送到网卡,网卡再通过DMAcopy到内核buf。后者是将内核buf的数据copy到进程buf中。
数据发送到nic设备的过程由协议栈支持,在操作系统层面实现。对于第二阶段拷贝数据的过程,进程的行为很重要,如果进程阻塞,那么将是同步调用,否则则是异步调用。后面会再说明。
I/O 基本模型
《UNIX网络编程》中提到了5中基本的网络I/O模型,主要分为同步和异步I/O:
- 阻塞I/O(blocking)
- 非阻塞I/O(nonblocking)
- 多路复用I/O(multiplexing)
- 信号驱动I/O(SIGIO)
- 异步I/O(asynchronous)
好用的是第一种,代码逻辑简单,符合人的思考方式。现实中常用的是第三种,第二种不太好用,第四种也很少,第五种不太成熟。下面针对具体的方式逐一简介。
阻塞I/O
前面已经介绍,IO过程分为两个阶段,等待数据准备和数据拷贝过程。这里涉及两个对象,其一是发起IO操作的进程(线程),其二是内核对象。所谓阻塞是指进程在两个阶段都阻塞,即线程挂起,不能做别的事情。
blocking.jpg红色的虚线表示io函数调用过程,加粗的红线表示数据从内核buf拷贝到应用buf的过程,改过程阻塞线程。简书把图片压缩得不要看了
进程对象发起 Recv操作,这是一个系统调用,然后内核会看内核buf是否有数据,如果没有数据,那么进程将会被挂起,直到内核buf从硬件或者网络读取到数据之后,内核再把数据从内核buf拷贝到进程buf中,然后唤醒发起调用的进程,并且Recv操作将会返回数据。接下来进行可以对进程buf的数据进行处理。
一个单线程同步阻塞server:
import socket
address = ('', 5000)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(100)
while True:
conn, addr = sock.accept()
print('client {} connect'.format(conn.fileno()))
while True:
data = conn.recv(1024)
if not data:
print('client {} closed'.format(conn.fileno()))
break
else:
print('data is ', data)
conn.sendall(data)
conn.close()
break
conn.close()
accept 和 recv 两个socket上的函数调用之后,如果没有数据返回,线程将会阻塞,直到有数据到达。
server的socket套接字有两种,一种是 监听套接字 (sock),它有一个accept方法,该方法的作用就是从已握手的队列中取出一个连接,另外一种是连接套接字(conn),即accept方法返回的socket。
非阻塞I/O
线程在blockingIO中,发起了IO调用之后随即被挂起,不能做别的。在nonblockingIO中,如果没有io数据,那么发起的系统调用也会马上返回,会返回一个EWOULDBLOCK
错误。函数返回之后,线程没有被挂起,当然是可以继续做别的。
正如图上所示,在真实环境中,进程发起了非阻塞io请求,返回了EWOULDBLOCK之后,将会继续再次发起非阻塞的io请求,这个过程还是会使用CPU,因此也称之为轮询(polling)。当内核有数据的时候,内核将内核buf的数据copy到应用buf的过程还是需要cpu参与,这个过程对于nonblockingio来说,线程仍然是阻塞的。
非阻塞IO的一个简单应用:
import socket
import errno
address = ('', 5000)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(False)
sock.bind(address)
sock.listen(100)
while True:
try:
conn, addr = sock.accept()
except socket.error as e:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
continue
else:
raise
print('client {} connect'.format(conn.fileno()))
conn.setblocking(False)
while True:
try:
data = conn.recv(1024)
except socket.error as e:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
continue
else:
raise
if not data:
print('client {} closed'.format(conn.fileno()))
break
else:
print('data is ', data)
conn.sendall(data)
conn.close()
break
conn.close()
由此可见,当数据尚未准备好的时候,CPU将会在不同的轮询。accept和recv都在循环中。
多路复用I/O
阻塞IO会让线程挂起不能做别的,非阻塞IO则提供了新的思路,函数调用之后就返回,可是为了完成IO,需要不同的polling。每次轮询都是一次系统调用。某种程度下,非阻塞IO的性能将还不如阻塞IO。既然需要内核频繁操作,那么就有人想出了新的模型。
让内核代理去做轮询,然后应用进程只有数据准备了再发起IO操作不就好了吗?的确,多路复用IO就是这样的原理。由内核负责监控应用指定的socket文件描述符,当socket准备好数据(可读,可写,异常)的时候,通知应用进程。准备好数据是一个事件,当事件发生的时候,通知应用进程,而应用进程可以根据事件事先注册回调函数。
multiplexing.jpg进程发起了 select或poll或者epoll调用之后,可以设置阻塞进程。当内核数据准备好的时候通知应用进程,即事件发生。应用进程注册了回调函数,这里是 recv回调函数。因此进程可以再次发起recv系统调用。后面这个过程前面的阻塞非阻塞调用一样。只不过这里通常一定是可以读到数据,非阻塞的方式也不会返回错误。但是整个copy过程,进程还是阻塞的。
对于单个io请求,这样的做法其实并没有多大优势,甚至还不如阻塞IO。不过多路复用的好处在于多路,即可以同时监听多个socket描述符,当大量描述符可读可写事件发生的时候,更有利于服务器的并发性能。
多路复用I/O的本质就是
多路监听 + 阻塞/非阻塞IO
。多路监听即select,poll,epoll这些系统调用。后面的才是真正的IO,红色的线表示,即前文介绍的阻塞或者非阻塞IO。
下面是一个poll的例子:
import functools
import select
import socket
class Server:
def __init__(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._poll = select.poll()
self._handlers = {}
self._fd_events = {}
self._bytes_received = {}
self._bytes_to_send = {}
def start(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(False)
sock.bind(('', 5000))
sock.listen(100)
handlers = self._handlers
poll = self._poll
self.add_handler(sock.fileno(), self._accept, select.POLLIN)
while True:
poll_events = poll.poll(1)
for fd, event in poll_events:
if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
rb = self._bytes_received.pop(fd, b'')
sb = self._bytes_to_send.pop(fd, b'')
if rb:
print(f'Client {fd} sent {rb} but then closed')
elif sb:
print(f'Client {fd} closed before we sent {sb}')
else:
print(f'Client {fd} closed socket normally')
self.unregister(fd)
else:
handler = handlers.get(fd)
if handler:
handler()
def _accept(self, ):
while True:
try:
conn, addr = self._sock.accept()
except OSError:
break
else:
conn.setblocking(0)
fd = conn.fileno()
self.add_handler(fd, functools.partial(self._read, conn), select.POLLIN)
def _read(self, conn):
fd = conn.fileno()
more_data = conn.recv(10)
if not more_data:
print('client close')
conn.close()
return
data = self._bytes_received.pop(conn.fileno(), b'') + more_data
if data.endswith(b'\r\n\r\n'):
self._bytes_to_send[conn.fileno()] = data
self.remove_handler(fd)
self.add_handler(fd, functools.partial(self._write, conn, data), select.POLLOUT)
else:
self._bytes_received[conn.fileno()] = data
def _write(self, conn, data):
fd = conn.fileno()
self.remove_handler(fd)
data = self._bytes_to_send.pop(conn.fileno())
n = conn.send(data)
if n < len(data):
self._bytes_to_send[conn.fileno()] = data[n:]
self.add_handler(fd, functools.partial(self._write, conn, data), select.POLLOUT)
else:
conn.close()
def add_handler(self, fd, handler, event):
self._handlers[fd] = handler
self.register(fd, event)
def remove_handler(self, fd):
self._handlers.pop(fd, None)
self.unregister(fd)
def register(self, fd, event):
if fd in self._fd_events:
raise IOError(f"fd {fd} already registered")
self._poll.register(fd, event)
self._fd_events[fd] = event
def unregister(self, fd):
event = self._fd_events.pop(fd, None)
if event is not None:
self._poll.unregister(fd)
if __name__ == '__main__':
server = Server()
server.start()
从代码逻辑来看,poll_events = poll.poll(1)
调用设置超时时间,如果不设置,那么将会阻塞。当描述符数据准备好的时候,poll会返回fd和event的组合。例如监听socket绑定的回调是_accept,而在_accept函数中,又绑定了连接套接字的_read函数,当可读的时候调用了_read进行完成之后,又会绑定其_write函数。
上述代码每次recv10字节数据,显然没有读完,因此_read函数返回之后,下一次事件循环,poll将还会返回连接套接字可读事件,后者将会再次调用_read函数。直到读取完所有数据。这种编码方式跟同步阻塞的方式相差挺大的,各种回调函数的设置,让这个代码结构不那么顺序化。
当然 select poll epoll更多时候是配合非阻塞的方式使用。如下图:
multiplexing-unblocking.jpg一般多路复用IO都是配合非阻塞IO使用。因为读写socket的时候,并不确定读到什么时候才能读完。在一个循环里读,如果设置为阻塞模式,那么进程将会被挂起。比较好的做法是设置成非阻塞,一旦读写返回了EWOULDBLOCK,进行yield,然后切换到别的地方。直到下一次事件循环。下面是借助python的yield实现的一个简单的case:
import collections
import socket
import select
import types
import errno
class Stream:
def __init__(self, sock, loop):
sock.setblocking(False)
self._sock = sock
self._loop = loop
def close(self):
self._sock.close()
def read(self, size=10):
while True:
sock = self._sock
fd = sock.fileno()
try:
more_data = sock.recv(size)
print('more data', self, more_data)
except OSError as e:
if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
raise
else:
yield
else:
data = self._loop._bytes_received.get(fd, b'') + more_data
if data.endswith(b'\r\n\r\n'):
self._loop.remove_handler(fd)
return data
else:
self._loop._bytes_received[fd] = data
yield
def write(self, data):
sock = self._sock
fd = sock.fileno()
try:
while True:
try:
send_bytes = sock.send(data)
except OSError as e:
if e.errno not in (socket.EWOULDBLOCK, socket.EAGAIN):
raise
else:
yield
else:
if send_bytes == len(data):
return
data = data[send_bytes:]
self._loop.add_handler(fd, self.write(data), select.POLLOUT)
yield
finally:
self._loop.remove_handler(fd)
class IOLoop:
def __init__(self):
self._poll = select.poll()
self._handlers = {}
self._fd_events = {}
self._bytes_received = {}
self._bytes_to_send = {}
def start(self):
handlers = self._handlers
poll = self._poll
while True:
poll_events = poll.poll(1)
for fd, event in poll_events:
# 错误处理
if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
rb = self._bytes_received.pop(fd, b'')
sb = self._bytes_to_send.pop(fd, b'')
if rb:
print(f'Client {fd} sent {rb} but then closed')
elif sb:
print(f'Client {fd} closed before we sent {sb}')
else:
print(f'Client {fd} closed socket normally')
self.unregister(fd)
else:
handler = handlers.get(fd)
if handler:
if callable(handler):
handler()
else:
stack = handler
while True:
generator, value = stack[-1]
try:
yield_value = generator.send(value)
if isinstance(yield_value, types.GeneratorType):
stack.append([yield_value, None])
else:
break
except StopIteration as e:
stack.pop()
if stack:
stack[-1][-1] = e.value
else:
break
def add_handler(self, fd, handler, event):
if isinstance(handler, types.GeneratorType):
self._handlers[fd] = collections.deque([[handler, None]])
else:
self._handlers[fd] = handler
self.register(fd, event)
def remove_handler(self, fd):
self._handlers.pop(fd, None)
self.unregister(fd)
def register(self, fd, event):
if fd in self._fd_events:
raise IOError(f'fd {fd} already registered')
self._poll.register(fd, event)
self._fd_events[fd] = event
def unregister(self, fd):
event = self._fd_events.pop(fd, None)
if event is not None:
self._poll.unregister(fd)
def modify(self, fd, event):
self._poll.modify(fd, event)
self._fd_events[fd] = event
class Server:
def __init__(self):
self._sock = socket.socket()
self._loop = IOLoop()
self._stream = Stream(self._sock, self._loop)
def start(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(True)
sock.bind(('', 5000))
sock.listen(100)
self._loop.add_handler(sock.fileno(), self._accept, select.POLLIN)
self._loop.start()
def _accept(self):
while True:
try:
conn, addr = self._sock.accept()
except OSError:
break
else:
stream = Stream(conn, self._loop)
fd = conn.fileno()
self._loop.add_handler(fd, self._handle(stream), select.POLLIN)
def _handle(self, stream):
data = yield stream.read()
yield stream.write(data)
if __name__ == '__main__':
s = Server()
s.start()
在 read 方法中,如果 recv 返回了EWOULDBLOCK,那么进程将会yield,然后跳转到别的逻辑进行执行,直到下一次事件循环可读的时候,再从yield挂起除继续执行,继续调用recv。与前一个例子相比,后者在read的代码逻辑和阻塞式IO更像。当然,借助yield黑魔法,还可以将异步回调代码写得跟同步一样,这就是协程的编码方式。python3.5+则通过语言层面实现了协程的支持。
多路复用IO几乎成为了主流的server方式。尤其是epoll,成为了Nginx,Redis, Tornado等软件高性能的基石。
信号IO
让内核在描述符就绪时发送SIGIO信号通知进程。这种模型为信号驱动式I/O(signal-driven I/O),和事件驱动类似,也是一种回调方式。与非阻塞方式不一样的地方是,发起了信号驱动的系统调用,进程没有挂起,可以做的事情,可是实际中,代码逻辑通常还是主循环,主循环里可能还是会阻塞。因此使用这样的IO的软件很少。
可是当信号返回可以读写的时候,因为还需要cpu将内核数据copy到应用buf。这个过程毫无疑问还是阻塞的。
signal.jpg异步I/O
前面一直强调,内核在copy数据从内核buf到应用buf的过程中,cpu需要参与,进程都会被阻塞。因此可以理解,进程和内核的步调是一致,也就是同步。这样的IO模型统称之为同步I/O。那么什么是异步I/O呢?
Unix下的异步I/O模型如下:
async.jpg图中的IO调用函数的红线只出现在第一步中。
即无论是第一阶段数据准备还是第二阶段数据拷贝,发起系统调用的进程都不会被阻塞。在第二阶段的过程中,进程没有阻塞,那么可以抢占CPU,而内核copy数据的时候,也需要CPU,这就造成了应用和内核进行CPU竞争,并且步调不一致了。某些情况下,其性能反而不如其他IO模式。使用的人也很少。
并发
前面介绍的IO模型,列举了server使用的几个方式。从代码结构来看,这样的server都是单进程的。现实中为了实现并发技术,有的程序也会借助多线程多进程方式。关于更多的服务器并发处理模型,已经如何使用poll/epoll将会在后面的文档中介绍。
多路复用IO不仅配合非阻塞IO使用,很多时候也配合单进程单线程使用协程的方式,避免的线程进程的上下文切换带来的性能折损。
总结
随着服务产品用户流量和用户群扩大,文中梳理了unix常见的几种网络IO,并且针对这些IO进行了简单的介绍和简要的使用说明。
首先说了什么是IO,即 应用内存--内核缓存--硬件数据 三者之间的数据流动。三者正好组成了两个阶段,以读为例子,数据从硬件到内核buf过程为数据准备状态,由内核拷贝到应用为数据复制阶段。
在进程发起IO请求时,在第一阶段数据等待时是否挂起分为阻塞和非阻塞
数据等待阶段
- 阻塞:进程挂起
- 非阻塞:进程不挂起,立即返回,返回EWOULDBLOCK
在第二个阶段数据拷贝过程时,发起IO请求的进程是否阻塞。
数据拷贝阶段:
- 阻塞进程:同步,内核拷贝数据占用CPU
- 不阻塞进程:异步,进程可能和内核竞争CPU
这些组合就是常见的名词,同步阻塞,同步非阻塞,异步非阻塞。其实真实的软件世界里,阻塞式IO基本都是同步的,异步也都是非阻塞。
同步非阻塞就是十分常见的多路复用结合非阻塞IO实现的方案,也称之为事件驱动。同步有利于逻辑的书写,非阻塞有利于调用率实现并发。因此现实中更多的IO模型是多路复用IO,并且在发展过程中,select,poll和epoll是逐步进化链。epoll实现了内核级数据结构优化,在实际性能上又了很大的提升。这些都会在后面的文档介绍。
互联网应用出现实现了很多杀手级的产品和巨型公司。这些产品因为使用的人多而带来了很多技术革新,十年前C10K还是无需考虑的问题,如今很多应用都面临C10K,C100K,甚至是C1000K的挑战。并发量越来越来,服务IO的并发模型也有多种实现,对接下将讨论的并发模型。本文对基本IO的介绍,有利于了解当前不同高性能服务器的技术选型和服务原理。
后记:
网络上很多文字解释同步异步,阻塞非阻塞的概念,也有很多生动的类比。曾经我也试图使用生活中的例子类类比网络IO,虽然对于初次接入者有一定的友好性,现在看来这些类比其实是本末倒置,南辕北辙。现实中很少有完全匹配的case,因此为了类比反而会对其基本原理含糊其辞。最好的认知方式就是代码。
很多人将多路复用I/O归结为异步IO,与《Unix网络编程》编程里的定义是冲突。后者里多路复用IO是同步IO,因为真正的IO还是阻塞或者非阻塞IO。只有aio_read的系统调用才是异步IO。很多使用epoll实现的服务软件也声称是高性能异步框架,这里的“异步”更多是表示服务器并发处理请求的能力,而不是IO基本的“异步”。并发的请求交替处理,从整个服务器角度来看是一种异步行为。另外一些人容易误解,一个原因大概就是想当然“异步”一定要比“同步”性能高,其实也是不对的。真正的异步io未必就比同步的epoll好,而同步epoll在并发量很小的情况下,未必就比poll,select甚至是同步阻塞IO好。
🚫转载。禁止转载的原因不是所谓的版权。从我个人的经验来看,blog的笔记是记录当时总结。随着认知的变换,可能之前的笔记会有错误。我发现了一般都会返回去修正,甚至删掉之前的内容。如果把错误一直留着,很可能会误人子弟。而笔记一旦转载出去,即使我修改原文了,转载之后的也不会被修正,错误还是会被保留。因此我希望不要转载任何blog笔记。
网友评论