简明网络I/O与并发 --- I/O

作者: 人世间 | 来源:发表于2018-12-12 21:20 被阅读33次

    简明网络I/O与并发 --- I/O
    简明网络I/O与并发 --- 并发

    计算机的基本组成其实很简单,处理器,存储器加上输入输出设备,就能构成计算机。大至超级计算机,小到手机等都是一样的模型。计算的本质就是从输入设备读取数据处理然后输出。可以理解理解计算机做的事情就是IO和计算。

    在网络发明之前,计算机从存储设备中读取数据,进程通过内存的通道进行通信。互联网诞生之后,越来越多计算机通过互联网连接,将数据传输到世界各地。计算机之间可以通信,本质上也是计算机进程相互通信。为了方便不同终端进行通信,网络协议栈抽象出socket层,通过对socket文件描述符的操作来进行网络IO。当然,不同的应用场景,衍生出了不同的网络模型。

    本文的描述发起IO进程,也可以描述为发起IO的线程

    一次网络响应

    互联网应用中,多数架构是CS模式,即client发出请求,server接受请求,处理之后返回响应。这样的一次交互,伴随着client和server的IO操作。对于常见的爬虫,client将尽可能提升其并发发送请求IO的能力,对于角色类似被爬虫对象那些后端server,也需要尽可能提升其并发处理多client请求的能力。

    例如有个用户,发送了一个请求,请求了服务器上的一个文件,这样的一个交互过程如图:

    server-client.jpg

    左边的图使用了 python3 -m http.server 创建了一个监听8000的server,后端的client请求了服务器path为/hello.txt 的文件。具体是如何实现的呢?当server开启服务之后,就会监听来自8000端口请求,client把请求发送给server,server再从自己的磁盘上读取 hello.txt 文件,然后返回给客户端。这样一次简单的交互,涉及了网络IO和磁盘文件的IO。大致流程如下图:

    response-io.jpg

    上图只表述了server处理响应的过程:

    1. server的进程发起Read系统调用,内核随即从硬件Disk读取数据到内核缓冲区(kernel buf)
    2. 内核再把缓冲区的数据copy到应用程序进程的缓冲区,应用程序就可以对数据进行修改。
    3. 应用进程将数据通过系统调用Send发送到socket缓冲区,每个socket文件都在内核维护了一个发送/接受缓冲区。
    4. 最后再把socket发送缓冲区的数据copy到NIC网卡中,通过协议栈发送到对端的网卡中。
    5. 对端的网卡接收数据中,client也会发起一个Recv的系统调用,然后内核会从网卡中读取数据,然后copy到应用程序的缓冲区。

    整个过程,数据在三个主要层次流动,即硬件,内核,应用。在流动的过程中,从一个层流向另一个层即为IO操作。

    DMA Direct Memory Access,直接内存访问方式,即现在的计算机硬件设备,可以独立地直接读写系统内存,而不需CPU完全介入处理。也就是数据从DISK或者NIC从把数据copy到内核buf,不需要计算机cpu的参与,而是通过设备上的芯片(cpu)参与。对于内核来说,这样的数据读取过程中,cpu可以做别的事情。

    一次I/O过程

    通过上面的数据流动,可以看到IO的基本方式,那么什么是IO呢?通常现代的程序软件都运行在内存里,内存又分为用户态和内核态,后者隶属于操作系统。所谓的IO,就是将硬件(磁盘、网卡)的数据读取到程序的内存中。

    因为应用程序很少可以直接和硬件交互,因此操作系统作为两者的桥梁。通常操作系统在对接两端(应用程序与硬件)时,自身有一个内核buf,用于数据的copy中转。

    basic-io.jpg

    应用的读IO操作,即将网卡的数据,copy到应用的进程buf,中途会经过内核的buf。

    1. 应用进行发起read系统调用。
    2. 内核接受应用的请求,如果内核buf有数据,则把数据copy到应用buf中,调用结束。
    3. 如果内核buf中没有数据,会向io模块发送请求,io模块和硬件交互。
      4.当NIC接收到协议栈的数据后, NIC 会通过DMA 技术将数据copy到内核 buf 中
    4. 内核将内核buf的数据copy到应用的buf中,调用结束。

    一般网络IO分为两个阶段,等待数据阶段和拷贝数据阶段。前者是指数据通过协议栈发送到网卡,网卡再通过DMAcopy到内核buf。后者是将内核buf的数据copy到进程buf中。

    数据发送到nic设备的过程由协议栈支持,在操作系统层面实现。对于第二阶段拷贝数据的过程,进程的行为很重要,如果进程阻塞,那么将是同步调用,否则则是异步调用。后面会再说明。

    I/O 基本模型

    《UNIX网络编程》中提到了5中基本的网络I/O模型,主要分为同步和异步I/O:

    1. 阻塞I/O(blocking)
    2. 非阻塞I/O(nonblocking)
    3. 多路复用I/O(multiplexing)
    4. 信号驱动I/O(SIGIO)
    5. 异步I/O(asynchronous)

    好用的是第一种,代码逻辑简单,符合人的思考方式。现实中常用的是第三种,第二种不太好用,第四种也很少,第五种不太成熟。下面针对具体的方式逐一简介。

    阻塞I/O

    前面已经介绍,IO过程分为两个阶段,等待数据准备和数据拷贝过程。这里涉及两个对象,其一是发起IO操作的进程(线程),其二是内核对象。所谓阻塞是指进程在两个阶段都阻塞,即线程挂起,不能做别的事情。

    blocking.jpg

    红色的虚线表示io函数调用过程,加粗的红线表示数据从内核buf拷贝到应用buf的过程,改过程阻塞线程。简书把图片压缩得不要看了

    进程对象发起 Recv操作,这是一个系统调用,然后内核会看内核buf是否有数据,如果没有数据,那么进程将会被挂起,直到内核buf从硬件或者网络读取到数据之后,内核再把数据从内核buf拷贝到进程buf中,然后唤醒发起调用的进程,并且Recv操作将会返回数据。接下来进行可以对进程buf的数据进行处理。

    一个单线程同步阻塞server:

    import socket
    
    address = ('', 5000)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(100)
    
    while True:
        conn, addr = sock.accept()
        print('client {} connect'.format(conn.fileno()))
        while True:
            data = conn.recv(1024)
            if not data:
                print('client {} closed'.format(conn.fileno()))
                break
            else:
                print('data is ', data)
                conn.sendall(data)
                conn.close()
                break
        conn.close()
    

    accept 和 recv 两个socket上的函数调用之后,如果没有数据返回,线程将会阻塞,直到有数据到达。

    server的socket套接字有两种,一种是 监听套接字 (sock),它有一个accept方法,该方法的作用就是从已握手的队列中取出一个连接,另外一种是连接套接字(conn),即accept方法返回的socket。

    非阻塞I/O

    线程在blockingIO中,发起了IO调用之后随即被挂起,不能做别的。在nonblockingIO中,如果没有io数据,那么发起的系统调用也会马上返回,会返回一个EWOULDBLOCK错误。函数返回之后,线程没有被挂起,当然是可以继续做别的。

    nonblocking.jpg

    正如图上所示,在真实环境中,进程发起了非阻塞io请求,返回了EWOULDBLOCK之后,将会继续再次发起非阻塞的io请求,这个过程还是会使用CPU,因此也称之为轮询(polling)。当内核有数据的时候,内核将内核buf的数据copy到应用buf的过程还是需要cpu参与,这个过程对于nonblockingio来说,线程仍然是阻塞的。

    非阻塞IO的一个简单应用:

    import socket
    import errno
    
    address = ('', 5000)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(False)
    sock.bind(address)
    sock.listen(100)
    
    while True:
        try:
            conn, addr = sock.accept()
        except socket.error as e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                continue
            else:
                raise
    
        print('client {} connect'.format(conn.fileno()))
        conn.setblocking(False)
        while True:
            try:
                data = conn.recv(1024)
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    continue
                else:
                    raise
    
            if not data:
                print('client {} closed'.format(conn.fileno()))
                break
            else:
                print('data is ', data)
                conn.sendall(data)
                conn.close()
                break
        conn.close()
    

    由此可见,当数据尚未准备好的时候,CPU将会在不同的轮询。accept和recv都在循环中。

    多路复用I/O

    阻塞IO会让线程挂起不能做别的,非阻塞IO则提供了新的思路,函数调用之后就返回,可是为了完成IO,需要不同的polling。每次轮询都是一次系统调用。某种程度下,非阻塞IO的性能将还不如阻塞IO。既然需要内核频繁操作,那么就有人想出了新的模型。

    让内核代理去做轮询,然后应用进程只有数据准备了再发起IO操作不就好了吗?的确,多路复用IO就是这样的原理。由内核负责监控应用指定的socket文件描述符,当socket准备好数据(可读,可写,异常)的时候,通知应用进程。准备好数据是一个事件,当事件发生的时候,通知应用进程,而应用进程可以根据事件事先注册回调函数。

    multiplexing.jpg

    进程发起了 select或poll或者epoll调用之后,可以设置阻塞进程。当内核数据准备好的时候通知应用进程,即事件发生。应用进程注册了回调函数,这里是 recv回调函数。因此进程可以再次发起recv系统调用。后面这个过程前面的阻塞非阻塞调用一样。只不过这里通常一定是可以读到数据,非阻塞的方式也不会返回错误。但是整个copy过程,进程还是阻塞的。

    对于单个io请求,这样的做法其实并没有多大优势,甚至还不如阻塞IO。不过多路复用的好处在于多路,即可以同时监听多个socket描述符,当大量描述符可读可写事件发生的时候,更有利于服务器的并发性能。

    多路复用I/O的本质就是 多路监听 + 阻塞/非阻塞IO。多路监听即select,poll,epoll这些系统调用。后面的才是真正的IO,红色的线表示,即前文介绍的阻塞或者非阻塞IO。

    下面是一个poll的例子:

    import functools
    import select
    import socket
    
    
    class Server:
    
        def __init__(self):
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._poll = select.poll()
            self._handlers = {}
            self._fd_events = {}
            self._bytes_received = {}
            self._bytes_to_send = {}
    
        def start(self):
            sock = self._sock
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(False)
            sock.bind(('', 5000))
            sock.listen(100)
    
            handlers = self._handlers
            poll = self._poll
            self.add_handler(sock.fileno(), self._accept, select.POLLIN)
    
            while True:
                poll_events = poll.poll(1)
                for fd, event in poll_events:
                    if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
                        rb = self._bytes_received.pop(fd, b'')
                        sb = self._bytes_to_send.pop(fd, b'')
                        if rb:
                            print(f'Client {fd} sent {rb} but then closed')
                        elif sb:
                            print(f'Client {fd} closed before we sent {sb}')
                        else:
                            print(f'Client {fd} closed socket normally')
                        self.unregister(fd)
                    else:
                        handler = handlers.get(fd)
                        if handler:
                            handler()
    
        def _accept(self, ):
            while True:
                try:
                    conn, addr = self._sock.accept()
                except OSError:
                    break
                else:
                    conn.setblocking(0)
                    fd = conn.fileno()
                    self.add_handler(fd, functools.partial(self._read, conn), select.POLLIN)
    
        def _read(self, conn):
            fd = conn.fileno()
            more_data = conn.recv(10)
            if not more_data:
                print('client close')
                conn.close()
                return
    
            data = self._bytes_received.pop(conn.fileno(), b'') + more_data
            if data.endswith(b'\r\n\r\n'):
                self._bytes_to_send[conn.fileno()] = data
                self.remove_handler(fd)
                self.add_handler(fd, functools.partial(self._write, conn, data), select.POLLOUT)
            else:
                self._bytes_received[conn.fileno()] = data
    
        def _write(self, conn, data):
            fd = conn.fileno()
            self.remove_handler(fd)
    
            data = self._bytes_to_send.pop(conn.fileno())
            n = conn.send(data)
            if n < len(data):
                self._bytes_to_send[conn.fileno()] = data[n:]
                self.add_handler(fd, functools.partial(self._write, conn, data), select.POLLOUT)
            else:
                conn.close()
    
        def add_handler(self, fd, handler, event):
            self._handlers[fd] = handler
            self.register(fd, event)
    
        def remove_handler(self, fd):
            self._handlers.pop(fd, None)
            self.unregister(fd)
    
        def register(self, fd, event):
            if fd in self._fd_events:
                raise IOError(f"fd {fd} already registered")
            self._poll.register(fd, event)
            self._fd_events[fd] = event
    
        def unregister(self, fd):
            event = self._fd_events.pop(fd, None)
            if event is not None:
                self._poll.unregister(fd)
    
    
    if __name__ == '__main__':
        server = Server()
        server.start()
    

    从代码逻辑来看,poll_events = poll.poll(1) 调用设置超时时间,如果不设置,那么将会阻塞。当描述符数据准备好的时候,poll会返回fd和event的组合。例如监听socket绑定的回调是_accept,而在_accept函数中,又绑定了连接套接字的_read函数,当可读的时候调用了_read进行完成之后,又会绑定其_write函数。

    上述代码每次recv10字节数据,显然没有读完,因此_read函数返回之后,下一次事件循环,poll将还会返回连接套接字可读事件,后者将会再次调用_read函数。直到读取完所有数据。这种编码方式跟同步阻塞的方式相差挺大的,各种回调函数的设置,让这个代码结构不那么顺序化。

    当然 select poll epoll更多时候是配合非阻塞的方式使用。如下图:

    multiplexing-unblocking.jpg

    一般多路复用IO都是配合非阻塞IO使用。因为读写socket的时候,并不确定读到什么时候才能读完。在一个循环里读,如果设置为阻塞模式,那么进程将会被挂起。比较好的做法是设置成非阻塞,一旦读写返回了EWOULDBLOCK,进行yield,然后切换到别的地方。直到下一次事件循环。下面是借助python的yield实现的一个简单的case:

    import collections
    import socket
    import select
    import types
    import errno
    
    
    class Stream:
    
        def __init__(self, sock, loop):
            sock.setblocking(False)
            self._sock = sock
            self._loop = loop
    
        def close(self):
            self._sock.close()
    
        def read(self, size=10):
            while True:
                sock = self._sock
                fd = sock.fileno()
                try:
                    more_data = sock.recv(size)
                    print('more data', self, more_data)
                except OSError as e:
                    if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
                        raise
                    else:
                        yield
                else:
                    data = self._loop._bytes_received.get(fd, b'') + more_data
                    if data.endswith(b'\r\n\r\n'):
                        self._loop.remove_handler(fd)
                        return data
                    else:
                        self._loop._bytes_received[fd] = data
                        yield
    
        def write(self, data):
            sock = self._sock
            fd = sock.fileno()
            try:
                while True:
                    try:
                        send_bytes = sock.send(data)
                    except OSError as e:
                        if e.errno not in (socket.EWOULDBLOCK, socket.EAGAIN):
                            raise
                        else:
                            yield
                    else:
                        if send_bytes == len(data):
                            return
                        data = data[send_bytes:]
                    self._loop.add_handler(fd, self.write(data), select.POLLOUT)
                    yield
            finally:
                self._loop.remove_handler(fd)
    
    
    class IOLoop:
    
        def __init__(self):
            self._poll = select.poll()
            self._handlers = {}
            self._fd_events = {}
            self._bytes_received = {}
            self._bytes_to_send = {}
    
        def start(self):
            handlers = self._handlers
            poll = self._poll
    
            while True:
                poll_events = poll.poll(1)
    
                for fd, event in poll_events:
                    # 错误处理
                    if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
                        rb = self._bytes_received.pop(fd, b'')
                        sb = self._bytes_to_send.pop(fd, b'')
                        if rb:
                            print(f'Client {fd} sent {rb} but then closed')
                        elif sb:
                            print(f'Client {fd} closed before we sent {sb}')
                        else:
                            print(f'Client {fd} closed socket normally')
                        self.unregister(fd)
                    else:
                        handler = handlers.get(fd)
                        if handler:
                            if callable(handler):
                                handler()
                            else:
                                stack = handler
                                while True:
                                    generator, value = stack[-1]
                                    try:
                                        yield_value = generator.send(value)
                                        if isinstance(yield_value, types.GeneratorType):
                                            stack.append([yield_value, None])
                                        else:
                                            break
                                    except StopIteration as e:
                                        stack.pop()
                                        if stack:
                                            stack[-1][-1] = e.value
                                        else:
                                            break
    
        def add_handler(self, fd, handler, event):
            if isinstance(handler, types.GeneratorType):
                self._handlers[fd] = collections.deque([[handler, None]])
            else:
                self._handlers[fd] = handler
            self.register(fd, event)
    
        def remove_handler(self, fd):
            self._handlers.pop(fd, None)
            self.unregister(fd)
    
        def register(self, fd, event):
            if fd in self._fd_events:
                raise IOError(f'fd {fd} already registered')
            self._poll.register(fd, event)
            self._fd_events[fd] = event
    
        def unregister(self, fd):
            event = self._fd_events.pop(fd, None)
            if event is not None:
                self._poll.unregister(fd)
    
        def modify(self, fd, event):
            self._poll.modify(fd, event)
            self._fd_events[fd] = event
    
    
    class Server:
    
        def __init__(self):
    
            self._sock = socket.socket()
            self._loop = IOLoop()
            self._stream = Stream(self._sock, self._loop)
    
        def start(self):
            sock = self._sock
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(True)
            sock.bind(('', 5000))
            sock.listen(100)
    
            self._loop.add_handler(sock.fileno(), self._accept, select.POLLIN)
            self._loop.start()
    
        def _accept(self):
            while True:
                try:
                    conn, addr = self._sock.accept()
                except OSError:
                    break
                else:
                    stream = Stream(conn, self._loop)
                    fd = conn.fileno()
                    self._loop.add_handler(fd, self._handle(stream), select.POLLIN)
    
        def _handle(self, stream):
            data = yield stream.read()
            yield stream.write(data)
    
    
    if __name__ == '__main__':
        s = Server()
        s.start()
    
    

    在 read 方法中,如果 recv 返回了EWOULDBLOCK,那么进程将会yield,然后跳转到别的逻辑进行执行,直到下一次事件循环可读的时候,再从yield挂起除继续执行,继续调用recv。与前一个例子相比,后者在read的代码逻辑和阻塞式IO更像。当然,借助yield黑魔法,还可以将异步回调代码写得跟同步一样,这就是协程的编码方式。python3.5+则通过语言层面实现了协程的支持。

    多路复用IO几乎成为了主流的server方式。尤其是epoll,成为了Nginx,Redis, Tornado等软件高性能的基石。

    信号IO

    让内核在描述符就绪时发送SIGIO信号通知进程。这种模型为信号驱动式I/O(signal-driven I/O),和事件驱动类似,也是一种回调方式。与非阻塞方式不一样的地方是,发起了信号驱动的系统调用,进程没有挂起,可以做的事情,可是实际中,代码逻辑通常还是主循环,主循环里可能还是会阻塞。因此使用这样的IO的软件很少。

    可是当信号返回可以读写的时候,因为还需要cpu将内核数据copy到应用buf。这个过程毫无疑问还是阻塞的。

    signal.jpg

    异步I/O

    前面一直强调,内核在copy数据从内核buf到应用buf的过程中,cpu需要参与,进程都会被阻塞。因此可以理解,进程和内核的步调是一致,也就是同步。这样的IO模型统称之为同步I/O。那么什么是异步I/O呢?

    Unix下的异步I/O模型如下:

    async.jpg

    图中的IO调用函数的红线只出现在第一步中。

    即无论是第一阶段数据准备还是第二阶段数据拷贝,发起系统调用的进程都不会被阻塞。在第二阶段的过程中,进程没有阻塞,那么可以抢占CPU,而内核copy数据的时候,也需要CPU,这就造成了应用和内核进行CPU竞争,并且步调不一致了。某些情况下,其性能反而不如其他IO模式。使用的人也很少。

    并发

    前面介绍的IO模型,列举了server使用的几个方式。从代码结构来看,这样的server都是单进程的。现实中为了实现并发技术,有的程序也会借助多线程多进程方式。关于更多的服务器并发处理模型,已经如何使用poll/epoll将会在后面的文档中介绍。

    多路复用IO不仅配合非阻塞IO使用,很多时候也配合单进程单线程使用协程的方式,避免的线程进程的上下文切换带来的性能折损。

    总结

    随着服务产品用户流量和用户群扩大,文中梳理了unix常见的几种网络IO,并且针对这些IO进行了简单的介绍和简要的使用说明。

    首先说了什么是IO,即 应用内存--内核缓存--硬件数据 三者之间的数据流动。三者正好组成了两个阶段,以读为例子,数据从硬件到内核buf过程为数据准备状态,由内核拷贝到应用为数据复制阶段。

    在进程发起IO请求时,在第一阶段数据等待时是否挂起分为阻塞和非阻塞

    数据等待阶段

    • 阻塞:进程挂起
    • 非阻塞:进程不挂起,立即返回,返回EWOULDBLOCK

    在第二个阶段数据拷贝过程时,发起IO请求的进程是否阻塞。
    数据拷贝阶段:

    • 阻塞进程:同步,内核拷贝数据占用CPU
    • 不阻塞进程:异步,进程可能和内核竞争CPU

    这些组合就是常见的名词,同步阻塞,同步非阻塞,异步非阻塞。其实真实的软件世界里,阻塞式IO基本都是同步的,异步也都是非阻塞。

    同步非阻塞就是十分常见的多路复用结合非阻塞IO实现的方案,也称之为事件驱动。同步有利于逻辑的书写,非阻塞有利于调用率实现并发。因此现实中更多的IO模型是多路复用IO,并且在发展过程中,select,poll和epoll是逐步进化链。epoll实现了内核级数据结构优化,在实际性能上又了很大的提升。这些都会在后面的文档介绍。

    互联网应用出现实现了很多杀手级的产品和巨型公司。这些产品因为使用的人多而带来了很多技术革新,十年前C10K还是无需考虑的问题,如今很多应用都面临C10K,C100K,甚至是C1000K的挑战。并发量越来越来,服务IO的并发模型也有多种实现,对接下将讨论的并发模型。本文对基本IO的介绍,有利于了解当前不同高性能服务器的技术选型和服务原理。

    后记:

    网络上很多文字解释同步异步,阻塞非阻塞的概念,也有很多生动的类比。曾经我也试图使用生活中的例子类类比网络IO,虽然对于初次接入者有一定的友好性,现在看来这些类比其实是本末倒置,南辕北辙。现实中很少有完全匹配的case,因此为了类比反而会对其基本原理含糊其辞。最好的认知方式就是代码。

    很多人将多路复用I/O归结为异步IO,与《Unix网络编程》编程里的定义是冲突。后者里多路复用IO是同步IO,因为真正的IO还是阻塞或者非阻塞IO。只有aio_read的系统调用才是异步IO。很多使用epoll实现的服务软件也声称是高性能异步框架,这里的“异步”更多是表示服务器并发处理请求的能力,而不是IO基本的“异步”。并发的请求交替处理,从整个服务器角度来看是一种异步行为。另外一些人容易误解,一个原因大概就是想当然“异步”一定要比“同步”性能高,其实也是不对的。真正的异步io未必就比同步的epoll好,而同步epoll在并发量很小的情况下,未必就比poll,select甚至是同步阻塞IO好。

    🚫转载。禁止转载的原因不是所谓的版权。从我个人的经验来看,blog的笔记是记录当时总结。随着认知的变换,可能之前的笔记会有错误。我发现了一般都会返回去修正,甚至删掉之前的内容。如果把错误一直留着,很可能会误人子弟。而笔记一旦转载出去,即使我修改原文了,转载之后的也不会被修正,错误还是会被保留。因此我希望不要转载任何blog笔记。

    相关文章

      网友评论

        本文标题:简明网络I/O与并发 --- I/O

        本文链接:https://www.haomeiwen.com/subject/sttmhqtx.html