美文网首页源码学习程序员
Tornado源码分析手记 —— IOLoop核心实现

Tornado源码分析手记 —— IOLoop核心实现

作者: Cyandev | 来源:发表于2016-01-29 21:45 被阅读769次

    最近开始阅读开源项目的源码了,鉴于一直用Tornado做项目,就从它着手开始吧。

    今天分析的是Tornado的"Killer Technique",哈哈,其实没那么夸张了。我们知道Tornado采用了与Node.js相同的单线程事件驱动模型,那么它就需要一个事件轮询机制,我没有看过Node.js的源码,所以不太清楚它的机制。Tornado在IO层面主要使用了两种解决方案:

    • select
    • epoll
      通过Configurable类实现IOLoop的工厂模式,在*NIX系统下默认采用了epoll的方式。

    接下来我们就逐行进行分析,IOLoop类的代码不会都看,阅读的顺序是从start函数开始。

    if self._running:
      raise RuntimeError("IOLoop is already running")
    self._setup_logging()
    if self._stopped:
      self._stopped = False
      return
    old_current = getattr(IOLoop._current, "instance", None)
    IOLoop._current.instance = self
    self._thread_ident = thread.get_ident()
    self._running = True
    

    这部分其实就是进行了一些状态检测,然后设置日志,设置标志位的值。不多啰嗦了。
    主要来看下面这个大的循环体:

    with self._callback_lock:
      callbacks = self._callbacks
      self._callbacks = []
    

    首先在线程同步锁下将所有的callback获取出来,然后清空原来的callback数组。

    due_timeouts = []
    if self._timeouts:
      now = self.time()
      while self._timeouts:
        if self._timeouts[0].callback is None:
          heapq.heappop(self._timeouts)
          self._cancellations -= 1
        elif self._timeouts[0].deadline <= now:
          due_timeouts.append(heapq.heappop(self._timeouts))
        else:
          break
    if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)):
      self._cancellations = 0
      self._timeouts = [x for x in self._timeouts if x.callback is not None]
      heapq.heapify(self._timeouts)
    

    这部分比较长,主要是处理了延时事件,支持异步的gen.sleep的实现就与这部分有关。

    首先声明空数组due_timeouts来存放被trigger的事件回调,这个timeouts队列比较有意思,它用到了heapq这个包来保证数组内的元素存放顺序是一个堆的结构,这样一来每次取出来的元素都是最小的。然后只需要判断取出的最小元素是不是过时了,如果这个事件被取消了,那么直接pop掉进入下次循环;如果这个事件过时了,就直接加入待执行的due_timeouts中,进入下次循环;如果没有过时,由于这是最小元素,所以它后面的元素肯定也没有过时所以干脆直接跳出循环,接着进行下面的内容。

    接下来是一个优化操作,如果取消的事件多于512个并且大于总数的一半时,就把timeouts进行清理,清理结束后再进行堆排序。

    下面一部分代码就不放了,主要是执行刚才上面准备好的callbacks和timeouts。

    if self._callbacks:
      poll_timeout = 0.0
    elif self._timeouts:
      poll_timeout = self._timeouts[0].deadline - self.time()
      poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
    else:
      poll_timeout = _POLL_TIMEOUT
    

    这里又进行了一次callbacks的检查,如果有需要执行的回调,那么就让poll等待的时间为0,如果有timeouts,就让poll等待的时间为还有多久触发timeout事件的时间,同时这个时间不能超过预设的最长时间。

    接下来进行了一次状态检测,如果IOLoop已经停止,那么跳出循环。

    try:
      event_pairs = self._impl.poll(poll_timeout)
    

    开始等待IO事件了。

    self._events.update(event_pairs)
    while self._events:
      fd, events = self._events.popitem()
      try:
        fd_obj, handler_func = self._handlers[fd]
        handler_func(fd_obj, events)
      except (OSError, IOError) as e:
        if errno_from_exception(e) == errno.EPIPE:
          pass
        else:
          self.handle_callback_exception(self._handlers.get(fd))
       except Exception:
          self.handle_callback_exception(self._handlers.get(fd))
    fd_obj = handler_func = None
    

    这里得到IO事件的触发者,然后得到它的处理函数,并且执行这个函数,然后进行异常处理。

    这就是IOLoop的大致思路,通过IOLoop,我们就可让一个工作变成一组可序列化并且粒度足够小的事件,依次执行。通过select/epoll机制来实现同时对多个socket进行同时处理,避免轮询浪费CPU时间,是效率高的关键因素。

    相关文章

      网友评论

        本文标题:Tornado源码分析手记 —— IOLoop核心实现

        本文链接:https://www.haomeiwen.com/subject/dlijkttx.html