Studying the Tornado ioloop source code

Author: throwsterY | Published: 2017-07-26 22:42

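    I took some time to read through the Tornado source code and will write up several key parts as a series of articles. This is partly a summary of what I learned, and I hope it is also useful to people who use Tornado. If anything here is inaccurate, please point it out by private message or in the comments.

    When reading an open-source project I go straight to the earliest release, here v1.0.0 of tornadoweb/tornado. In my view the core functionality is usually already present in v1.0.0 and later releases seldom change the major features. Besides, as more contributors submit code, the code style does not necessarily stay consistent.

    This article covers ioloop.py. It assumes you already understand the Linux I/O models; if not, I recommend reading UNIX Network Programming first.

    Let's begin!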

    # Choose a poll implementation. Use epoll if it is available, fall back to
    # select() for non-Linux platforms
    if hasattr(select, "epoll"):
        # Python 2.6+ on Linux
        _poll = select.epoll
    elif hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        _poll = _KQueue
    else:
        try:
            # Linux systems with our C module installed
            import epoll
            _poll = _EPoll
        except:
            # All other systems
            import sys
            if "linux" in sys.platform:
                logging.warning("epoll module not found; using select()")
            _poll = _Select
    

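    Different platforms offer different I/O multiplexing mechanisms: Linux has epoll, while macOS and the BSDs have kqueue. The two APIs differ, but both serve the same purpose. Python 2.6+ exposes them through the select module. Older Python versions do not, so Tornado ships a small C module named epoll and wraps it in the _EPoll class for use on Linux; everything else falls back to select() through _Select. The code above simply picks the implementation that matches the platform and the Python version.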
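
    The same feature-detection idea can be tried on a modern Python. The snippet below is only a sketch: `choose_poller_name` is a hypothetical helper, not part of Tornado, and it reports which multiplexing API the standard select module exposes on the current platform.

```python
import select


def choose_poller_name():
    """Return the name of the best multiplexing API the select module exposes."""
    if hasattr(select, "epoll"):
        return "epoll"    # Linux
    if hasattr(select, "kqueue"):
        return "kqueue"   # BSD / macOS
    return "select"       # portable fallback, always present


print(choose_poller_name())
```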

    So what does a poll implementation provide? Take _EPoll, one of the implementations, in which Tornado wraps the low-level C module.

    class _EPoll(object):
        """An epoll-based event loop using our C module for Python 2.5 systems"""
        _EPOLL_CTL_ADD = 1
        _EPOLL_CTL_DEL = 2
        _EPOLL_CTL_MOD = 3
    
        def __init__(self):
            self._epoll_fd = epoll.epoll_create()
    
        def fileno(self):
            return self._epoll_fd
    
        def register(self, fd, events):
            epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)
    
        def modify(self, fd, events):
            epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)
    
        def unregister(self, fd):
            epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)
    
        def poll(self, timeout):
            return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
    

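    As we can see, it exposes just the usual epoll operations (create, register, modify, unregister, and wait), which anyone who has used epoll will recognize at a glance.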
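
    To make the shared interface concrete, here is a minimal sketch of a select()-based poller offering the same register/modify/unregister/poll methods. `SelectPoller` and the three constants are names made up for this illustration; this is not Tornado's own _Select class, and it assumes Python 3 on a platform with socket.socketpair().

```python
import select
import socket

READ, WRITE, ERROR = 0x001, 0x004, 0x008  # same bit values as EPOLLIN / EPOLLOUT / EPOLLERR


class SelectPoller(object):
    """Hypothetical select()-backed poller with the same four methods as _EPoll."""

    def __init__(self):
        self.read_fds = set()
        self.write_fds = set()
        self.error_fds = set()

    def register(self, fd, events):
        if events & READ:
            self.read_fds.add(fd)
        if events & WRITE:
            self.write_fds.add(fd)
        if events & ERROR:
            self.error_fds.add(fd)

    def modify(self, fd, events):
        # Simplest correct approach: forget the old interest set, then re-add.
        self.unregister(fd)
        self.register(fd, events)

    def unregister(self, fd):
        self.read_fds.discard(fd)
        self.write_fds.discard(fd)
        self.error_fds.discard(fd)

    def poll(self, timeout):
        # timeout is in seconds, as in _EPoll.poll().
        readable, writable, errors = select.select(
            list(self.read_fds), list(self.write_fds), list(self.error_fds), timeout)
        events = {}
        for fd in readable:
            events[fd] = events.get(fd, 0) | READ
        for fd in writable:
            events[fd] = events.get(fd, 0) | WRITE
        for fd in errors:
            events[fd] = events.get(fd, 0) | ERROR
        return list(events.items())


# Usage: watch one end of a socket pair for readability.
left, right = socket.socketpair()
poller = SelectPoller()
poller.register(left.fileno(), READ)
before = poller.poll(0)      # nothing has been sent yet
right.send(b"x")
after = poller.poll(1.0)     # left is now readable
left.close()
right.close()
```

    Because every implementation returns (fd, events) pairs from poll(), the IOLoop does not need to know which mechanism sits underneath.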

    Next, let's go through the code of the IOLoop class from top to bottom:

    # Constants from the epoll module
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)
    
    # Our events map exactly to the epoll events
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
    

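    These are the epoll event constants. The ones we care about most are _EPOLLIN and _EPOLLOUT, which signal that the fd (file descriptor) we are watching has become readable or writable, respectively.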
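
    Since the events are bit flags, they are combined with | and tested with &. A small sketch using the values from the constants above; `describe` is a hypothetical helper added only for illustration.

```python
READ = 0x001                     # _EPOLLIN
WRITE = 0x004                    # _EPOLLOUT
ERROR = 0x008 | 0x010 | 0x2000   # _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP


def describe(events):
    """List which of the three IOLoop event groups are set in an event mask."""
    names = []
    if events & READ:
        names.append("READ")
    if events & WRITE:
        names.append("WRITE")
    if events & ERROR:
        names.append("ERROR")
    return names


print(describe(READ | WRITE))
print(describe(0x010))           # a bare hang-up counts as ERROR
```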

    def __init__(self, impl=None):
        self._impl = impl or _poll()
        if hasattr(self._impl, 'fileno'):
            self._set_close_exec(self._impl.fileno())
        self._handlers = {}
        self._events = {}
        self._callbacks = set()
        self._timeouts = []
        self._running = False
        self._stopped = False
        self._blocking_log_threshold = None
    
        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        if os.name != 'nt':
            r, w = os.pipe()
            self._set_nonblocking(r)
            self._set_nonblocking(w)
            self._set_close_exec(r)
            self._set_close_exec(w)
            self._waker_reader = os.fdopen(r, "r", 0)
            self._waker_writer = os.fdopen(w, "w", 0)
        else:
            self._waker_reader = self._waker_writer = win32_support.Pipe()
            r = self._waker_writer.reader_fd
        self.add_handler(r, self._read_waker, self.READ)
    

    Two things here deserve an explanation.

    What does the _set_close_exec method do?

    def _set_close_exec(self, fd):
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
    

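    Linux uses copy-on-write when forking a child process, and the child inherits copies of the parent's open file descriptors, so a file the parent has open is also open in the child under the same fd. If the child was forked only to call exec, the new program starts with a fresh context and none of the old variables, so it no longer knows about that fd, yet by default the descriptor stays open and leaks into the new program. Setting the FD_CLOEXEC flag makes the kernel close the fd automatically when the child calls exec.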
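
    The flag can be observed directly with fcntl. This is a sketch for Python 3 on a Unix-like system; `set_close_exec` mirrors the method above as a plain function and `is_close_exec` is a hypothetical helper. Python 3.4+ already creates pipes with close-on-exec set, so the demo clears the flag first with os.set_inheritable.

```python
import fcntl
import os


def set_close_exec(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)


def is_close_exec(fd):
    return bool(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)


r, w = os.pipe()
os.set_inheritable(r, True)   # clear close-on-exec so the change is visible
before = is_close_exec(r)
set_close_exec(r)
after = is_close_exec(r)
print(before, after)
os.close(r)
os.close(w)
```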

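    What is r, w = os.pipe() for?

    It opens a pipe and sets both the read end and the write end to non-blocking mode: a read or write that would otherwise block fails immediately, returning -1 with errno set to EAGAIN at the C level (Python raises the corresponding exception).

    The pipe exists so that the IOLoop can be woken up efficiently while it is idle. The read end is registered with the poller for READ events, so writing anything to the write end makes poll() return right away instead of blocking until its timeout expires.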
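
    Both behaviours can be reproduced in a few lines. This is a sketch assuming Python 3.5+ on a Unix-like system, with select.select() standing in for the poller; the variable names are made up for the demo.

```python
import errno
import os
import select

r, w = os.pipe()
os.set_blocking(r, False)
os.set_blocking(w, False)

# Reading an empty non-blocking pipe fails with EAGAIN instead of blocking.
try:
    os.read(r, 1)
    empty_read_errno = None
except BlockingIOError as e:
    empty_read_errno = e.errno

# Nothing written yet: the poll simply waits out its timeout.
idle_ready, _, _ = select.select([r], [], [], 0.05)

# One byte on the write end makes the read end readable, so the poll returns at once.
os.write(w, b"x")
woken_ready, _, _ = select.select([r], [], [], 5.0)

os.read(r, 1)   # drain the wake-up byte
os.close(r)
os.close(w)
```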

    @classmethod 
    def instance(cls):
        """Returns a global IOLoop instance.
    
        Most single-threaded applications have a single, global IOLoop.
        Use this method instead of passing around IOLoop instances
        throughout your code.
    
        A common pattern for classes that depend on IOLoops is to use
        a default argument to enable programs with multiple IOLoops
        but not require the argument for simpler applications:
    
            class MyClass(object):
                def __init__(self, io_loop=None):
                    self.io_loop = io_loop or IOLoop.instance()
        """
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance
    

    The instance method implements a lazily created singleton, so there is not much more to say about it.
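
    The default-argument pattern from the docstring can be tried in isolation. In this sketch `Loop` is a hypothetical stand-in for IOLoop that contains nothing but the singleton logic.

```python
class Loop(object):
    """Hypothetical stand-in for IOLoop, only to show the lazy singleton."""

    @classmethod
    def instance(cls):
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance


class MyClass(object):
    def __init__(self, io_loop=None):
        # Fall back to the global loop unless the caller supplies one.
        self.io_loop = io_loop or Loop.instance()


a = MyClass()
b = MyClass()
private = Loop()
c = MyClass(io_loop=private)
```

    Objects built without an argument share the global loop, while a program that needs several loops can still pass one in explicitly.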

    def add_handler(self, fd, handler, events):
        """Registers the given handler to receive the given events for fd."""
        self._handlers[fd] = handler
        self._impl.register(fd, events | self.ERROR)
    
    def update_handler(self, fd, events):
        """Changes the events we listen for fd."""
        self._impl.modify(fd, events | self.ERROR)
    
    def remove_handler(self, fd):
        """Stop listening for events on fd."""
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except (OSError, IOError):
            logging.debug("Error deleting fd from IOLoop", exc_info=True)
    

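    These methods bind a handler to a given fd and set which events to listen for on it. Note that ERROR is always ORed into the requested events.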

    def set_blocking_log_threshold(self, s):
        """Logs a stack trace if the ioloop is blocked for more than s seconds.
        Pass None to disable.  Requires python 2.6 on a unixy platform.
        """
        if not hasattr(signal, "setitimer"):
            logging.error("set_blocking_log_threshold requires a signal module "
                       "with the setitimer method")
            return
        self._blocking_log_threshold = s
        if s is not None:
            signal.signal(signal.SIGALRM, self._handle_alarm)
    
    def _handle_alarm(self, signal, frame):
        logging.warning('IOLoop blocked for %f seconds in\n%s',
                     self._blocking_log_threshold,
                     ''.join(traceback.format_stack(frame)))
    

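    The signal module is used to monitor how long the IOLoop stays blocked: once the threshold is exceeded, our own handler (_handle_alarm) fires and logs a stack trace. signal.SIGALRM and signal.ITIMER_REAL go together, because a timer armed with ITIMER_REAL delivers SIGALRM when it expires.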
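
    The arm/fire/disarm cycle looks like this in isolation. This is a sketch assuming Python 3 on a Unix-like system, run from the main thread; `on_alarm` and the busy loop are made up for the demo, the loop standing in for a handler that hogs the IOLoop.

```python
import signal
import time

fired = []


def on_alarm(signum, frame):
    # Tornado logs traceback.format_stack(frame) here; the demo just records the signal.
    fired.append(signum)


signal.signal(signal.SIGALRM, on_alarm)
signal.setitimer(signal.ITIMER_REAL, 0.05, 0)   # arm: fire once after 50 ms

deadline = time.time() + 2.0
while not fired and time.time() < deadline:     # simulate a blocking handler
    pass

signal.setitimer(signal.ITIMER_REAL, 0, 0)      # disarm
signal.signal(signal.SIGALRM, signal.SIG_DFL)   # restore the default disposition
print(fired == [signal.SIGALRM])
```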

    Next comes the most important method, start. A few small helper methods are used inside start and are covered here as well.

    def start(self):
        """Starts the I/O loop.
    
        The loop will run until one of the I/O handlers calls stop(), which
        will make the loop stop after the current event iteration completes.
        """
        if self._stopped:
            self._stopped = False
            return
        self._running = True
        while True:
            # Never use an infinite timeout here - it can stall epoll
            poll_timeout = 0.2
    
            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            callbacks = list(self._callbacks)
            for callback in callbacks:
                # A callback can add or remove other callbacks
                if callback in self._callbacks:
                    self._callbacks.remove(callback)
                    self._run_callback(callback)
    
            if self._callbacks:
                poll_timeout = 0.0
    
    
            if self._timeouts:
                now = time.time()
                while self._timeouts and self._timeouts[0].deadline <= now:
                    timeout = self._timeouts.pop(0)
                    self._run_callback(timeout.callback)
                if self._timeouts:
                    milliseconds = self._timeouts[0].deadline - now
                    poll_timeout = min(milliseconds, poll_timeout)
    
            if not self._running:
                break
    
            if self._blocking_log_threshold is not None:
                # clear alarm so it doesn't fire while poll is waiting for
                # events.
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
    
            try:
                event_pairs = self._impl.poll(poll_timeout)
            except Exception, e:
                # Depending on python version and IOLoop implementation,
                # different exception types may be thrown and there are
                # two ways EINTR might be signaled:
                # * e.errno == errno.EINTR
                # * e.args is like (errno.EINTR, 'Interrupted system call')
                if (getattr(e, 'errno') == errno.EINTR or
                    (isinstance(getattr(e, 'args'), tuple) and
                     len(e.args) == 2 and e.args[0] == errno.EINTR)):
                    logging.warning("Interrupted system call", exc_info=1)
                    continue
                else:
                    raise
    
            if self._blocking_log_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL,
                                 self._blocking_log_threshold, 0)
    
            # Pop one fd at a time from the set of pending fds and run
            # its handler. Since that handler may perform actions on
            # other file descriptors, there may be reentrant calls to
            # this IOLoop that update self._events
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    self._handlers[fd](fd, events)
                except (KeyboardInterrupt, SystemExit):
                    raise
                except (OSError, IOError), e:
                    if e[0] == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        logging.error("Exception in I/O handler for fd %d",
                                      fd, exc_info=True)
                except:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
        # reset the stopped flag so another start/stop pair can be issued
        self._stopped = False
        if self._blocking_log_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL, 0, 0)
    

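    _callbacks holds functions that will be called on the next iteration of the IOLoop, before it polls for events. Adding a callback is safe at any time from any thread, so it can be used to transfer control from other threads to the IOLoop's thread.

    _timeouts stores callbacks together with their deadlines. Unlike _callbacks, whose entries run right away on the next iteration, each entry in _timeouts runs only once its deadline has passed.

    On choosing poll_timeout

    A value of 0 means poll() returns immediately, whether or not any events are ready.

    The default is 0.2 seconds. If callbacks are still pending in _callbacks, it is set to 0. The loop then checks how long it is until the next entry in _timeouts is due and takes the smaller of the two values.

    In short: if neither _callbacks nor _timeouts has anything to run, poll() waits for the default 0.2 seconds; otherwise the wait is the interval from now until the earliest pending function is due, capped at 0.2 seconds.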
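
    The rule can be written as one small pure function. This is a sketch: `compute_poll_timeout` and its parameters are hypothetical names, and it assumes that overdue timeouts have already been run, as start() does before this point.

```python
def compute_poll_timeout(has_callbacks, next_deadline, now, default=0.2):
    """Return how long poll() may block, in seconds."""
    timeout = default
    if has_callbacks:
        timeout = 0.0                        # pending callbacks: do not wait at all
    if next_deadline is not None:
        timeout = min(next_deadline - now, timeout)
    return timeout


print(compute_poll_timeout(False, None, 100.0))    # nothing pending
print(compute_poll_timeout(True, None, 100.0))     # callbacks pending
print(compute_poll_timeout(False, 105.0, 100.0))   # far-off timeout, capped by the default
```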

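    The remaining part calls poll() to fetch the ready events, arms the signal.ITIMER_REAL timer, and starts dispatching. Events are taken one at a time with popitem() rather than by iterating over the dict, because a handler may make reentrant calls into the IOLoop that change self._events while it is being processed. Once the loop exits, _stopped is reset to False and the timer is cancelled.

    That is all for this introduction to ioloop. Corrections are welcome.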
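
    Why popitem() instead of a for loop? A short sketch makes the difference visible. `drain` and `on_event` are hypothetical names, and the fd numbers are arbitrary.

```python
def drain(pending, on_event):
    """Dispatch events one at a time; handlers may add to or remove from pending."""
    while pending:
        fd, events = pending.popitem()
        on_event(fd, events)


pending = {3: 0x001, 4: 0x001}
handled = []


def on_event(fd, events):
    handled.append(fd)
    if fd == 3:
        pending[5] = 0x004   # a handler queues an event for another fd


drain(pending, on_event)

# Plain iteration breaks as soon as the dict changes size.
naive = {3: 0x001, 4: 0x001}
try:
    for fd in naive:
        naive[fd + 10] = 0x004
    naive_failed = False
except RuntimeError:
    naive_failed = True
```

    The popitem() loop also picks up the event that was added while dispatching, whereas the for loop raises RuntimeError.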

      Article link: https://www.haomeiwen.com/subject/dshakxtx.html