美文网首页
漫谈I/O多路复用与其在SS中的应用

漫谈I/O多路复用与其在SS中的应用

作者: LittleMagic | 来源:发表于2020-07-26 23:26 被阅读0次

    I/O多路复用概述

    我们知道,Unix-like系统中一共有5种I/O方式:

    • 阻塞I/O(Blocking I/O);
    • 非阻塞I/O(Non-blocking I/O);
    • I/O多路复用(I/O Multiplexing);
    • 信号驱动I/O(Signal-driven I/O);
    • 异步I/O(Asynchronous I/O)。

    限于篇幅,本文只讲I/O多路复用,不展开说其他每种I/O方式的细节。看官如果想全面了解,可以参考W. Richard Stevens等人所著的《UNIX网络编程卷1:套接字联网API(第3版)》。

    我们先对I/O多路复用下一个(自己的)定义:

    所谓I/O多路复用,就是指单个线程可以感知到多个I/O流的状态。当I/O流就绪时,触发执行相应的操作。

    也就是说,“多路”的是I/O流,“复用”的是线程。由于在Linux系统中一切皆文件,因此“I/O流”这个词就能理解为文件描述符(file descriptor, fd)。I/O多路复用的根本目的,是使得应用能够处理更多的并发,提高服务器的吞吐量。根据应用场景的不同,I/O多路复用有时也被称作事件驱动模型(Event-driven model),比如Redis里基于I/O多路复用自行实现的事件驱动库ae

    下图是《Unix网络编程》书中给出的I/O多路复用流程图,以select()和UDP的recvfrom()系统调用为例。

    在该示例中,客户端程序会首先调用I/O多路复用的系统调用select(),如果所有Socket里都没有数据报,该调用就会阻塞。一旦某个Socket有数据报准备好,select()就会返回可读,然后就调用recvfrom()将数据报中的数据从内核空间复制到用户空间。

    下图示出5种I/O模型的对比。I/O多路复用(中间的那个)本质上仍然是一种同步操作,但是与阻塞I/O相比,效率更高,不必一直“干等着”。而与非阻塞I/O相比,CPU也不必持续地检查流是否就绪,节省了很多CPU时间。

    I/O多路复用的实现

    在Linux系统中,存在有3种典型的I/O多路复用实现,即select、poll和epoll,它们的出现是有先后的。下面分别来看个大概,之后有时间的话,再根据内核源码来分析它们。

    select

    select方式早在上世纪80年代就已经出现了,在上一节图中出现过的select()系统调用的签名如下所示。

    int select(
        int nfds, 
        fd_set *restrict readfds, 
        fd_set *restrict writefds, 
        fd_set *restrict errorfds,
        struct timeval *restrict timeout);
    

    其中,readfds、writefds、errorfds是三个fd_set,即文件描述符的集合,分别代表读取、写入和异常的I/O流集合。nfds则表示检查[0, nfds - 1]这个范围内的fd,所以其值不应该是fd的总个数,而是最大的fd值+1。timeout表示阻塞超时,为0代表立即返回,为NULL则代表一直阻塞直到有fd就绪。

    select()系统调用的大致执行流程是:

    1. 将各个fd_set从用户空间复制到内核空间;
    2. 遍历[0, nfds - 1]范围内的每个fd,调用fd的poll()函数,检查其对应设备中是否有可用的流;
    3. 如果有流就绪,根据类型,将其加入对应的fd_set。否则就按照timeout设定阻塞当前线程,直到有流就绪或等待超时;
    4. select()调用返回可用的fd个数,并将各个fd_set从内核空间复制回用户空间。

    select方式的实现比较简单,并且跨平台性非常好。当然其缺点也比较明显:

    • fd的最大值太小,一般为1024,由FD_SETSIZE宏来指定。
    • 每次调用都需要线性轮询每个描述符,高并发情况下开销比较大。
    • fd_set从用户空间到内核空间来回复制,没有必要。

    poll

    poll()系统调用的签名如下所示。

    int poll(struct pollfd fds[], nfds_t nfds, int timeout);
    

    实际上,poll方式与select方式的实现几乎是相同的,不过它用pollfd结构替代了上面的fd_set结构而已。另外,它改用链表实现fd的存储,因此消除了select方式中fd的最大值限制,但其他方面没有明显的改善。

    epoll

    epoll直到Linux 2.6版本的内核才出现,它是对select/poll真正意义上的改进。它提供了3个系统调用。

    int epoll_create(int size);
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
    

    epoll_create()函数的作用是创建一个epoll句柄(可以理解为一个epoll专用的fd),size参数指定需要检查多少个fd。

    epoll_ctl()函数则是向句柄epfd注册、删除或修改需要监听的fd及事件event。epoll用epoll_event结构来描述事件类型与数据,在内核缓存中用红黑树保存,epfd作为根。另外,还会给中断处理程序注册一个回调,使得内核在句柄的中断到来时,标记fd为就绪。

    epoll_wait()函数等待注册在句柄epfd上的事件发生,发生之后,就将事件和已就绪的fd放入event数组中,或者阻塞直到timeout超时。epoll使用链表来维护已经就绪的事件和fd,因此链表中有无数据就可以代表有无事件发生。

    epoll相对于select的优点如下:

    • 没有fd数的限制;
    • 不采用轮询的方式检测fd是否可用,而是在事件触发后采用类似callback的机制通知,由O(n)变为O(1);
    • 利用mmap(在关于零拷贝的文章中讲过)映射内存空间,减少复制和修改的开销。

    epoll的事件有两种触发方式,即水平触发(level triggering, LT)和边缘触发(edge triggering, ET)。这是源自电子学的术语,下面两个图分别示出高电位触发和上升沿触发。写到这里,还是感谢一下我邮吧。

    epoll中的水平触发是默认工作方式。当内核通知一个fd已经就绪时,程序就可以进行I/O操作了。但是如果本次不处理该fd,当下一次调用epoll_wait()时,内核仍然会再次通知,直到该fd被处理为止。水平触发也是select和poll采用的工作方式。

    边缘触发则是epoll特有的工作方式,当fd在事件的发生的当时从未就绪变为已就绪的状态,内核会通知该fd的状态变化,并假定程序已经感知到了这种变化。如果未处理该fd,在下一次调用epoll_wait()时,内核也不会再通知了。

    由此可见,水平触发与边缘触发各有各的好处。水平触发能够保证数据的完整性,但是仍然存在内核空间到用户空间的拷贝。边缘触发由于只需通知一次,大大减少了内核的资源占用,但同时也不再保证数据完整,需要程序做额外的处理。

    SS中的I/O多路复用

    SS到底是什么东西?不知道哇(

    其实,I/O多路复用在其他组件(比如Java NIO、Redis、Nginx)中有更广泛也更典型的应用,但是它们也被其他大佬讲过很多次了。回想起之前每次梯子倒掉的时候都免不了一番折腾,于是就拿SS源码来随便说说吧。

    下图示出SS的基本工作原理,通俗易懂,不多讲了。

    早期的SS Server在处理SS Local端发来的加密的请求时,采用的是多线程+阻塞I/O方式,每建立一个连接就新起一个线程来处理,在并发量大的情况下,过多的线程会造成性能问题。在目前较新版本的SS中,已经采用Python提供的I/O多路复用库select来解决,它就是Linux系统中I/O多路复用的统一封装。

    下面来看看原版SS 2.8.2版本中EventLoop类的源码。顾名思义,它也是事件驱动的。

    class EventLoop(object):
        def __init__(self):
            if hasattr(select, 'epoll'):
                self._impl = select.epoll()
                model = 'epoll'
            elif hasattr(select, 'kqueue'):
                self._impl = KqueueLoop()
                model = 'kqueue'
            elif hasattr(select, 'select'):
                self._impl = SelectLoop()
                model = 'select'
            else:
                raise Exception('can not find any available functions in select '
                                'package')
            self._fdmap = {}  # (f, handler)
            self._last_time = time.time()
            self._periodic_callbacks = []
            self._stopping = False
            logging.debug('using event model: %s', model)
    
        def poll(self, timeout=None):
            events = self._impl.poll(timeout)
            return [(self._fdmap[fd][0], fd, event) for fd, event in events]
    
        def add(self, f, mode, handler):
            fd = f.fileno()
            self._fdmap[fd] = (f, handler)
            self._impl.register(fd, mode)
    
        def remove(self, f):
            fd = f.fileno()
            del self._fdmap[fd]
            self._impl.unregister(fd)
    
        def add_periodic(self, callback):
            self._periodic_callbacks.append(callback)
    
        def remove_periodic(self, callback):
            self._periodic_callbacks.remove(callback)
    
        def modify(self, f, mode):
            fd = f.fileno()
            self._impl.modify(fd, mode)
    
        def stop(self):
            self._stopping = True
    
        def run(self):
            events = []
            while not self._stopping:
                asap = False
                try:
                    events = self.poll(TIMEOUT_PRECISION)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
                        asap = True
                        logging.debug('poll:%s', e)
                    else:
                        logging.error('poll:%s', e)
                        import traceback
                        traceback.print_exc()
                        continue
    
                for sock, fd, event in events:
                    handler = self._fdmap.get(fd, None)
                    if handler is not None:
                        handler = handler[1]
                        try:
                            handler.handle_event(sock, fd, event)
                        except (OSError, IOError) as e:
                            shell.print_exception(e)
                now = time.time()
                if asap or now - self._last_time >= TIMEOUT_PRECISION:
                    for callback in self._periodic_callbacks:
                        callback()
                    self._last_time = now
    
        def __del__(self):
            self._impl.close()
    

    从构造方法可知,SS支持select库提供的3种I/O多路复用实现:select、epoll和kqueue(相当于epoll在BSD中的实现)。其中select和kqueue都单独封装了对应的实现类,epoll没有,我们就以epoll为例来简单看看。

    EventLoop类中采用一个字典_fdmap保存fd与其事件处理逻辑的映射,即[fd, (f, handler)]。其add()、remove()、modify()方法则是代理了epoll.register()/unregister()/modify()方法,对应的系统调用是epoll_ctl()。poll()方法代理了epoll.poll()方法,对应的系统调用是epoll_wait(),可以设定超时。

    除了正常的事件处理逻辑之外,它也支持周期性的回调逻辑,保存在_periodic_callbacks结构中,用add_periodic()和remove_periodic()方法可以添加或删除。

    EventLoop的核心方法就是run()方法,它是一个无限循环,执行以下操作:

    1. 调用poll()方法等待事件的触发。
    2. 如果发生EPIPE异常,说明fd不可操作,连接可能已经关闭;如果发生EINTR异常,说明系统调用被信号中断。这两种情况都需要将asap标志位设为True,立即回调。
    3. poll()方法返回,从其返回的就绪列表中,取得fd对应的处理逻辑handler,并调用handle_event()方法进行处理。
    4. 如果设置了asap标志位,或者回调周期已到,就调用_periodic_callbacks中的各个回调函数。

    在SS Server的源码server.py中,初始化EventLoop的方法如下。

        def run_server():
            # 略去...
            try:
                loop = eventloop.EventLoop()
                dns_resolver.add_to_loop(loop)
                list(map(lambda s: s.add_to_loop(loop), tcp_servers + udp_servers))
    
                daemon.set_user(config.get('user', None))
                loop.run()
            except Exception as e:
                shell.print_exception(e)
                sys.exit(1)
    

    可见,在创建了EventLoop对象之后,会将早先创建好的异步DNS解析器dns_resolver、TCP转发tcp_servers、UDP转发udp_servers绑定到EventLoop,这三个组件都围绕EventLoop实现了事件驱动逻辑。来看看TCPRelay.add_to_loop()方法。

        def add_to_loop(self, loop):
            if self._eventloop:
                raise Exception('already add to loop')
            if self._closed:
                raise Exception('already closed')
            self._eventloop = loop
            self._eventloop.add(self._server_socket,
                                eventloop.POLL_IN | eventloop.POLL_ERR, self)
            self._eventloop.add_periodic(self.handle_periodic)
    

    其中,POLL_IN对应EPOLLIN宏,表示fd上有数据可读的事件,POLL_ERR则对应EPOLLERR宏,表示fd上有错误发生。同理,POLL_OUT则表示fd上有数据可写。

    事件处理器也由分别的类来实现,例如TCPRelayHandler.handleEvent()方法。

        def handle_event(self, sock, event):
            if self._stage == STAGE_DESTROYED:
                logging.debug('ignore handle_event: destroyed')
                return
            if sock == self._remote_sock:
                if event & eventloop.POLL_ERR:
                    self._on_remote_error()
                    if self._stage == STAGE_DESTROYED:
                        return
                if event & (eventloop.POLL_IN | eventloop.POLL_HUP):
                    self._on_remote_read()
                    if self._stage == STAGE_DESTROYED:
                        return
                if event & eventloop.POLL_OUT:
                    self._on_remote_write()
            elif sock == self._local_sock:
                if event & eventloop.POLL_ERR:
                    self._on_local_error()
                    if self._stage == STAGE_DESTROYED:
                        return
                if event & (eventloop.POLL_IN | eventloop.POLL_HUP):
                    self._on_local_read()
                    if self._stage == STAGE_DESTROYED:
                        return
                if event & eventloop.POLL_OUT:
                    self._on_local_write()
            else:
                logging.warn('unknown socket')
    

    可见,该方法先检查传入的sock是远程sock还是本地sock,然后通过位运算检查对应的epoll事件类型,进而调用远程或本地的读写或异常处理方法。

    最后,调用EventLoop.run()方法启动之,SS Server就可以开始处理SS Local发来的请求了。

    The End

    明天早起搬砖,晚安吧各位。

    相关文章

      网友评论

          本文标题:漫谈I/O多路复用与其在SS中的应用

          本文链接:https://www.haomeiwen.com/subject/ehsnlktx.html