Understanding the Tornado Source Code Through a Simple Example

Author: 高稚商de菌 | Published: 2018-07-29 17:19

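    Tornado is a high-performance, non-blocking web framework. By combining non-blocking I/O with epoll, it can handle thousands of connections per second. Its source code is concise, cleverly implemented, and clearly structured, and it ships with plenty of demos, which makes it well suited to close reading.
    The framework covers the low-level I/O implementation, TCP transport handling, the HTTP layer, and web features such as static file handling, auth, and routing, along with some test code and platform-specific code. This article starts from a simple example and reads through the Tornado source code that the example touches.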

    Notes:

    • This article uses Python 2.7.12 and Tornado 4.5.3.
    • When a class is shown, only the methods relevant to the discussion are included.
    1. A simple example
    from tornado import ioloop, web, httpserver
    
    
    class MainHandler(web.RequestHandler):
        def get(self):
            self.write("Hello, world")
    
    app = web.Application([
        (r"/index", MainHandler),
    ])
    
    if __name__ == "__main__":
        server = httpserver.HTTPServer(app)
        server.bind(8888)
        server.start()
        ioloop.IOLoop.current().start()
    

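    The code above first creates an Application. Application implements routing: it matches each request to its handler based on attributes such as the path and host. An HTTPServer then serves the application. Finally, the IOLoop is started to perform the network I/O. Because the IOLoop is a singleton, the HTTPServer does not have to be passed to it when it is started.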
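The routing idea described above can be sketched without Tornado. This is only an illustration of matching a path against a list of patterns; `make_router`, `find_handler`, and this `MainHandler` are hypothetical names, and anchoring each pattern with `$` is an assumption of the sketch rather than a statement about Application's internals.

```python
import re


def make_router(routes):
    """Build a lookup function from (pattern, handler) pairs."""
    compiled = []
    for pattern, handler in routes:
        # Assumption: anchor the pattern at the end so "/index" does not
        # also match "/index/extra".
        if not pattern.endswith("$"):
            pattern += "$"
        compiled.append((re.compile(pattern), handler))

    def find_handler(path):
        # Return the first handler whose pattern matches, or None.
        for regex, handler in compiled:
            if regex.match(path):
                return handler
        return None

    return find_handler


class MainHandler(object):  # hypothetical handler, no Tornado required
    def get(self):
        return "Hello, world"


find_handler = make_router([(r"/index", MainHandler)])
handler_class = find_handler("/index")
print(handler_class().get() if handler_class else "404")
```

The first matching pattern wins, so the order of the route list matters in this sketch.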
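    A few points are worth noting:

    • The setup can be shortened. app.listen creates an HTTPServer automatically, and server.listen is shorthand for calling bind followed by start in a single process. For example: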
        app.listen(8888)
        ioloop.IOLoop.current().start()
    

    Or:

        server = httpserver.HTTPServer(app)
        server.listen(8888)
        ioloop.IOLoop.current().start()
    
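    • HTTPServer can run in multi-process mode; the argument to start sets the number of child processes. None or a value <= 0 forks one child process per CPU core. Omitting the argument or passing 1 runs a single process without forking (the perfectionist in me has to ask: why can't it fork exactly one child?). A value > 1 forks that many child processes. For example:
    server.start(4)  # one parent process plus 4 child processes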
    
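    • On a whim: can several ports be listened on at once? In my tests, two servers can both be started, and both work, only in single-process mode. A likely reason is the constraint documented in the docstring of TCPServer.start, quoted later in this article: with multiple processes, no IOLoop may be created or referenced before start(n) forks, and the first server's start already references the IOLoop in add_sockets, so a second forking start comes too late. The bind docstring also notes that bind may be called several times on one server before start, which is another way to listen on multiple ports.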
        server1 = httpserver.HTTPServer(app)
        server1.bind(8888)
        server1.start()
        server2 = httpserver.HTTPServer(app)
        server2.bind(9888)
        server2.start()
        ioloop.IOLoop.current().start()
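Why a single process can serve several ports is easier to see with plain sockets. The following is a minimal sketch using only the standard library, not Tornado's code: two listening sockets are watched by one select call, and whichever becomes readable is accepted. `listen_on_loopback` and `demo_two_ports` are hypothetical helpers, and port 0 asks the OS for any free port.

```python
import select
import socket


def listen_on_loopback():
    """Create a listening TCP socket on an OS-assigned loopback port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 0))
    sock.listen(128)
    return sock


def demo_two_ports():
    listeners = [listen_on_loopback(), listen_on_loopback()]
    ports = [s.getsockname()[1] for s in listeners]
    # One client connection per port, so each listener has a pending accept.
    clients = [socket.create_connection(("127.0.0.1", p)) for p in ports]

    accepted_ports = set()
    while len(accepted_ports) < len(listeners):
        # A single wait covers both listening sockets.
        readable, _, _ = select.select(listeners, [], [], 1.0)
        if not readable:
            break  # timed out; give up rather than loop forever
        for listener in readable:
            conn, _ = listener.accept()
            accepted_ports.add(listener.getsockname()[1])
            conn.close()

    for s in clients + listeners:
        s.close()
    return sorted(ports), sorted(accepted_ports)


if __name__ == "__main__":
    print(demo_two_ports())
```

Nothing in this loop cares how many listening sockets it watches, which is consistent with the observation that two servers coexist in one process.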
    
    2. server = httpserver.HTTPServer(app)

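    This line creates an HTTPServer. The HTTPServer class inherits from three classes: TCPServer, Configurable, and HTTPServerConnectionDelegate. Configurable can be thought of as an abstract class: it uses __new__ to implement a factory method that creates objects of different types depending on configuration. Because HTTPServer has several parent classes, the diamond inheritance problem also comes into play.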

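    • HTTPServer is a new-style class, so its method resolution order (MRO) is computed with C3 linearization, not with a plain breadth-first search.
    • __new__ constructs the object and __init__ initializes it, so __new__ runs before __init__. initialize is called inside __new__, so initialize also runs before __init__.
      Configurable's __new__ function is worth reading: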
    
    class Configurable(object):
        """Base class for configurable interfaces.
    
        A configurable interface is an (abstract) class whose constructor
        acts as a factory function for one of its implementation subclasses.
        The implementation subclass as well as optional keyword arguments to
        its initializer can be set globally at runtime with `configure`.
    
        By using the constructor as the factory method, the interface
        looks like a normal class, `isinstance` works as usual, etc.  This
        pattern is most useful when the choice of implementation is likely
        to be a global decision (e.g. when `~select.epoll` is available,
        always use it instead of `~select.select`), or when a
        previously-monolithic class has been split into specialized
        subclasses.
    
        Configurable subclasses must define the class methods
        `configurable_base` and `configurable_default`, and use the instance
        method `initialize` instead of ``__init__``.
        """
    
        def __new__(cls, *args, **kwargs):
            base = cls.configurable_base()
            init_kwargs = {}
            if cls is base:
                impl = cls.configured_class()
                if base.__impl_kwargs:
                    init_kwargs.update(base.__impl_kwargs)
            else:
                impl = cls
            init_kwargs.update(kwargs)
            instance = super(Configurable, cls).__new__(impl)
            # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
            # singleton magic.  If we get rid of that we can switch to __init__
            # here too.
            instance.initialize(*args, **init_kwargs)
            return instance
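The two effects of this __new__ (the base class acting as a factory, and initialize running before __init__) can be reproduced in a few lines. This is a simplified sketch, not Tornado's implementation: `Poller`, `EpollPoller`, `SelectPoller`, and the feature check inside `configured_class` are hypothetical stand-ins.

```python
import select


class Poller(object):
    """Hypothetical base class that acts as a factory, like Configurable."""

    calls = []  # records the order in which the hooks run

    @classmethod
    def configured_class(cls):
        # Assumption for this sketch: pick the implementation by feature check.
        return EpollPoller if hasattr(select, "epoll") else SelectPoller

    def __new__(cls, *args, **kwargs):
        # Constructing the base class yields an implementation subclass;
        # constructing a subclass directly yields that subclass.
        impl = cls.configured_class() if cls is Poller else cls
        instance = super(Poller, cls).__new__(impl)
        instance.initialize(*args, **kwargs)
        return instance

    def __init__(self, *args, **kwargs):
        # Python calls __init__ after __new__ returns, with the same arguments.
        Poller.calls.append("__init__")

    def initialize(self, name="default"):
        Poller.calls.append("initialize")
        self.name = name


class EpollPoller(Poller):
    pass


class SelectPoller(Poller):
    pass


poller = Poller(name="demo")
print(type(poller).__name__, isinstance(poller, Poller), Poller.calls)
```

The caller writes `Poller(...)` and still gets an object for which `isinstance(poller, Poller)` holds, which is the property the Configurable docstring highlights.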
    

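    Below is the HTTPServer code. HTTPServer is a non-blocking, single-threaded HTTP server. The empty __init__ is required: although it contains no real code, it keeps the parent class's __init__ from being called. After __new__ returns, Python still calls __init__ with the constructor arguments, and without this override that call would resolve to TCPServer.__init__, which initialize has already invoked explicitly with the right arguments.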

    class HTTPServer(TCPServer, Configurable,
                     httputil.HTTPServerConnectionDelegate):
    
        def __init__(self, *args, **kwargs):
            # Ignore args to __init__; real initialization belongs in
            # initialize since we're Configurable. (there's something
            # weird in initialization order between this class,
            # Configurable, and TCPServer so we can't leave __init__ out
            # completely)
            pass
    
        def initialize(self, request_callback, no_keep_alive=False, io_loop=None,
                       xheaders=False, ssl_options=None, protocol=None,
                       decompress_request=False,
                       chunk_size=None, max_header_size=None,
                       idle_connection_timeout=None, body_timeout=None,
                       max_body_size=None, max_buffer_size=None,
                       trusted_downstream=None):
            self.request_callback = request_callback
            self.no_keep_alive = no_keep_alive
            self.xheaders = xheaders
            self.protocol = protocol
            self.conn_params = HTTP1ConnectionParameters(
                decompress=decompress_request,
                chunk_size=chunk_size,
                max_header_size=max_header_size,
                header_timeout=idle_connection_timeout or 3600,
                max_body_size=max_body_size,
                body_timeout=body_timeout,
                no_keep_alive=no_keep_alive)
            TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
                               max_buffer_size=max_buffer_size,
                               read_chunk_size=chunk_size)
            self._connections = set()
            self.trusted_downstream = trusted_downstream
    

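    Given the MRO, the call that initializes TCPServer could also be written with super, because TCPServer is the class that follows HTTPServer in the MRO. That form is less obvious to the reader, though: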

            super(HTTPServer, self).__init__(io_loop=io_loop, ssl_options=ssl_options,
                               max_buffer_size=max_buffer_size,
                               read_chunk_size=chunk_size)
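To check where super lands, the MRO can simply be printed. The sketch below uses empty stand-in classes that only mirror the shape of the hierarchy (they are not Tornado's classes), plus a second hypothetical hierarchy (`X`, `Y`, `A`, `B`, `C`) chosen because C3 linearization and a breadth-first search give different orders for it.

```python
class TCPServer(object):
    def __init__(self, **kwargs):
        self.initialized_by = "TCPServer"


class Configurable(object):
    pass


class Delegate(object):
    pass


class HTTPServer(TCPServer, Configurable, Delegate):
    def __init__(self, **kwargs):
        # super() resolves to the next class in the MRO, here TCPServer.
        super(HTTPServer, self).__init__(**kwargs)


def mro_names(cls):
    return [c.__name__ for c in cls.__mro__]


# Second hierarchy: breadth-first search would visit C, A, B, X, Y,
# but C3 linearization keeps each parent ahead of the next sibling's line.
class X(object):
    pass


class Y(object):
    pass


class A(X):
    pass


class B(Y):
    pass


class C(A, B):
    pass


print(mro_names(HTTPServer))
# ['HTTPServer', 'TCPServer', 'Configurable', 'Delegate', 'object']
print(mro_names(C))
# ['C', 'A', 'X', 'B', 'Y', 'object']
```

In the second hierarchy X comes before B, which a breadth-first traversal would not produce.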
    
    3. server.bind(8888); server.start()
    class TCPServer(object):
    
        def add_sockets(self, sockets):
            """Makes this server start accepting connections on the given sockets.
    
            The ``sockets`` parameter is a list of socket objects such as
            those returned by `~tornado.netutil.bind_sockets`.
            `add_sockets` is typically used in combination with that
            method and `tornado.process.fork_processes` to provide greater
            control over the initialization of a multi-process server.
            """
            if self.io_loop is None:
                self.io_loop = IOLoop.current()
    
            for sock in sockets:
                self._sockets[sock.fileno()] = sock
                add_accept_handler(sock, self._handle_connection,
                                   io_loop=self.io_loop)
    
        def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128,
                 reuse_port=False):
            """Binds this server to the given port on the given address.
    
            To start the server, call `start`. If you want to run this server
            in a single process, you can call `listen` as a shortcut to the
            sequence of `bind` and `start` calls.
    
            Address may be either an IP address or hostname.  If it's a hostname,
            the server will listen on all IP addresses associated with the
            name.  Address may be an empty string or None to listen on all
            available interfaces.  Family may be set to either `socket.AF_INET`
            or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
            both will be used if available.
    
            The ``backlog`` argument has the same meaning as for
            `socket.listen <socket.socket.listen>`. The ``reuse_port`` argument
            has the same meaning as for `.bind_sockets`.
    
            This method may be called multiple times prior to `start` to listen
            on multiple ports or interfaces.
    
            .. versionchanged:: 4.4
               Added the ``reuse_port`` argument.
            """
            sockets = bind_sockets(port, address=address, family=family,
                                   backlog=backlog, reuse_port=reuse_port)
            if self._started:
                self.add_sockets(sockets)
            else:
                self._pending_sockets.extend(sockets)
    
        def start(self, num_processes=1):
            """Starts this server in the `.IOLoop`.
    
            By default, we run the server in this process and do not fork any
            additional child process.
    
            If num_processes is ``None`` or <= 0, we detect the number of cores
            available on this machine and fork that number of child
            processes. If num_processes is given and > 1, we fork that
            specific number of sub-processes.
    
            Since we use processes and not threads, there is no shared memory
            between any server code.
    
            Note that multiple processes are not compatible with the autoreload
            module (or the ``autoreload=True`` option to `tornado.web.Application`
            which defaults to True when ``debug=True``).
            When using multiple processes, no IOLoops can be created or
            referenced until after the call to ``TCPServer.start(n)``.
            """
            assert not self._started
            self._started = True
            if num_processes != 1:
                process.fork_processes(num_processes)
            sockets = self._pending_sockets
            self._pending_sockets = []
            self.add_sockets(sockets)
    
    

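    When the address to listen on resolves to more than one concrete address (for example, an empty address on a machine with both IPv4 and IPv6, or a hostname associated with several IPs), bind_sockets may return multiple socket objects.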
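How one port can turn into several sockets can be seen with the standard library alone. The sketch below assumes that the address is resolved roughly the way a passive getaddrinfo lookup does it; `passive_addresses` is a hypothetical helper, and nothing is actually bound here.

```python
import socket


def passive_addresses(port, address=None, family=socket.AF_UNSPEC):
    """Return the distinct (family, sockaddr) pairs a passive bind could use."""
    infos = socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
                               0, socket.AI_PASSIVE)
    seen = []
    for af, socktype, proto, canonname, sockaddr in infos:
        # getaddrinfo may repeat an address; keep each pair once.
        if (af, sockaddr) not in seen:
            seen.append((af, sockaddr))
    return seen


# With no address and AF_UNSPEC, a dual-stack machine typically reports
# both an IPv4 and an IPv6 wildcard address, hence two sockets.
for af, sockaddr in passive_addresses(8888):
    print(af, sockaddr)
```

Passing a concrete address such as "127.0.0.1", or restricting family to socket.AF_INET, narrows the result to a single entry.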

    4. ioloop.IOLoop.current().start()

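    ioloop is the low-level module that handles I/O events, and its core class is IOLoop. IOLoop follows the singleton pattern: IOLoop.instance() returns one global instance, and IOLoop.current() returns the current thread's loop, falling back to that global instance.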

    class IOLoop(Configurable):
        @staticmethod
        def instance():
            """Returns a global `IOLoop` instance.
    
            Most applications have a single, global `IOLoop` running on the
            main thread.  Use this method to get this instance from
            another thread.  In most other cases, it is better to use `current()`
            to get the current thread's `IOLoop`.
            """
            if not hasattr(IOLoop, "_instance"):
                with IOLoop._instance_lock:
                    if not hasattr(IOLoop, "_instance"):
                        # New instance after double check
                        IOLoop._instance = IOLoop()
            return IOLoop._instance
            
        @staticmethod
        def current(instance=True):
            """Returns the current thread's `IOLoop`.
    
            If an `IOLoop` is currently running or has been marked as
            current by `make_current`, returns that instance.  If there is
            no current `IOLoop`, returns `IOLoop.instance()` (i.e. the
            main thread's `IOLoop`, creating one if necessary) if ``instance``
            is true.
    
            In general you should use `IOLoop.current` as the default when
            constructing an asynchronous object, and use `IOLoop.instance`
            when you mean to communicate to the main thread from a different
            one.
    
            .. versionchanged:: 4.1
               Added ``instance`` argument to control the fallback to
               `IOLoop.instance()`.
            """
            current = getattr(IOLoop._current, "instance", None)
            if current is None and instance:
                return IOLoop.instance()
            return current
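The double-checked locking in instance() and the thread-local lookup in current() can be tried in isolation. This is a sketch with a hypothetical `DemoLoop` class that copies only those two ideas; it is not a working event loop.

```python
import threading


class DemoLoop(object):
    """Hypothetical stand-in for IOLoop; only the lookup logic is shown."""

    _instance_lock = threading.Lock()
    _current = threading.local()

    @staticmethod
    def instance():
        # First check avoids taking the lock on the common path; the second
        # check, under the lock, stops two threads from both creating one.
        if not hasattr(DemoLoop, "_instance"):
            with DemoLoop._instance_lock:
                if not hasattr(DemoLoop, "_instance"):
                    DemoLoop._instance = DemoLoop()
        return DemoLoop._instance

    @staticmethod
    def current():
        # Per-thread loop if one was marked current, else the global one.
        current = getattr(DemoLoop._current, "instance", None)
        return DemoLoop.instance() if current is None else current

    def make_current(self):
        DemoLoop._current.instance = self


def collect_instances(n_threads=8):
    """Call instance() from several threads and return what each one saw."""
    seen = []

    def worker():
        seen.append(DemoLoop.instance())

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen


if __name__ == "__main__":
    seen = collect_instances()
    print(all(loop is seen[0] for loop in seen))
```

Every thread receives the same global object from instance(), while current() can differ per thread once make_current has been called in that thread.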
    

    IOLoop also inherits from Configurable. This is what lets IOLoop choose between EPollIOLoop, KQueueIOLoop, and SelectIOLoop depending on the operating system.

    class IOLoop(Configurable):
        @classmethod
        def configurable_default(cls):
            if hasattr(select, "epoll"):
                from tornado.platform.epoll import EPollIOLoop
                return EPollIOLoop
            if hasattr(select, "kqueue"):
                # Python 2.6+ on BSD or Mac
                from tornado.platform.kqueue import KQueueIOLoop
                return KQueueIOLoop
            from tornado.platform.select import SelectIOLoop
            return SelectIOLoop
    

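    In Python, select.select is a function while select.epoll is a class, so Tornado wraps select in the _Select class, which exposes the same interface as select.epoll; SelectIOLoop simply passes a _Select instance as its impl. Both _Select and select.epoll can register, modify, and unregister watched file descriptors and poll them for events.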

    class _Select(object):
        """A simple, select()-based IOLoop implementation for non-Linux systems"""
        def __init__(self):
            self.read_fds = set()
            self.write_fds = set()
            self.error_fds = set()
            self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)
    
        def close(self):
            pass
    
        def register(self, fd, events):
            if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds:
                raise IOError("fd %s already registered" % fd)
            if events & IOLoop.READ:
                self.read_fds.add(fd)
            if events & IOLoop.WRITE:
                self.write_fds.add(fd)
            if events & IOLoop.ERROR:
                self.error_fds.add(fd)
                # Closed connections are reported as errors by epoll and kqueue,
                # but as zero-byte reads by select, so when errors are requested
                # we need to listen for both read and error.
                # self.read_fds.add(fd)
    
        def modify(self, fd, events):
            self.unregister(fd)
            self.register(fd, events)
    
        def unregister(self, fd):
            self.read_fds.discard(fd)
            self.write_fds.discard(fd)
            self.error_fds.discard(fd)
    
        def poll(self, timeout):
            readable, writeable, errors = select.select(
                self.read_fds, self.write_fds, self.error_fds, timeout)
            events = {}
            for fd in readable:
                events[fd] = events.get(fd, 0) | IOLoop.READ
            for fd in writeable:
                events[fd] = events.get(fd, 0) | IOLoop.WRITE
            for fd in errors:
                events[fd] = events.get(fd, 0) | IOLoop.ERROR
            return events.items()
    
    
    class SelectIOLoop(PollIOLoop):
        def initialize(self, **kwargs):
            super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs)
    

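    epoll scales much better than select when many file descriptors are being watched, so Tornado prefers epoll whenever it is available.
    The add_sockets function shown earlier is where each listening socket gets registered with the IOLoop: add_accept_handler adds a handler to the IOLoop, which registers the socket's file descriptor so that its I/O events are watched.
    Once the IOLoop is started, the process enters the basic infinite loop of any web service: wait for I/O events and, when a request arrives, call the corresponding handler to process it.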
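One iteration of such a loop can be sketched with select.select and a socket pair. This is a heavily simplified illustration, not the PollIOLoop code: `MiniLoop`, its methods, and the `READ` flag are hypothetical, and only read events are handled.

```python
import select
import socket

READ = 0x001  # hypothetical event flag used only in this sketch


class MiniLoop(object):
    """Hypothetical, heavily simplified event loop built on select.select."""

    def __init__(self):
        self._handlers = {}    # fd -> callback(fd, events)
        self._read_fds = set()

    def add_handler(self, sock, callback):
        fd = sock.fileno()
        self._handlers[fd] = callback
        self._read_fds.add(fd)

    def remove_handler(self, sock):
        fd = sock.fileno()
        self._handlers.pop(fd, None)
        self._read_fds.discard(fd)

    def run_once(self, timeout=1.0):
        # Wait for events, then dispatch each ready fd to its handler.
        readable, _, _ = select.select(list(self._read_fds), [], [], timeout)
        for fd in readable:
            self._handlers[fd](fd, READ)
        return len(readable)


def demo():
    left, right = socket.socketpair()
    received = []
    loop = MiniLoop()
    loop.add_handler(right, lambda fd, events: received.append(right.recv(1024)))
    left.sendall(b"ping")   # makes `right` readable
    loop.run_once()
    loop.remove_handler(right)
    left.close()
    right.close()
    return received


if __name__ == "__main__":
    print(demo())
```

A real loop repeats run_once until stopped and also handles write events, errors, timeouts, and callbacks; the wait-then-dispatch shape stays the same.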

    Original link: https://www.haomeiwen.com/subject/kkqtvftx.html