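Tornado is a high-performance, non-blocking web framework. Thanks to non-blocking I/O and epoll, it can handle thousands of connections per second. Tornado's source code is concise, cleverly implemented and clearly structured, and it comes with plenty of demos. That makes it well suited to further reading and study.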
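The Tornado framework covers the low-level I/O implementation, TCP transport control, the HTTP layer, and web features such as static file rendering, auth and routing. It also ships some test code and platform-specific code. This article starts from a simple example and reads through the relevant Tornado source code.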
Notes:
- This article uses Python 2.7.12 and Tornado 4.5.3.
- When a class's methods are shown, only the relevant ones are included.
1. A simple example
from tornado import ioloop, web, httpserver

class MainHandler(web.RequestHandler):
    def get(self):
        self.write("Hello, world")

app = web.Application([
    (r"/index", MainHandler),
])

if __name__ == "__main__":
    server = httpserver.HTTPServer(app)
    server.bind(8888)
    server.start()
    ioloop.IOLoop.current().start()
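The code above first creates an application (Application). Application implements routing: it matches each request to the corresponding handler based on its path, host and other attributes. An HTTPServer is then created to serve this application. Finally the IOLoop is started to perform the network I/O. Because IOLoop.current() returns a shared, singleton-style instance, and server.start() has already registered the server's listening sockets on that same loop, there is no need to pass the HTTPServer when starting the IOLoop.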
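To make "matching requests to handlers" concrete, here is a minimal sketch of path-based routing. Router and find_handler are hypothetical names for illustration, not Tornado's API. The real Application also supports host matching and more, which this sketch ignores, and handlers are plain strings here so the example stays self-contained.

```python
import re


class Router(object):
    """Hypothetical, minimal path router illustrating the idea behind web.Application."""

    def __init__(self, rules):
        # Each rule is (pattern, handler). Patterns are anchored at the end with "$",
        # the way Tornado anchors URL patterns, so "/index" does not match "/index/x".
        self._rules = []
        for pattern, handler in rules:
            if not pattern.endswith("$"):
                pattern += "$"
            self._rules.append((re.compile(pattern), handler))

    def find_handler(self, path):
        """Return (handler, captured_groups) for the first matching rule, or (None, ())."""
        for regex, handler in self._rules:
            match = regex.match(path)
            if match:
                return handler, match.groups()
        return None, ()


router = Router([
    (r"/index", "MainHandler"),
    (r"/user/(\d+)", "UserHandler"),
])
print(router.find_handler("/user/42"))  # ('UserHandler', ('42',))
```

Captured groups are returned alongside the handler because Tornado passes regex groups to handler methods as arguments; this sketch only returns them.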
A few things are worth noting here:
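- The separate bind/start steps can be shortened. app.listen(port) creates an HTTPServer automatically and starts listening, and server.listen(port) is a shortcut for bind followed by start. For example: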
app.listen(8888)
ioloop.IOLoop.current().start()
or
server = httpserver.HTTPServer(app)
server.listen(8888)
ioloop.IOLoop.current().start()
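- HTTPServer can also run as multiple processes. The argument to start sets the number of child processes: None or a value <= 0 forks one child per CPU core; omitting it or passing 1 runs a single process (as an incurable perfectionist, I can't help asking: why can't it start exactly one child process?); a value > 1 forks that many child processes. For example:
server.start(4)  # 1 parent process, 4 child processes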
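- On a whim, I wondered whether one program could listen on several ports at once. Experimenting, I found that two servers can be started, and both work, but only in single-process mode. I didn't track down the exact reason. A likely one is that in multi-process mode start() calls tornado.process.fork_processes, which asserts that it has not already been called in the current process, and which never returns normally in the parent (the parent just supervises its children and then exits). Since bind may be called several times before start, binding both ports on a single server is a way around this.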
server1 = httpserver.HTTPServer(app)
server1.bind(8888)
server1.start()
server2 = httpserver.HTTPServer(app)
server2.bind(9888)
server2.start()
ioloop.IOLoop.current().start()
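The process-count rule from the start bullet above can be written out as a small function. children_to_fork is a hypothetical helper, not part of Tornado; it only encodes the rule stated in TCPServer.start's docstring. It also answers the aside: no argument yields exactly one child, because the value 1 is reserved to mean "don't fork".

```python
import multiprocessing


def children_to_fork(num_processes=1):
    """How many child processes TCPServer.start(num_processes) forks, per its docstring.

    0 means "no fork": the server runs in the current process.
    """
    if num_processes == 1:
        return 0  # the default: serve from this process, fork nothing
    if num_processes is None or num_processes <= 0:
        return multiprocessing.cpu_count()  # one child per CPU core
    return num_processes  # > 1: exactly that many children


print(children_to_fork(), children_to_fork(4))  # 0 4
```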
2. server = httpserver.HTTPServer(app)
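This line creates an HTTPServer. The HTTPServer class inherits from three classes: TCPServer, Configurable and HTTPServerConnectionDelegate. Configurable can be regarded as an abstract class: its __new__ method implements the factory method pattern, creating objects of different types depending on the configuration. And because HTTPServer inherits from several parent classes that all derive from object, this raises the diamond inheritance question.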
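- HTTPServer is a new-style class, so its method resolution order (MRO) is computed with the C3 linearization algorithm (often loosely described as breadth-first for diamond hierarchies), rather than the depth-first, left-to-right search used by classic classes.
- __new__ constructs the object and __init__ initializes it, so __new__ is called before __init__. Since initialize is called inside __new__, initialize also runs before __init__.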
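A small demonstration of the MRO point. The classes A, B, C, D and the *Stub classes are hypothetical stand-ins; the stubs only mimic the shape of HTTPServer's base-class list. The comments show the order CPython's C3 linearization produces for these new-style classes.

```python
class A(object):
    def who(self):
        return "A"


class B(A):
    pass


class C(A):
    def who(self):
        return "C"


class D(B, C):  # the classic diamond: D -> B, C -> A
    pass


# C3 places the shared base A after both B and C...
print([k.__name__ for k in D.__mro__])  # ['D', 'B', 'C', 'A', 'object']
# ...so C.who wins; a depth-first search (old classic classes) would reach A.who first.
print(D().who())  # C


class TCPServerStub(object):
    def __init__(self):
        self.initialized_by = "TCPServerStub"


class ConfigurableStub(object):
    pass


class DelegateStub(object):
    pass


class HTTPServerStub(TCPServerStub, ConfigurableStub, DelegateStub):
    def __init__(self):
        # Reaches TCPServerStub.__init__, the next class after HTTPServerStub in the MRO.
        super(HTTPServerStub, self).__init__()


print([k.__name__ for k in HTTPServerStub.__mro__])
```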
Here is Configurable's __new__ method:
class Configurable(object):
    """Base class for configurable interfaces.

    A configurable interface is an (abstract) class whose constructor
    acts as a factory function for one of its implementation subclasses.
    The implementation subclass as well as optional keyword arguments to
    its initializer can be set globally at runtime with `configure`.

    By using the constructor as the factory method, the interface
    looks like a normal class, `isinstance` works as usual, etc. This
    pattern is most useful when the choice of implementation is likely
    to be a global decision (e.g. when `~select.epoll` is available,
    always use it instead of `~select.select`), or when a
    previously-monolithic class has been split into specialized
    subclasses.

    Configurable subclasses must define the class methods
    `configurable_base` and `configurable_default`, and use the instance
    method `initialize` instead of ``__init__``.
    """
    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        init_kwargs = {}
        if cls is base:
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            impl = cls
        init_kwargs.update(kwargs)
        instance = super(Configurable, cls).__new__(impl)
        # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
        # singleton magic. If we get rid of that we can switch to __init__
        # here too.
        instance.initialize(*args, **init_kwargs)
        return instance
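The factory behaviour of __new__ can be reproduced in a few lines. MiniConfigurable, Poller, EpollImpl and SelectImpl are hypothetical names, not Tornado classes. The sketch drops Configurable's stored keyword arguments and name mangling and keeps only the core idea: constructing the base class returns an instance of a configured or default subclass, with initialize called from inside __new__.

```python
import select


class MiniConfigurable(object):
    """Hypothetical, stripped-down version of the Configurable factory idea."""

    _impl_class = None  # set via configure(); None means "use the default"

    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        # Constructing the base picks an implementation; a subclass is used as-is.
        impl = cls.configured_class() if cls is base else cls
        instance = super(MiniConfigurable, cls).__new__(impl)
        instance.initialize(*args, **kwargs)  # runs before any __init__
        return instance

    @classmethod
    def configure(cls, impl):
        cls.configurable_base()._impl_class = impl

    @classmethod
    def configured_class(cls):
        base = cls.configurable_base()
        return base._impl_class or base.configurable_default()


class Poller(MiniConfigurable):
    @classmethod
    def configurable_base(cls):
        return Poller

    @classmethod
    def configurable_default(cls):
        # Same spirit as IOLoop: prefer epoll when this Python exposes it.
        return EpollImpl if hasattr(select, "epoll") else SelectImpl

    def initialize(self, timeout=1.0):
        self.timeout = timeout


class EpollImpl(Poller):
    pass


class SelectImpl(Poller):
    pass


p = Poller(timeout=2.0)
print("%s %s" % (type(p).__name__, p.timeout))  # EpollImpl on Linux, else SelectImpl
```

Because EpollImpl and SelectImpl define no __init__ of their own, Python's automatic __init__ call after __new__ lands on object.__init__, which tolerates the extra arguments in this case. HTTPServer is in a different position, because TCPServer does define __init__.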
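Below is the HTTPServer code. HTTPServer is a non-blocking, single-threaded HTTP server. Its __init__ is required: although it contains no real code, it stops the parent class's __init__ from being called. The reason is that when __new__ returns an instance of the class being constructed, Python then calls that instance's __init__ with the same arguments (here, app). Without this no-op override, the MRO would resolve that call to TCPServer.__init__, running it a second time with app in the io_loop position.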
class HTTPServer(TCPServer, Configurable,
                 httputil.HTTPServerConnectionDelegate):
    def __init__(self, *args, **kwargs):
        # Ignore args to __init__; real initialization belongs in
        # initialize since we're Configurable. (there's something
        # weird in initialization order between this class,
        # Configurable, and TCPServer so we can't leave __init__ out
        # completely)
        pass

    def initialize(self, request_callback, no_keep_alive=False, io_loop=None,
                   xheaders=False, ssl_options=None, protocol=None,
                   decompress_request=False,
                   chunk_size=None, max_header_size=None,
                   idle_connection_timeout=None, body_timeout=None,
                   max_body_size=None, max_buffer_size=None,
                   trusted_downstream=None):
        self.request_callback = request_callback
        self.no_keep_alive = no_keep_alive
        self.xheaders = xheaders
        self.protocol = protocol
        self.conn_params = HTTP1ConnectionParameters(
            decompress=decompress_request,
            chunk_size=chunk_size,
            max_header_size=max_header_size,
            header_timeout=idle_connection_timeout or 3600,
            max_body_size=max_body_size,
            body_timeout=body_timeout,
            no_keep_alive=no_keep_alive)
        TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
                           max_buffer_size=max_buffer_size,
                           read_chunk_size=chunk_size)
        self._connections = set()
        self.trusted_downstream = trusted_downstream
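Given the MRO, the TCPServer initialization could also be written with super, since TCPServer is the class right after HTTPServer in the MRO. It is less obvious, though: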
super(HTTPServer, self).__init__(io_loop=io_loop, ssl_options=ssl_options,
                                 max_buffer_size=max_buffer_size,
                                 read_chunk_size=chunk_size)
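The claim that HTTPServer's empty __init__ is needed can be checked with hypothetical stand-in classes. FactoryStub plays the role of Configurable (its __new__ calls initialize) and TCPServerStub plays TCPServer. None of these names are Tornado's; the sketch only records which initializers run, and with what arguments.

```python
calls = []


class FactoryStub(object):
    """Hypothetical stand-in for Configurable: __new__ calls initialize()."""

    def __new__(cls, *args, **kwargs):
        instance = super(FactoryStub, cls).__new__(cls)
        instance.initialize(*args, **kwargs)
        return instance


class TCPServerStub(object):
    """Hypothetical stand-in for TCPServer; records how it was initialized."""

    def __init__(self, io_loop=None):
        calls.append(("TCPServerStub.__init__", io_loop))


class ServerWithoutNoOpInit(TCPServerStub, FactoryStub):
    def initialize(self, request_callback):
        calls.append(("initialize", request_callback))
        TCPServerStub.__init__(self, io_loop="loop")


class ServerWithNoOpInit(ServerWithoutNoOpInit):
    def __init__(self, *args, **kwargs):
        pass  # swallow Python's automatic __init__ call, as HTTPServer does


ServerWithoutNoOpInit("app")
print(calls)
# [('initialize', 'app'), ('TCPServerStub.__init__', 'loop'),
#  ('TCPServerStub.__init__', 'app')]   <- re-initialized, with "app" as io_loop

del calls[:]
ServerWithNoOpInit("app")
print(calls)  # [('initialize', 'app'), ('TCPServerStub.__init__', 'loop')]
```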
3. server.bind(8888); server.start()
class TCPServer(object):
    def add_sockets(self, sockets):
        """Makes this server start accepting connections on the given sockets.

        The ``sockets`` parameter is a list of socket objects such as
        those returned by `~tornado.netutil.bind_sockets`.
        `add_sockets` is typically used in combination with that
        method and `tornado.process.fork_processes` to provide greater
        control over the initialization of a multi-process server.
        """
        if self.io_loop is None:
            self.io_loop = IOLoop.current()

        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            add_accept_handler(sock, self._handle_connection,
                               io_loop=self.io_loop)

    def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128,
             reuse_port=False):
        """Binds this server to the given port on the given address.

        To start the server, call `start`. If you want to run this server
        in a single process, you can call `listen` as a shortcut to the
        sequence of `bind` and `start` calls.

        Address may be either an IP address or hostname. If it's a hostname,
        the server will listen on all IP addresses associated with the
        name. Address may be an empty string or None to listen on all
        available interfaces. Family may be set to either `socket.AF_INET`
        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
        both will be used if available.

        The ``backlog`` argument has the same meaning as for
        `socket.listen <socket.socket.listen>`. The ``reuse_port`` argument
        has the same meaning as for `.bind_sockets`.

        This method may be called multiple times prior to `start` to listen
        on multiple ports or interfaces.

        .. versionchanged:: 4.4
           Added the ``reuse_port`` argument.
        """
        sockets = bind_sockets(port, address=address, family=family,
                               backlog=backlog, reuse_port=reuse_port)
        if self._started:
            self.add_sockets(sockets)
        else:
            self._pending_sockets.extend(sockets)

    def start(self, num_processes=1):
        """Starts this server in the `.IOLoop`.

        By default, we run the server in this process and do not fork any
        additional child process.

        If num_processes is ``None`` or <= 0, we detect the number of cores
        available on this machine and fork that number of child
        processes. If num_processes is given and > 1, we fork that
        specific number of sub-processes.

        Since we use processes and not threads, there is no shared memory
        between any server code.

        Note that multiple processes are not compatible with the autoreload
        module (or the ``autoreload=True`` option to `tornado.web.Application`
        which defaults to True when ``debug=True``).
        When using multiple processes, no IOLoops can be created or
        referenced until after the call to ``TCPServer.start(n)``.
        """
        assert not self._started
        self._started = True
        if num_processes != 1:
            process.fork_processes(num_processes)
        sockets = self._pending_sockets
        self._pending_sockets = []
        self.add_sockets(sockets)
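When the address to listen on corresponds to several actual addresses, bind_sockets may return more than one socket object. For example, with an empty or None address on a host that supports both IPv4 and IPv6, it binds both 0.0.0.0 and ::; and a hostname may resolve to several IPs.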
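The "one address, several sockets" behaviour comes from iterating over what getaddrinfo returns. Below is a minimal sketch of that idea, written for Python 3. bind_all is a hypothetical helper, not Tornado's bind_sockets; it omits reuse_port and does not make port=0 share one port across address families, both of which the real function handles.

```python
import errno
import socket


def bind_all(port, address=None, backlog=128):
    """Bind one non-blocking listening socket per address getaddrinfo() returns.

    Hypothetical, simplified sketch of the idea behind bind_sockets.
    """
    sockets = []
    seen = set()
    for family, socktype, proto, _canonname, sockaddr in socket.getaddrinfo(
            address, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
            socket.AI_PASSIVE):
        if (family, sockaddr) in seen:  # getaddrinfo may return duplicates
            continue
        seen.add((family, sockaddr))
        try:
            sock = socket.socket(family, socktype, proto)
        except OSError as e:
            if e.errno == errno.EAFNOSUPPORT:  # e.g. IPv6 disabled in the kernel
                continue
            raise
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if family == socket.AF_INET6 and hasattr(socket, "IPV6_V6ONLY"):
            # Keep the IPv6 socket IPv6-only so it can coexist with the IPv4 one.
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        sock.setblocking(False)
        sock.bind(sockaddr)
        sock.listen(backlog)
        sockets.append(sock)
    return sockets
```

Called as bind_all(8888) on a dual-stack host, this typically returns two sockets (0.0.0.0 and ::); with a single literal IP such as "127.0.0.1" it returns one.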
4. ioloop.IOLoop.current().start()
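tornado.ioloop is the low-level library that handles I/O events, and its core class is IOLoop. IOLoop follows the singleton pattern: instance() lazily creates a single global instance, using double-checked locking so that concurrent callers still get the same object, while current() returns the loop marked as current for the calling thread and falls back to that global instance.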
class IOLoop(Configurable):
    @staticmethod
    def instance():
        """Returns a global `IOLoop` instance.

        Most applications have a single, global `IOLoop` running on the
        main thread. Use this method to get this instance from
        another thread. In most other cases, it is better to use `current()`
        to get the current thread's `IOLoop`.
        """
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    # New instance after double check
                    IOLoop._instance = IOLoop()
        return IOLoop._instance

    @staticmethod
    def current(instance=True):
        """Returns the current thread's `IOLoop`.

        If an `IOLoop` is currently running or has been marked as
        current by `make_current`, returns that instance. If there is
        no current `IOLoop`, returns `IOLoop.instance()` (i.e. the
        main thread's `IOLoop`, creating one if necessary) if ``instance``
        is true.

        In general you should use `IOLoop.current` as the default when
        constructing an asynchronous object, and use `IOLoop.instance`
        when you mean to communicate to the main thread from a different
        one.

        .. versionchanged:: 4.1
           Added ``instance`` argument to control the fallback to
           `IOLoop.instance()`.
        """
        current = getattr(IOLoop._current, "instance", None)
        if current is None and instance:
            return IOLoop.instance()
        return current
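The instance()/current() pair can be reproduced outside Tornado to see how the two lookups differ. Loop is a hypothetical stand-in class, not IOLoop; its make_current only mirrors the idea of marking a loop as current for one thread through a threading.local slot.

```python
import threading


class Loop(object):
    """Hypothetical stand-in reproducing IOLoop's instance()/current() pattern."""

    _instance_lock = threading.Lock()
    _current = threading.local()  # per-thread "current loop" slot

    @staticmethod
    def instance():
        # Double-checked locking: the lock is only taken while no instance exists.
        if not hasattr(Loop, "_instance"):
            with Loop._instance_lock:
                if not hasattr(Loop, "_instance"):
                    Loop._instance = Loop()
        return Loop._instance

    def make_current(self):
        Loop._current.instance = self

    @staticmethod
    def current(instance=True):
        current = getattr(Loop._current, "instance", None)
        if current is None and instance:
            return Loop.instance()  # fall back to the global singleton
        return current


if __name__ == "__main__":
    seen = []
    workers = [threading.Thread(target=lambda: seen.append(Loop.instance()))
               for _ in range(8)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    print(all(loop is seen[0] for loop in seen))  # True: one global instance
```

A thread that calls make_current on its own Loop sees that loop from current(), while other threads keep falling back to the global instance.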
IOLoop also inherits from Configurable. This is the key to how IOLoop chooses EPollIOLoop, KQueueIOLoop or SelectIOLoop according to the operating system.
class IOLoop(Configurable):
    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop
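The order of checks in configurable_default can be mirrored with a tiny function that reports which backend would be picked on the current machine. pick_poller is a hypothetical helper written for Python 3; the comparison with the standard library's selectors.DefaultSelector is only for orientation, since that class makes a similar choice but checks kqueue first and also knows about devpoll and poll.

```python
import select
import selectors  # Python 3.4+


def pick_poller():
    """Mirror the order of checks in IOLoop.configurable_default()."""
    if hasattr(select, "epoll"):   # Linux
        return "epoll"
    if hasattr(select, "kqueue"):  # BSD / macOS
        return "kqueue"
    return "select"                # portable fallback


if __name__ == "__main__":
    sel = selectors.DefaultSelector()
    print(pick_poller(), type(sel).__name__)  # e.g. "epoll EpollSelector" on Linux
    sel.close()
```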
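In Python, select.select is a function while select.epoll is a class. Tornado therefore wraps select in a _Select class whose interface matches select.epoll, and SelectIOLoop hands a _Select instance to PollIOLoop as its impl. Both kinds of object can register, modify and unregister file descriptors in the watch list and then poll it.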
class _Select(object):
    """A simple, select()-based IOLoop implementation for non-Linux systems"""
    def __init__(self):
        self.read_fds = set()
        self.write_fds = set()
        self.error_fds = set()
        self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)

    def close(self):
        pass

    def register(self, fd, events):
        if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds:
            raise IOError("fd %s already registered" % fd)
        if events & IOLoop.READ:
            self.read_fds.add(fd)
        if events & IOLoop.WRITE:
            self.write_fds.add(fd)
        if events & IOLoop.ERROR:
            self.error_fds.add(fd)
            # Closed connections are reported as errors by epoll and kqueue,
            # but as zero-byte reads by select, so when errors are requested
            # we need to listen for both read and error.
            # self.read_fds.add(fd)

    def modify(self, fd, events):
        self.unregister(fd)
        self.register(fd, events)

    def unregister(self, fd):
        self.read_fds.discard(fd)
        self.write_fds.discard(fd)
        self.error_fds.discard(fd)

    def poll(self, timeout):
        readable, writeable, errors = select.select(
            self.read_fds, self.write_fds, self.error_fds, timeout)
        events = {}
        for fd in readable:
            events[fd] = events.get(fd, 0) | IOLoop.READ
        for fd in writeable:
            events[fd] = events.get(fd, 0) | IOLoop.WRITE
        for fd in errors:
            events[fd] = events.get(fd, 0) | IOLoop.ERROR
        return events.items()

class SelectIOLoop(PollIOLoop):
    def initialize(self, **kwargs):
        super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs)
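To check that a select()-based wrapper really is interchangeable with select.epoll, here is a condensed, runnable version of the _Select idea plus a helper that works unchanged with either object. SelectPoller and is_readable are hypothetical names. The event bits are assumed to equal epoll's EPOLLIN, EPOLLOUT and EPOLLERR | EPOLLHUP, which is how Tornado 4.x defines IOLoop.READ, WRITE and ERROR.

```python
import select
import socket

# Event bits equal to epoll's EPOLLIN, EPOLLOUT and EPOLLERR | EPOLLHUP.
READ, WRITE, ERROR = 0x001, 0x004, 0x008 | 0x010


class SelectPoller(object):
    """Condensed _Select-style wrapper: an epoll-like API on top of select()."""

    def __init__(self):
        self.read_fds, self.write_fds, self.error_fds = set(), set(), set()

    def register(self, fd, events):
        if fd in self.read_fds | self.write_fds | self.error_fds:
            raise IOError("fd %s already registered" % fd)
        if events & READ:
            self.read_fds.add(fd)
        if events & WRITE:
            self.write_fds.add(fd)
        if events & ERROR:
            self.error_fds.add(fd)

    def modify(self, fd, events):
        self.unregister(fd)
        self.register(fd, events)

    def unregister(self, fd):
        for fds in (self.read_fds, self.write_fds, self.error_fds):
            fds.discard(fd)

    def poll(self, timeout):
        readable, writeable, errors = select.select(
            self.read_fds, self.write_fds, self.error_fds, timeout)
        events = {}
        for ready, flag in ((readable, READ), (writeable, WRITE), (errors, ERROR)):
            for fd in ready:
                events[fd] = events.get(fd, 0) | flag
        return list(events.items())


def is_readable(poller, fd, timeout=0):
    """Works with SelectPoller() or, where available, select.epoll()."""
    poller.register(fd, READ)
    try:
        return any(f == fd and ev & READ for f, ev in poller.poll(timeout))
    finally:
        poller.unregister(fd)


if __name__ == "__main__":
    a, b = socket.socketpair()
    print(is_readable(SelectPoller(), b.fileno()))  # False: nothing sent yet
    a.sendall(b"x")
    print(is_readable(SelectPoller(), b.fileno(), timeout=1.0))  # True
    if hasattr(select, "epoll"):
        ep = select.epoll()
        print(is_readable(ep, b.fileno(), timeout=1.0))  # True, same helper
        ep.close()
    a.close()
    b.close()
```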
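epoll performs far better than select, especially with many connections: select scans every watched descriptor on each call and is limited to FD_SETSIZE descriptors (usually 1024), while epoll returns only the descriptors that are ready. That is why Tornado prefers epoll.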
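In the add_sockets function shown earlier, add_accept_handler registers each listening socket with the IOLoop (through add_handler, which ends up calling the poller's register method), so that the loop watches it for READ events, that is, for incoming connections.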
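Once the IOLoop has started, it enters the basic infinite loop of any web server: wait for I/O events and, when a request arrives, call the corresponding handler to process it.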
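The wait-and-dispatch loop described above can be sketched with the standard library's selectors module (Python 3 only; it does not exist in 2.7). TinyLoop and serve_one_connection are hypothetical names. This is not Tornado's IOLoop: there are no timeouts, callbacks or error handling, only the core cycle of registering a listening socket, waiting for it to become readable, and dispatching to its handler.

```python
import selectors
import socket


class TinyLoop(object):
    """Hypothetical, heavily simplified version of the IOLoop.start() idea."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()  # epoll/kqueue/... picked for us
        self._stopped = False

    def add_handler(self, sock, handler, events=selectors.EVENT_READ):
        # Rough analogue of IOLoop.add_handler -> poller.register
        self._selector.register(sock, events, handler)

    def remove_handler(self, sock):
        self._selector.unregister(sock)

    def stop(self):
        self._stopped = True

    def start(self, poll_timeout=1.0):
        # The core loop: wait for I/O events, then dispatch each to its handler.
        self._stopped = False
        while not self._stopped:
            for key, mask in self._selector.select(poll_timeout):
                key.data(key.fileobj, mask)

    def close(self):
        self._selector.close()


def serve_one_connection():
    """Demo: accept one connection on a random local port, reply, then stop."""
    loop = TinyLoop()
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(128)
    listener.setblocking(False)

    def on_accept(sock, mask):
        conn, _addr = sock.accept()
        conn.setblocking(True)
        conn.sendall(b"Hello, world")
        conn.close()
        loop.remove_handler(sock)
        loop.stop()

    loop.add_handler(listener, on_accept)
    client = socket.create_connection(listener.getsockname())
    loop.start()

    chunks = []
    while True:  # read until the server side closes the connection
        data = client.recv(1024)
        if not data:
            break
        chunks.append(data)
    client.close()
    listener.close()
    loop.close()
    return b"".join(chunks)


if __name__ == "__main__":
    print(serve_one_connection())  # b'Hello, world'
```

A real server would keep the listener registered and hand each accepted connection to its own handler instead of stopping after one request.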