最近开始阅读开源项目的源码了,鉴于一直用Tornado做项目,就从它着手开始吧。
今天分析的是Tornado的"Killer Technique",哈哈,其实没那么夸张了。我们知道Tornado采用了与Node.js相同的单线程事件驱动模型,那么它就需要一个事件轮询机制,我没有看过Node.js的源码,所以不太清楚它的机制。Tornado在IO层面主要使用了两种解决方案:
- select
- epoll
通过Configurable
类实现IOLoop
的工厂模式,在*NIX系统下默认采用了epoll的方式。
接下来我们就逐行进行分析,IOLoop
类的代码不会都看,阅读的顺序是从start
函数开始。
if self._running:
raise RuntimeError("IOLoop is already running")
self._setup_logging()
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True
这部分其实就是进行了一些状态检测,然后设置日志,设置标志位的值。不多啰嗦了。
主要来看下面这个大的循环体:
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
首先在线程同步锁下将所有的callback获取出来,然后清空原来的callback数组。
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)):
self._cancellations = 0
self._timeouts = [x for x in self._timeouts if x.callback is not None]
heapq.heapify(self._timeouts)
这部分比较长,主要是处理了延时事件,支持异步的gen.sleep
的实现就与这部分有关。
首先声明空数组due_timeouts
来存放被trigger的事件回调,这个timeouts队列比较有意思,它用到了heapq
这个包来保证数组内的元素存放顺序是一个堆的结构,这样一来每次取出来的元素都是最小的。然后只需要判断取出的最小元素是不是过时了,如果这个事件被取消了,那么直接pop掉进入下次循环;如果这个事件过时了,就直接加入待执行的due_timeouts
中,进入下次循环;如果没有过时,由于这是最小元素,所以它后面的元素肯定也没有过时所以干脆直接跳出循环,接着进行下面的内容。
接下来是一个优化操作,如果取消的事件多于512个并且大于总数的一半时,就把timeouts进行清理,清理结束后再进行堆排序。
下面一部分代码就不放了,主要是执行刚才上面准备好的callbacks和timeouts。
if self._callbacks:
poll_timeout = 0.0
elif self._timeouts:
poll_timeout = self._timeouts[0].deadline - self.time()
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
poll_timeout = _POLL_TIMEOUT
这里又进行了一次callbacks的检查,如果有需要执行的回调,那么就让poll等待的时间为0,如果有timeouts,就让poll等待的时间为还有多久触发timeout事件的时间,同时这个时间不能超过预设的最长时间。
接下来进行了一次状态检测,如果IOLoop
已经停止,那么跳出循环。
try:
event_pairs = self._impl.poll(poll_timeout)
开始等待IO事件了。
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
这里得到IO事件的触发者,然后得到它的处理函数,并且执行这个函数,然后进行异常处理。
这就是IOLoop的大致思路,通过IOLoop,我们就可让一个工作变成一组可序列化并且粒度足够小的事件,依次执行。通过select/epoll机制来实现同时对多个socket进行同时处理,避免轮询浪费CPU时间,是效率高的关键因素。
网友评论