Tornado request flow: a second analysis


Author: llicety | Published 2018-10-18 08:58

    First, the example code:

    import tornado.httpserver
    import tornado.ioloop
    import tornado.options
    import tornado.web
    
    from tornado.options import define, options
    define("port", default=8000, help="run on the given port", type=int)
    
    class IndexHandler(tornado.web.RequestHandler):
        def get(self):
            greeting = self.get_argument('greeting', 'Hello')
            self.write(greeting + ', friendly user!')
    
    if __name__ == "__main__":
        tornado.options.parse_command_line()
        app = tornado.web.Application(handlers=[(r"/", IndexHandler)])
        http_server = tornado.httpserver.HTTPServer(app)
        http_server.listen(options.port)
        tornado.ioloop.IOLoop.instance().start()
    

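    Tornado is both a web server (think of it as playing nginx's role) and a web framework (think of it as playing the role of a WSGI application). The official recommendation is to use the two together to get the most out of it. Pairing the Tornado server with another WSGI application (such as Django), or running nginx + WSGI + the Tornado framework, does not make good use of Tornado's capabilities, since WSGI is a synchronous interface and cannot take advantage of Tornado's asynchronous IOLoop.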

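    The statements in the code above run in order, so the server really starts at the last line, start(). Before looking at start(), let's look at the setup that precedes it. The closest step is the listen() call, so what does listen do?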

        def listen(self, port, address=""):
            """Starts accepting connections on the given port.
    
            This method may be called more than once to listen on multiple ports.
            `listen` takes effect immediately; it is not necessary to call
            `TCPServer.start` afterwards.  It is, however, necessary to start
            the `.IOLoop`.
            """
            sockets = bind_sockets(port, address=address)
            self.add_sockets(sockets)
    

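    bind_sockets bundles the first steps of socket programming. It creates the socket, binds it to the given port, and calls listen() on it, and it also makes the socket non-blocking. Next, look at self.add_sockets(sockets):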
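    Here is a rough illustration of those steps as a stdlib-only sketch for a single IPv4 address. It is not Tornado's code. The function name bind_listening_socket is hypothetical. The real bind_sockets also iterates over the getaddrinfo results (IPv4 and IPv6), sets close-on-exec, and handles more platform details. The backlog of 128 is an assumed default.

```python
import socket


def bind_listening_socket(port, address="127.0.0.1", backlog=128):
    """Hypothetical, simplified take on tornado.netutil.bind_sockets for
    one IPv4 address: create, configure, bind, then listen."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts without waiting for old TIME_WAIT sockets.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # An event loop must never block on accept(), so go non-blocking.
    sock.setblocking(False)
    sock.bind((address, port))
    sock.listen(backlog)
    return sock


listener = bind_listening_socket(0)  # port 0: let the OS pick a free port
```

    Passing port 0 asks the OS for any free port, which is convenient for experiments. listener.getsockname() then reports the port that was actually chosen.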

        def add_sockets(self, sockets):
            """Makes this server start accepting connections on the given sockets.
    
            The ``sockets`` parameter is a list of socket objects such as
            those returned by `~tornado.netutil.bind_sockets`.
            `add_sockets` is typically used in combination with that
            method and `tornado.process.fork_processes` to provide greater
            control over the initialization of a multi-process server.
            """
            for sock in sockets:
                self._sockets[sock.fileno()] = sock
                self._handlers[sock.fileno()] = add_accept_handler(
                    sock, self._handle_connection)
    

    The key line here is self._handlers[sock.fileno()] = add_accept_handler(sock, self._handle_connection). It registers the socket with the IOLoop. When the IOLoop sees a new incoming connection on this socket (a READ event), it runs the callback self._handle_connection to handle it. The source of add_accept_handler is:

    def add_accept_handler(sock, callback):
        """Adds an `.IOLoop` event handler to accept new connections on ``sock``.
    
        When a connection is accepted, ``callback(connection, address)`` will
        be run (``connection`` is a socket object, and ``address`` is the
        address of the other end of the connection).  Note that this signature
        is different from the ``callback(fd, events)`` signature used for
        `.IOLoop` handlers.
    
        A callable is returned which, when called, will remove the `.IOLoop`
        event handler and stop processing further incoming connections.
    
        .. versionchanged:: 5.0
           The ``io_loop`` argument (deprecated since version 4.1) has been removed.
    
        .. versionchanged:: 5.0
           A callable is returned (``None`` was returned before).
        """
        io_loop = IOLoop.current()
        removed = [False]
    
        def accept_handler(fd, events):
            # More connections may come in while we're handling callbacks;
            # to prevent starvation of other tasks we must limit the number
            # of connections we accept at a time.  Ideally we would accept
            # up to the number of connections that were waiting when we
            # entered this method, but this information is not available
            # (and rearranging this method to call accept() as many times
            # as possible before running any callbacks would have adverse
            # effects on load balancing in multiprocess configurations).
            # Instead, we use the (default) listen backlog as a rough
            # heuristic for the number of connections we can reasonably
            # accept at once.
            for i in xrange(_DEFAULT_BACKLOG):
                if removed[0]:
                    # The socket was probably closed
                    return
                try:
                    connection, address = sock.accept()
                except socket.error as e:
                    # _ERRNO_WOULDBLOCK indicate we have accepted every
                    # connection that is available.
                    if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                        return
                    # ECONNABORTED indicates that there was a connection
                    # but it was closed while still in the accept queue.
                    # (observed on FreeBSD).
                    if errno_from_exception(e) == errno.ECONNABORTED:
                        continue
                    raise
                set_close_exec(connection.fileno())
                callback(connection, address)
    
        def remove_handler():
            io_loop.remove_handler(sock)
            removed[0] = True
    
        io_loop.add_handler(sock, accept_handler, IOLoop.READ)
        return remove_handler
    

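    Focus first on io_loop.add_handler(sock, accept_handler, IOLoop.READ). It adds the listening socket to the IOLoop, with IOLoop.READ as the event to watch. Whenever the socket becomes readable (for example, when an HTTP client connects), the IOLoop runs accept_handler. accept_handler calls sock.accept() to take each new connection and passes it to callback.

    What is callback? It is the second argument of add_accept_handler. Going back to add_sockets, which runs self._handlers[sock.fileno()] = add_accept_handler(sock, self._handle_connection), callback is self._handle_connection. So every connection arriving on this port is handled by self._handle_connection.

    In short, the listen method of the TCPServer class in tornado/tcpserver.py (the HTTPServer in the example inherits from TCPServer) adds the listening socket to the IOLoop and watches it for readability. When a read event occurs, the self._handle_connection callback runs.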
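    The registration-plus-callback pattern above can be imitated with the standard selectors module. This is a simplified sketch, not Tornado's implementation:

    - add_accept_handler here is a hypothetical stdlib re-creation.
    - run_once stands in for one iteration of the IOLoop.
    - BACKLOG caps how many connections are accepted per READ event, the role _DEFAULT_BACKLOG plays in the real code.

```python
import selectors
import socket

BACKLOG = 128  # assumed cap on connections accepted per READ event


def add_accept_handler(sel, sock, callback):
    """Hypothetical stdlib imitation of tornado.netutil.add_accept_handler:
    watch `sock` for READ events and hand every accepted connection to
    callback(connection, address). Returns a function that unregisters."""
    def accept_handler(fd, events):
        for _ in range(BACKLOG):
            try:
                connection, address = sock.accept()
            except BlockingIOError:
                return  # nothing left to accept (EAGAIN / EWOULDBLOCK)
            except ConnectionAbortedError:
                continue  # client vanished while still in the accept queue
            callback(connection, address)

    sel.register(sock, selectors.EVENT_READ, accept_handler)
    return lambda: sel.unregister(sock)


def run_once(sel, timeout=1.0):
    """One iteration of a toy event loop: run handlers for ready sockets."""
    for key, events in sel.select(timeout):
        key.data(key.fd, events)
```

    To try it, follow these steps:

    1. Bind a non-blocking listening socket.
    2. Call add_accept_handler(sel, listener, callback).
    3. Connect a client.
    4. Call run_once(sel).

    The callback then receives the accepted connection and the client address. Stopping at BlockingIOError corresponds to the _ERRNO_WOULDBLOCK check in Tornado's accept_handler.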
    Next, let's look at the callback self._handle_connection.

        def _handle_connection(self, connection, address):
            if self.ssl_options is not None:
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                try:
                    connection = ssl_wrap_socket(connection,
                                                 self.ssl_options,
                                                 server_side=True,
                                                 do_handshake_on_connect=False)
                except ssl.SSLError as err:
                    if err.args[0] == ssl.SSL_ERROR_EOF:
                        return connection.close()
                    else:
                        raise
                except socket.error as err:
                    # If the connection is closed immediately after it is created
                    # (as in a port scan), we can get one of several errors.
                    # wrap_socket makes an internal call to getpeername,
                    # which may return either EINVAL (Mac OS X) or ENOTCONN
                    # (Linux).  If it returns ENOTCONN, this error is
                    # silently swallowed by the ssl module, so we need to
                    # catch another error later on (AttributeError in
                    # SSLIOStream._do_ssl_handshake).
                    # To test this behavior, try nmap with the -sT flag.
                    # https://github.com/tornadoweb/tornado/pull/750
                    if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
                        return connection.close()
                    else:
                        raise
            try:
                if self.ssl_options is not None:
                    stream = SSLIOStream(connection,
                                         max_buffer_size=self.max_buffer_size,
                                         read_chunk_size=self.read_chunk_size)
                else:
                    stream = IOStream(connection,
                                      max_buffer_size=self.max_buffer_size,
                                      read_chunk_size=self.read_chunk_size)
    
                future = self.handle_stream(stream, address)
                if future is not None:
                    IOLoop.current().add_future(gen.convert_yielded(future),
                                                lambda f: f.result())
            except Exception:
                app_log.error("Error in connection callback", exc_info=True)
    

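    In short, this wraps the connection in a stream (an IOStream, or an SSLIOStream when SSL is configured) for reading and writing data, then calls self.handle_stream to process that stream. In the TCPServer class, handle_stream is an abstract method that subclasses must override: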

        def handle_stream(self, stream, address):
            """Override to handle a new `.IOStream` from an incoming connection.
    
            This method may be a coroutine; if so any exceptions it raises
            asynchronously will be logged. Accepting of incoming connections
            will not be blocked by this coroutine.
    
            If this `TCPServer` is configured for SSL, ``handle_stream``
            may be called before the SSL handshake has completed. Use
            `.SSLIOStream.wait_for_handshake` if you need to verify the client's
            certificate or use NPN/ALPN.
    
            .. versionchanged:: 4.2
               Added the option for this method to be a coroutine.
            """
            raise NotImplementedError()
    

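    This design is flexible: each application-level protocol, such as HTTP or FTP, can handle its streams in its own way. The example at the start uses HTTPServer, so let's look at HTTPServer's handle_stream: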
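    The split between TCPServer and its subclasses is the template-method pattern. Below is a minimal sketch with hypothetical class names (ToyTCPServer, ToyUpperCaseServer). To keep it small, the "stream" is just a bytes payload instead of an IOStream:

```python
class ToyTCPServer:
    """Hypothetical stand-in for tornado.tcpserver.TCPServer: it owns the
    connection plumbing and leaves protocol handling to handle_stream."""

    def _handle_connection(self, stream, address):
        # The real TCPServer first wraps the socket in an IOStream;
        # here `stream` is a plain bytes payload.
        return self.handle_stream(stream, address)

    def handle_stream(self, stream, address):
        # Abstract hook: each protocol-specific subclass overrides it.
        raise NotImplementedError()


class ToyUpperCaseServer(ToyTCPServer):
    """A protocol-specific subclass, playing the part HTTPServer plays."""

    def handle_stream(self, stream, address):
        return stream.upper()
```

    The base class never needs to know which protocol is spoken. Only the override changes, just as HTTPServer replaces TCPServer.handle_stream.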

        def handle_stream(self, stream, address):
            context = _HTTPRequestContext(stream, address,
                                          self.protocol,
                                          self.trusted_downstream)
            conn = HTTP1ServerConnection(
                stream, self.conn_params, context)
            self._connections.add(conn)
            conn.start_serving(self)
    

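    This does four things:

    - It builds a request context from the stream and the client address.
    - It creates an HTTP1ServerConnection for the connection.
    - It records that connection in the HTTPServer's self._connections, so the server can find it when shutting down or when the connection closes.
    - It calls conn.start_serving(self).

    Everything after this is driven by that HTTP1ServerConnection object. Note that there is one HTTP1ServerConnection per client connection, not per HTTP request: with keep-alive, one connection can carry several requests. Next, let's see what conn.start_serving(self) does.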

        def start_serving(self, delegate):
            """Starts serving requests on this connection.
    
            :arg delegate: a `.HTTPServerConnectionDelegate`
            """
            assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
            self._serving_future = self._server_request_loop(delegate)
            # Register the future on the IOLoop so its errors get logged.
            self.stream.io_loop.add_future(self._serving_future,
                                           lambda f: f.result())
    

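    Put simply, start_serving starts the coroutine self._server_request_loop and registers its future with the IOLoop so that errors get logged. The loop creates a new HTTP1Connection for each request on the connection, then reads and processes the stream data. It repeats this until the connection should stop being served. Here is the code of self._server_request_loop: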
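    Below is a stripped-down model of this loop that uses asyncio instead of Tornado's gen.coroutine. PerRequestConnection and server_request_loop are hypothetical names. Each iteration builds a fresh per-request object, as _server_request_loop builds a new HTTP1Connection. A False result ends the loop, standing in for the cases where read_response resolves to False. The "Connection: close" check is a simplification of Tornado's keep-alive logic.

```python
import asyncio


class PerRequestConnection:
    """Hypothetical stand-in for HTTP1Connection: one object per request."""

    def __init__(self, raw_request):
        self.raw_request = raw_request

    async def read_response(self, handled):
        await asyncio.sleep(0)  # pretend to wait for socket data
        handled.append(self.raw_request)
        # False ends the loop; here that happens when the client asked
        # to close the connection after this request.
        return b"Connection: close" not in self.raw_request


async def server_request_loop(requests, handled):
    """Serve requests arriving on one connection until told to stop."""
    created = 0
    try:
        for raw in requests:
            conn = PerRequestConnection(raw)  # fresh object per request
            created += 1
            if not await conn.read_response(handled):
                return created
            await asyncio.sleep(0)  # let other tasks run, like gen.moment
        return created
    finally:
        handled.append("on_close")  # like delegate.on_close(self)
```

    With three queued requests where the second carries "Connection: close", asyncio.run(server_request_loop(reqs, handled)) creates only two per-request objects. The finally clause still runs the close notification.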

        @gen.coroutine
        def _server_request_loop(self, delegate):
            try:
                while True:
                    conn = HTTP1Connection(self.stream, False,
                                           self.params, self.context)
                    request_delegate = delegate.start_request(self, conn)
                    try:
                        ret = yield conn.read_response(request_delegate)
                    except (iostream.StreamClosedError,
                            iostream.UnsatisfiableReadError):
                        return
                    except _QuietException:
                        # This exception was already logged.
                        conn.close()
                        return
                    except Exception:
                        gen_log.error("Uncaught exception", exc_info=True)
                        conn.close()
                        return
                    if not ret:
                        return
                    yield gen.moment
            finally:
                delegate.on_close(self)
    

    The key point is delegate.start_request. This delegate is the self passed to conn.start_serving(self) inside HTTPServer.handle_stream, which is the HTTPServer instance. So we need HTTPServer.start_request:

        def start_request(self, server_conn, request_conn):
            if isinstance(self.request_callback, httputil.HTTPServerConnectionDelegate):
                delegate = self.request_callback.start_request(server_conn, request_conn)
            else:
                delegate = _CallableAdapter(self.request_callback, request_conn)
    
            if self.xheaders:
                delegate = _ProxyAdapter(delegate, request_conn)
    
            return delegate
    

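    In start_request, self.request_callback is the Application object from the example; it is stored in HTTPServer's initialize method. The inheritance chain is:

    - Application is declared as class Application(ReversibleRouter).
    - ReversibleRouter inherits from Router.
    - Router inherits from httputil.HTTPServerConnectionDelegate.

    So the isinstance check succeeds, and the code takes the branch delegate = self.request_callback.start_request(server_conn, request_conn), which calls Application's start_request. The Application class in tornado/web.py does not define start_request itself, so Python looks it up in the parent classes and finds it in Router: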
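    The method lookup described above follows Python's method resolution order (MRO). The stub classes below only mirror the names of the Tornado hierarchy and are not the real classes. In particular, Router.start_request here returns a tuple instead of a _RoutingDelegate:

```python
class HTTPServerConnectionDelegate:
    def start_request(self, server_conn, request_conn):
        raise NotImplementedError()


class Router(HTTPServerConnectionDelegate):
    def start_request(self, server_conn, request_conn):
        # Tornado returns _RoutingDelegate(self, server_conn, request_conn).
        return ("routing delegate", self, server_conn, request_conn)


class ReversibleRouter(Router):
    pass  # Tornado adds reverse_url here, but no start_request


class Application(ReversibleRouter):
    pass  # no start_request of its own either
```

    Application.__mro__ lists Application, ReversibleRouter, Router, HTTPServerConnectionDelegate and object in that order. The first class on that list that defines start_request is Router, so Application().start_request(...) runs Router's version.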

    class Router(httputil.HTTPServerConnectionDelegate):
        """Abstract router interface."""
    
        def find_handler(self, request, **kwargs):
            # type: (httputil.HTTPServerRequest, typing.Any)->httputil.HTTPMessageDelegate
            """Must be implemented to return an appropriate instance of `~.httputil.HTTPMessageDelegate`
            that can serve the request.
            Routing implementations may pass additional kwargs to extend the routing logic.
    
            :arg httputil.HTTPServerRequest request: current HTTP request.
            :arg kwargs: additional keyword arguments passed by routing implementation.
            :returns: an instance of `~.httputil.HTTPMessageDelegate` that will be used to
                process the request.
            """
            raise NotImplementedError()
    
        def start_request(self, server_conn, request_conn):
            return _RoutingDelegate(self, server_conn, request_conn)
    

    At this point the call request_delegate = delegate.start_request(self, conn) in _server_request_loop has returned, and request_delegate is an instance of _RoutingDelegate. Here is _server_request_loop again:

        @gen.coroutine
        def _server_request_loop(self, delegate):
            try:
                while True:
                    conn = HTTP1Connection(self.stream, False,
                                           self.params, self.context)
                    request_delegate = delegate.start_request(self, conn)
                    try:
                        ret = yield conn.read_response(request_delegate)
                    except (iostream.StreamClosedError,
                            iostream.UnsatisfiableReadError):
                        return
                    except _QuietException:
                        # This exception was already logged.
                        conn.close()
                        return
                    except Exception:
                        gen_log.error("Uncaught exception", exc_info=True)
                        conn.close()
                        return
                    if not ret:
                        return
                    yield gen.moment
            finally:
                delegate.on_close(self)
    

    The line request_delegate = delegate.start_request(self, conn) shows that request_delegate is a _RoutingDelegate object. Next comes ret = yield conn.read_response(request_delegate), but first let's look at how _RoutingDelegate is defined.

    The _RoutingDelegate class is defined as follows:

    class _RoutingDelegate(httputil.HTTPMessageDelegate):
        def __init__(self, router, server_conn, request_conn):
            self.server_conn = server_conn
            self.request_conn = request_conn
            self.delegate = None
            self.router = router  # type: Router
    
        def headers_received(self, start_line, headers):
            request = httputil.HTTPServerRequest(
                connection=self.request_conn,
                server_connection=self.server_conn,
                start_line=start_line, headers=headers)
    
            self.delegate = self.router.find_handler(request)  # here the delegate is a _HandlerDelegate
            if self.delegate is None:
                app_log.debug("Delegate for %s %s request not found",
                              start_line.method, start_line.path)
                self.delegate = _DefaultMessageDelegate(self.request_conn)
    
            return self.delegate.headers_received(start_line, headers) 
    
        def data_received(self, chunk):
            return self.delegate.data_received(chunk)
    
        def finish(self):
            self.delegate.finish()
    
        def on_connection_close(self):
            self.delegate.on_connection_close()
    

    Now that we know request_delegate is a _RoutingDelegate instance, what does conn.read_response do with it?

        def read_response(self, delegate):
            """Read a single HTTP response.
    
            Typical client-mode usage is to write a request using `write_headers`,
            `write`, and `finish`, and then call ``read_response``.
    
            :arg delegate: a `.HTTPMessageDelegate`
    
            Returns a `.Future` that resolves to None after the full response has
            been read.
            """
            if self.params.decompress:
                delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
            return self._read_message(delegate)
    

    _GzipMessageDelegate simply wraps the delegate (our _RoutingDelegate) so that gzip-encoded bodies are decompressed. Either way, what matters is what _read_message(delegate) does:

        @gen.coroutine
        def _read_message(self, delegate):
            need_delegate_close = False
            try:
                header_future = self.stream.read_until_regex(
                    b"\r?\n\r?\n",
                    max_bytes=self.params.max_header_size)
                if self.params.header_timeout is None:
                    header_data = yield header_future
                else:
                    try:
                        header_data = yield gen.with_timeout(
                            self.stream.io_loop.time() + self.params.header_timeout,
                            header_future,
                            quiet_exceptions=iostream.StreamClosedError)
                    except gen.TimeoutError:
                        self.close()
                        raise gen.Return(False)
                start_line, headers = self._parse_headers(header_data)
                if self.is_client:
                    start_line = httputil.parse_response_start_line(start_line)
                    self._response_start_line = start_line
                else:
                    start_line = httputil.parse_request_start_line(start_line)
                    self._request_start_line = start_line
                    self._request_headers = headers
    
                self._disconnect_on_finish = not self._can_keep_alive(
                    start_line, headers)
                need_delegate_close = True
                with _ExceptionLoggingContext(app_log):
                    header_future = delegate.headers_received(start_line, headers)
                    if header_future is not None:
                        yield header_future
                if self.stream is None:
                    # We've been detached.
                    need_delegate_close = False
                    raise gen.Return(False)
                skip_body = False
                if self.is_client:
                    if (self._request_start_line is not None and
                            self._request_start_line.method == 'HEAD'):
                        skip_body = True
                    code = start_line.code
                    if code == 304:
                        # 304 responses may include the content-length header
                        # but do not actually have a body.
                        # http://tools.ietf.org/html/rfc7230#section-3.3
                        skip_body = True
                    if code >= 100 and code < 200:
                        # 1xx responses should never indicate the presence of
                        # a body.
                        if ('Content-Length' in headers or
                                'Transfer-Encoding' in headers):
                            raise httputil.HTTPInputError(
                                "Response code %d cannot have body" % code)
                        # TODO: client delegates will get headers_received twice
                        # in the case of a 100-continue.  Document or change?
                        yield self._read_message(delegate)
                else:
                    if (headers.get("Expect") == "100-continue" and
                            not self._write_finished):
                        self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
                if not skip_body:
                    body_future = self._read_body(
                        start_line.code if self.is_client else 0, headers, delegate)
                    if body_future is not None:
                        if self._body_timeout is None:
                            yield body_future
                        else:
                            try:
                                yield gen.with_timeout(
                                    self.stream.io_loop.time() + self._body_timeout,
                                    body_future,
                                    quiet_exceptions=iostream.StreamClosedError)
                            except gen.TimeoutError:
                                gen_log.info("Timeout reading body from %s",
                                             self.context)
                                self.stream.close()
                                raise gen.Return(False)
                self._read_finished = True
                if not self._write_finished or self.is_client:
                    need_delegate_close = False
                    with _ExceptionLoggingContext(app_log):
                        delegate.finish()
                # If we're waiting for the application to produce an asynchronous
                # response, and we're not detached, register a close callback
                # on the stream (we didn't need one while we were reading)
                if (not self._finish_future.done() and
                        self.stream is not None and
                        not self.stream.closed()):
                    self.stream.set_close_callback(self._on_connection_close)
                    yield self._finish_future
                if self.is_client and self._disconnect_on_finish:
                    self.close()
                if self.stream is None:
                    raise gen.Return(False)
            except httputil.HTTPInputError as e:
                gen_log.info("Malformed HTTP message from %s: %s",
                             self.context, e)
                if not self.is_client:
                    yield self.stream.write(b'HTTP/1.1 400 Bad Request\r\n\r\n')
                self.close()
                raise gen.Return(False)
            finally:
                if need_delegate_close:
                    with _ExceptionLoggingContext(app_log):
                        delegate.on_connection_close()
                header_future = None
                self._clear_callbacks()
            raise gen.Return(True)
    

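    This function is long, but inside it we can find the calls to delegate.headers_received and delegate.finish; the body itself reaches delegate.data_received through _read_body. As mentioned, delegate here is the _RoutingDelegate. Here is its source again: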

    class _RoutingDelegate(httputil.HTTPMessageDelegate):
        def __init__(self, router, server_conn, request_conn):
            self.server_conn = server_conn
            self.request_conn = request_conn
            self.delegate = None
            self.router = router  # type: Router
    
        def headers_received(self, start_line, headers):
            request = httputil.HTTPServerRequest(
                connection=self.request_conn,
                server_connection=self.server_conn,
                start_line=start_line, headers=headers)
    
            self.delegate = self.router.find_handler(request)  # here the delegate is a _HandlerDelegate
            if self.delegate is None:
                app_log.debug("Delegate for %s %s request not found",
                              start_line.method, start_line.path)
                self.delegate = _DefaultMessageDelegate(self.request_conn)
    
            return self.delegate.headers_received(start_line, headers) 
    
        def data_received(self, chunk):
            return self.delegate.data_received(chunk)
    
        def finish(self):
            self.delegate.finish()
    
        def on_connection_close(self):
            self.delegate.on_connection_close()
    

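    _RoutingDelegate.headers_received does the following:

    1. It wraps the received request information in an HTTPServerRequest object.
    2. It looks up a delegate with self.router.find_handler(request) and stores it in self.delegate.
    3. It finally calls self.delegate.headers_received.

    The later data_received, finish and on_connection_close calls are forwarded to the same self.delegate. So what exactly is the self.delegate returned by self.router.find_handler? Read on.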
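    The forwarding idea can be modelled in a few lines, together with the order in which _read_message drives a delegate: headers_received, then data_received for the body, then finish. Everything here is a hypothetical simplification:

    - read_message only handles a Content-Length body.
    - RoutingDelegate picks its target from the start line rather than an HTTPServerRequest.
    - RecordingHandler just logs the calls it receives.

```python
import re


class RecordingHandler:
    """Hypothetical per-route delegate that logs every callback."""

    def __init__(self, name, log):
        self.name, self.log = name, log

    def headers_received(self, start_line, headers):
        self.log.append((self.name, "headers", start_line))

    def data_received(self, chunk):
        self.log.append((self.name, "data", chunk))

    def finish(self):
        self.log.append((self.name, "finish"))


class RoutingDelegate:
    """Simplified _RoutingDelegate: the target is chosen once the headers
    (and so the path) are known; later calls are simply forwarded."""

    def __init__(self, find_handler):
        self.find_handler = find_handler
        self.delegate = None

    def headers_received(self, start_line, headers):
        self.delegate = self.find_handler(start_line)
        return self.delegate.headers_received(start_line, headers)

    def data_received(self, chunk):
        return self.delegate.data_received(chunk)

    def finish(self):
        self.delegate.finish()


def read_message(raw, delegate):
    """Toy version of the call order in HTTP1Connection._read_message."""
    # Same header terminator regex that _read_message waits for.
    head, body = re.split(rb"\r?\n\r?\n", raw, maxsplit=1)
    lines = head.decode("latin-1").splitlines()
    start_line = lines[0]
    headers = dict(line.split(": ", 1) for line in lines[1:])
    delegate.headers_received(start_line, headers)
    length = int(headers.get("Content-Length", 0))
    if length:
        delegate.data_received(body[:length])
    delegate.finish()
```

    A find_handler that maps "/" to a handler named "index" and everything else to "404" shows that all three callbacks reach the delegate chosen in headers_received.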
    Now focus on self.delegate = self.router.find_handler(request). Here self.router is the Application, so let's see what Application.find_handler does:

        def find_handler(self, request, **kwargs):
            route = self.default_router.find_handler(request)
            if route is not None:
                return route
    
            if self.settings.get('default_handler_class'):
                return self.get_handler_delegate(
                    request,
                    self.settings['default_handler_class'],
                    self.settings.get('default_handler_args', {}))
    
            return self.get_handler_delegate(
                request, ErrorHandler, {'status_code': 404})
    

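    It looks up the handler for the request in the Application's self.default_router. Only if nothing is found does it fall back to default_handler_class (if configured), or else to an ErrorHandler with status code 404. self.default_router is set up in Application's __init__: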

            self.default_router = _ApplicationRouter(self, [
                Rule(AnyMatches(), self.wildcard_router)
            ])
    

    _ApplicationRouter does not define find_handler itself, so we look in its parent classes and find the definition in class RuleRouter(Router):

        def find_handler(self, request, **kwargs):
            for rule in self.rules:
                target_params = rule.matcher.match(request)
                if target_params is not None:
                    if rule.target_kwargs:
                        target_params['target_kwargs'] = rule.target_kwargs
    
                    delegate = self.get_target_delegate(
                        rule.target, request, **target_params)
    
                    if delegate is not None:
                        return delegate
    
            return None
    

    First, what is self.rules? It is built by self.add_rules(rules), so here self.rules is [Rule(AnyMatches(), self.wildcard_router)] and rule.target is self.wildcard_router. rule.matcher is AnyMatches, whose match returns an empty dict {}. That is not None, so delegate = self.get_target_delegate(...) runs.

    _ApplicationRouter overrides get_target_delegate (a subclass method with the same name takes precedence over the parent's). So this call goes to the get_target_delegate of class _ApplicationRouter(ReversibleRuleRouter), not to the one in RuleRouter:

        def get_target_delegate(self, target, request, **target_params):
            if isclass(target) and issubclass(target, RequestHandler):
                return self.application.get_handler_delegate(request, target, **target_params)
    
            return super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params)
    

    Here target is self.wildcard_router, an instance of _ApplicationRouter rather than a RequestHandler subclass. So the code falls through to super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params), which resolves to RuleRouter's get_target_delegate:

        def get_target_delegate(self, target, request, **target_params):
            """Returns an instance of `~.httputil.HTTPMessageDelegate` for a
            Rule's target. This method is called by `~.find_handler` and can be
            extended to provide additional target types.
    
            :arg target: a Rule's target.
            :arg httputil.HTTPServerRequest request: current request.
            :arg target_params: additional parameters that can be useful
                for `~.httputil.HTTPMessageDelegate` creation.
            """
            if isinstance(target, Router):
                return target.find_handler(request, **target_params)
    
            elif isinstance(target, httputil.HTTPServerConnectionDelegate):
                return target.start_request(request.server_connection, request.connection)
    
            elif callable(target):
                return _CallableAdapter(
                    partial(target, **target_params), request.connection
                )
    
            return None
    

    Here target is self.wildcard_router = _ApplicationRouter(self, handlers). It is again an _ApplicationRouter, and therefore an instance of a Router subclass, so the first branch runs: target.find_handler(request, **target_params). This is the same find_handler as before, the one _ApplicationRouter inherits from RuleRouter.

    The difference is how self.wildcard_router was constructed. Its rules argument is not a list of Rule objects but the user's own route list, made of (path, RequestHandler subclass) tuples. In RuleRouter, the parent class of _ApplicationRouter, self.add_rules(rules) turns those user-defined routes into self.rules, just as it did for [Rule(AnyMatches(), self.wildcard_router)] above.
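    The two-level lookup can be sketched as follows: an outer router with an AnyMatches rule points at a nested router whose rules are path patterns. ToyRouter is a hypothetical class. It merges RuleRouter.find_handler with the RequestHandler branch of _ApplicationRouter.get_target_delegate and the Router branch of RuleRouter.get_target_delegate. It also matches on a path string rather than a full request object.

```python
import re
from inspect import isclass


class RequestHandler:
    """Stand-in for tornado.web.RequestHandler."""


class AnyMatches:
    def match(self, path):
        return {}  # matches every request and adds no parameters


class PathMatches:
    def __init__(self, pattern):
        if not pattern.endswith("$"):
            pattern += "$"
        self.regex = re.compile(pattern)

    def match(self, path):
        return {} if self.regex.match(path) else None


class ToyRouter:
    """Hypothetical router; rules are (matcher, target) pairs."""

    def __init__(self, rules):
        self.rules = rules

    def find_handler(self, path, **kwargs):
        for matcher, target in self.rules:
            params = matcher.match(path)
            if params is not None:
                delegate = self.get_target_delegate(target, path, **params)
                if delegate is not None:
                    return delegate
        return None  # no rule produced a delegate

    def get_target_delegate(self, target, path, **params):
        if isclass(target) and issubclass(target, RequestHandler):
            # _ApplicationRouter's branch; Tornado would call
            # application.get_handler_delegate here.
            return ("handler delegate", target.__name__, path)
        if isinstance(target, ToyRouter):
            # RuleRouter's branch: a nested router does its own lookup.
            return target.find_handler(path, **params)
        return None
```

    In Tornado, a None result at the top level is what makes Application.find_handler fall back to default_handler_class or a 404 ErrorHandler.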

    class RuleRouter(Router):
        """Rule-based router implementation."""
    
        def __init__(self, rules=None):
            """Constructs a router from an ordered list of rules::
    
                RuleRouter([
                    Rule(PathMatches("/handler"), Target),
                    # ... more rules
                ])
    
            You can also omit explicit `Rule` constructor and use tuples of arguments::
    
                RuleRouter([
                    (PathMatches("/handler"), Target),
                ])
    
            `PathMatches` is a default matcher, so the example above can be simplified::
    
                RuleRouter([
                    ("/handler", Target),
                ])
    
            In the examples above, ``Target`` can be a nested `Router` instance, an instance of
            `~.httputil.HTTPServerConnectionDelegate` or an old-style callable,
            accepting a request argument.
    
            :arg rules: a list of `Rule` instances or tuples of `Rule`
                constructor arguments.
            """
            self.rules = []  # type: typing.List[Rule]
            if rules:
                self.add_rules(rules)
    
        def add_rules(self, rules):
            """Appends new rules to the router.
    
            :arg rules: a list of Rule instances (or tuples of arguments, which are
                passed to Rule constructor).
            """
            for rule in rules:
                if isinstance(rule, (tuple, list)):
                    assert len(rule) in (2, 3, 4)
                    if isinstance(rule[0], basestring_type):
                        rule = Rule(PathMatches(rule[0]), *rule[1:])
                    else:
                        rule = Rule(*rule)
    
                self.rules.append(self.process_rule(rule))
    

    In add_rules, rules is now the user's route list, and each rule in the loop is a tuple. In the example app = tornado.web.Application(handlers=[(r"/", IndexHandler)]), that tuple is (r"/", IndexHandler). rule[0] is a path string, so the code runs rule = Rule(PathMatches(rule[0]), *rule[1:]). The unpacking makes rule.target equal to rule[1], our own IndexHandler class (a RequestHandler subclass).

    Back in the wildcard router's find_handler, rule.matcher is now a PathMatches, so let's look at PathMatches.match:
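    The tuple handling in add_rules can be reproduced with two small dataclasses. Rule and PathMatches below are simplified stand-ins, and PathMatches just stores the pattern. normalize_rules is a hypothetical name, and the real method also passes each rule through self.process_rule. Using dataclasses rather than tuples matters here: an already-built Rule must not satisfy the isinstance(rule, (tuple, list)) check.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class PathMatches:
    """Simplified stand-in: only remembers the path pattern."""
    pattern: str


@dataclass
class Rule:
    """Simplified stand-in for tornado.routing.Rule."""
    matcher: Any
    target: Any
    target_kwargs: Optional[dict] = None
    name: Optional[str] = None


def normalize_rules(rules):
    """Turn route tuples into Rule objects the way add_rules does."""
    normalized = []
    for rule in rules:
        if isinstance(rule, (tuple, list)):
            assert len(rule) in (2, 3, 4)
            if isinstance(rule[0], str):
                # ("/path", Handler, ...) -> Rule(PathMatches("/path"), Handler, ...)
                rule = Rule(PathMatches(rule[0]), *rule[1:])
            else:
                # (matcher, target, ...) -> Rule(matcher, target, ...)
                rule = Rule(*rule)
        normalized.append(rule)
    return normalized
```

    For [(r"/", IndexHandler)], the result is a single Rule whose matcher wraps "/" and whose target is IndexHandler itself.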

    class PathMatches(Matcher):
        """Matches requests with paths specified by ``path_pattern`` regex."""
    
        def __init__(self, path_pattern):
            if isinstance(path_pattern, basestring_type):
                if not path_pattern.endswith('$'):
                    path_pattern += '$'
                self.regex = re.compile(path_pattern)
            else:
                self.regex = path_pattern
    
            assert len(self.regex.groupindex) in (0, self.regex.groups), \
                ("groups in url regexes must either be all named or all "
                 "positional: %r" % self.regex.pattern)
    
            self._path, self._group_count = self._find_groups()
    
        def match(self, request):
            match = self.regex.match(request.path)
            if match is None:
                return None
            if not self.regex.groups:
                return {}
    
            path_args, path_kwargs = [], {}
    
            # Pass matched groups to the handler.  Since
            # match.groups() includes both named and
            # unnamed groups, we want to use either groups
            # or groupdict but not both.
            if self.regex.groupindex:
                path_kwargs = dict(
                    (str(k), _unquote_or_none(v))
                    for (k, v) in match.groupdict().items())
            else:
                path_args = [_unquote_or_none(s) for s in match.groups()]
    
            return dict(path_args=path_args, path_kwargs=path_kwargs)
    

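    In short, PathMatches compiles the regular expression given by the path pattern, appending $ if it is missing. It then matches that expression against the request's path. If there is no match, match returns None and find_handler moves on to the next rule, continuing until a rule matches or the rules run out.

    Suppose a match is found. The target is then IndexHandler, a subclass of RequestHandler. From _ApplicationRouter's get_target_delegate method we can see that the delegate returned in this case is the return value of self.application.get_handler_delegate(request, target, **target_params).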
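
    The matching logic can be tried in isolation with the following simplified model of PathMatches.match, using only the standard library. One simplification to note: Tornado's own _unquote_or_none keeps the unquoted groups as bytes, and RequestHandler.decode_argument decodes them later. This sketch uses urllib.parse.unquote and returns str to stay short. The function name match_path is made up for illustration.

```python
import re
from urllib.parse import unquote


def _unquote_or_none(s):
    # Optional groups that did not take part in the match are None.
    return None if s is None else unquote(s)


def match_path(pattern, path):
    """Simplified model of PathMatches.match (illustration only)."""
    if not pattern.endswith("$"):
        pattern += "$"
    regex = re.compile(pattern)
    # Tornado rejects patterns that mix named and positional groups.
    assert len(regex.groupindex) in (0, regex.groups)

    m = regex.match(path)
    if m is None:
        return None   # no match: find_handler tries the next rule
    if not regex.groups:
        return {}     # matched, but nothing to pass to the handler

    path_args, path_kwargs = [], {}
    if regex.groupindex:  # all groups named -> keyword arguments
        path_kwargs = {str(k): _unquote_or_none(v)
                       for k, v in m.groupdict().items()}
    else:                 # all groups positional -> positional arguments
        path_args = [_unquote_or_none(s) for s in m.groups()]
    return dict(path_args=path_args, path_kwargs=path_kwargs)


print(match_path(r"/", "/"))       # {}
print(match_path(r"/", "/about"))  # None
print(match_path(r"/user/([0-9]+)", "/user/42"))
# {'path_args': ['42'], 'path_kwargs': {}}
print(match_path(r"/blog/(?P<slug>[^/]+)", "/blog/hello%20world"))
# {'path_args': [], 'path_kwargs': {'slug': 'hello world'}}
```

    The appended $ is why r"/" does not also match "/about". re.match only anchors at the start, so without it every path beginning with "/" would hit the first route.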

        def get_target_delegate(self, target, request, **target_params):
            if isclass(target) and issubclass(target, RequestHandler):
                return self.application.get_handler_delegate(request, target, **target_params)
    
            return super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params)
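
    The type check in get_target_delegate decides how a target is wrapped. It can be modelled with plain classes, as in the sketch below. RequestHandler and HandlerDelegate here are hypothetical stand-ins for tornado.web.RequestHandler and _HandlerDelegate. For any other kind of target, Tornado falls back to RuleRouter.get_target_delegate, which handles nested routers, HTTPServerConnectionDelegate instances and plain callables, and returns None otherwise. The sketch only models the final None.

```python
from inspect import isclass


class RequestHandler:
    """Placeholder base class (stand-in for tornado.web.RequestHandler)."""


class IndexHandler(RequestHandler):
    pass


class HandlerDelegate:
    """Stand-in for _HandlerDelegate: records what it was built from."""

    def __init__(self, request, handler_class, target_kwargs=None,
                 path_args=None, path_kwargs=None):
        self.request = request
        self.handler_class = handler_class
        self.handler_kwargs = target_kwargs or {}
        self.path_args = path_args or []
        self.path_kwargs = path_kwargs or {}


def get_target_delegate(target, request, **target_params):
    """Simplified model of _ApplicationRouter.get_target_delegate."""
    if isclass(target) and issubclass(target, RequestHandler):
        # RequestHandler subclasses get a per-request delegate; the
        # path_args / path_kwargs / target_kwargs from matching are forwarded.
        return HandlerDelegate(request, target, **target_params)
    # Tornado defers to RuleRouter here; this sketch just gives up.
    return None


d = get_target_delegate(IndexHandler, "fake-request",
                        path_args=["42"], path_kwargs={})
print(type(d).__name__, d.handler_class.__name__, d.path_args)
# HandlerDelegate IndexHandler ['42']
print(get_target_delegate(object(), "fake-request"))  # None
```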
    

    self.application.get_handler_delegate calls Application.get_handler_delegate, which is defined as follows:

        def get_handler_delegate(self, request, target_class, target_kwargs=None,
                                 path_args=None, path_kwargs=None):
            """Returns `~.httputil.HTTPMessageDelegate` that can serve a request
            for application and `RequestHandler` subclass.
    
            :arg httputil.HTTPServerRequest request: current HTTP request.
            :arg RequestHandler target_class: a `RequestHandler` class.
            :arg dict target_kwargs: keyword arguments for ``target_class`` constructor.
            :arg list path_args: positional arguments for ``target_class`` HTTP method that
                will be executed while handling a request (``get``, ``post`` or any other).
            :arg dict path_kwargs: keyword arguments for ``target_class`` HTTP method.
            """
            return _HandlerDelegate(
                self, request, target_class, target_kwargs, path_args, path_kwargs)
    

    It returns an instance of _HandlerDelegate. In other words, the self.delegate obtained from self.router.find_handler(request) is a _HandlerDelegate. Let's look at the _RoutingDelegate source once more:

    class _RoutingDelegate(httputil.HTTPMessageDelegate):
        def __init__(self, router, server_conn, request_conn):
            self.server_conn = server_conn
            self.request_conn = request_conn
            self.delegate = None
            self.router = router  # type: Router
    
        def headers_received(self, start_line, headers):
            request = httputil.HTTPServerRequest(
                connection=self.request_conn,
                server_connection=self.server_conn,
                start_line=start_line, headers=headers)
    
            self.delegate = self.router.find_handler(request)  # here the delegate is a _HandlerDelegate
            if self.delegate is None:
                app_log.debug("Delegate for %s %s request not found",
                              start_line.method, start_line.path)
                self.delegate = _DefaultMessageDelegate(self.request_conn)
    
            return self.delegate.headers_received(start_line, headers) 
    
        def data_received(self, chunk):
            return self.delegate.data_received(chunk)
    
        def finish(self):
            self.delegate.finish()
    
        def on_connection_close(self):
            self.delegate.on_connection_close()
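
    The pattern in _RoutingDelegate can be shown with a self-contained sketch: pick the real delegate only once the headers are known, fall back to a 404 delegate, and forward every later callback. All class names below are hypothetical. drive() is only a rough stand-in for the order in which the HTTP/1 connection calls a delegate (headers, then zero or more body chunks, then finish). The start line is simplified to a (method, path) tuple.

```python
class NotFoundDelegate:
    """Stand-in for _DefaultMessageDelegate, which answers with a 404."""

    def __init__(self, log):
        self.log = log

    def headers_received(self, start_line, headers):
        pass

    def data_received(self, chunk):
        pass

    def finish(self):
        self.log.append("404 Not Found")


class EchoDelegate:
    """Hypothetical delegate that records every callback it receives."""

    def __init__(self, log):
        self.log = log

    def headers_received(self, start_line, headers):
        self.log.append(("headers", start_line))

    def data_received(self, chunk):
        self.log.append(("data", chunk))

    def finish(self):
        self.log.append("finish")


class RoutingDelegate:
    """Simplified model of _RoutingDelegate."""

    def __init__(self, find_handler, log):
        self.find_handler = find_handler  # path -> delegate or None
        self.log = log
        self.delegate = None

    def headers_received(self, start_line, headers):
        method, path = start_line
        self.delegate = self.find_handler(path)
        if self.delegate is None:
            self.delegate = NotFoundDelegate(self.log)
        return self.delegate.headers_received(start_line, headers)

    def data_received(self, chunk):
        return self.delegate.data_received(chunk)

    def finish(self):
        self.delegate.finish()


def drive(delegate, start_line, headers, chunks):
    """Feed callbacks in order: headers, body chunks, finish."""
    delegate.headers_received(start_line, headers)
    for chunk in chunks:
        delegate.data_received(chunk)
    delegate.finish()


log = []


def find(path):
    # Hypothetical one-route table: only "/" has a delegate.
    return EchoDelegate(log) if path == "/" else None


drive(RoutingDelegate(find, log), ("POST", "/"), {}, [b"a=1"])
drive(RoutingDelegate(find, log), ("GET", "/missing"), {}, [])
print(log)
# [('headers', ('POST', '/')), ('data', b'a=1'), 'finish', '404 Not Found']
```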
    

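    This gives the call chain inside _read_message(delegate). _read_message calls the headers_received, data_received (once per body chunk) and finish methods of _RoutingDelegate, which in turn call the same methods of _HandlerDelegate.
    Now let's look directly at the definition of _HandlerDelegate, focusing on handler_class and on the headers_received, data_received and finish methods: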

    class _HandlerDelegate(httputil.HTTPMessageDelegate):
        def __init__(self, application, request, handler_class, handler_kwargs,
                     path_args, path_kwargs):
            self.application = application
            self.connection = request.connection
            self.request = request
            self.handler_class = handler_class
            self.handler_kwargs = handler_kwargs or {}
            self.path_args = path_args or []
            self.path_kwargs = path_kwargs or {}
            self.chunks = []
            self.stream_request_body = _has_stream_request_body(self.handler_class)
    
        def headers_received(self, start_line, headers):
            if self.stream_request_body:
                self.request.body = Future()
                return self.execute()
    
        def data_received(self, data):
            if self.stream_request_body:
                return self.handler.data_received(data)
            else:
                self.chunks.append(data)
        
        def finish(self):
            if self.stream_request_body:
                future_set_result_unless_cancelled(self.request.body, None)
            else:
                self.request.body = b''.join(self.chunks)
                self.request._parse_body()
                self.execute()
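
    In the default, non-streaming mode, _HandlerDelegate buffers the body chunks and only builds and runs the handler in finish(). A minimal sketch of that path follows, under stated assumptions. The request is modelled as a plain dict, and _parse_body and the streaming branch (handlers decorated with stream_request_body, where execute() is called from headers_received) are left out. EchoBodyHandler and its run() method are hypothetical.

```python
class HandlerDelegate:
    """Simplified, non-streaming model of _HandlerDelegate."""

    def __init__(self, handler_class, request):
        self.handler_class = handler_class
        self.request = request
        self.chunks = []

    def headers_received(self, start_line, headers):
        # Non-streaming: nothing to do until the whole body has arrived.
        return None

    def data_received(self, data):
        self.chunks.append(data)

    def finish(self):
        self.request["body"] = b"".join(self.chunks)
        self.execute()

    def execute(self):
        # A fresh handler instance is created for this request.
        self.handler = self.handler_class(self.request)
        self.handler.run()


class EchoBodyHandler:
    """Hypothetical handler that reports the body it received."""

    def __init__(self, request):
        self.request = request

    def run(self):
        print("handler saw body:", self.request["body"])


request = {"method": "POST", "path": "/"}
delegate = HandlerDelegate(EchoBodyHandler, request)
delegate.headers_received(("POST", "/"), {})
for chunk in (b"greeting=", b"Hi"):
    delegate.data_received(chunk)
delegate.finish()
# handler saw body: b'greeting=Hi'
```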
    

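    Here we focus on delegate.execute, i.e. the execute method of _HandlerDelegate. It is the entry point through which the Tornado framework calls the user-defined RequestHandler. In non-streaming mode it is called from finish; in streaming mode it is called from headers_received.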

        def execute(self):
            # If template cache is disabled (usually in the debug mode),
            # re-compile templates and reload static files on every
            # request so you don't need to restart to see changes
            if not self.application.settings.get("compiled_template_cache", True):
                with RequestHandler._template_loader_lock:
                    for loader in RequestHandler._template_loaders.values():
                        loader.reset()
            if not self.application.settings.get('static_hash_cache', True):
                StaticFileHandler.reset()
    
            self.handler = self.handler_class(self.application, self.request,
                                              **self.handler_kwargs)
            transforms = [t(self.request) for t in self.application.transforms]
    
            if self.stream_request_body:
                self.handler._prepared_future = Future()
            # Note that if an exception escapes handler._execute it will be
            # trapped in the Future it returns (which we are ignoring here,
            # leaving it to be logged when the Future is GC'd).
            # However, that shouldn't happen because _execute has a blanket
            # except handler, and we cannot easily access the IOLoop here to
            # call add_future (because of the requirement to remain compatible
            # with WSGI)
            self.handler._execute(transforms, *self.path_args,
                                  **self.path_kwargs)
            # If we are streaming the request body, then execute() is finished
            # when the handler has prepared to receive the body.  If not,
            # it doesn't matter when execute() finishes (so we return None)
            return self.handler._prepared_future
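
    In the call self.handler_class(self.application, self.request, **self.handler_kwargs), the handler_kwargs come from the optional third element of a route tuple. RequestHandler's constructor passes them on to the initialize() hook, and a new handler object is built for every request. The sketch below models only that part. MiniRequestHandler and GreetingHandler are hypothetical stand-ins, not Tornado classes.

```python
class MiniRequestHandler:
    """Stand-in for tornado.web.RequestHandler: only models how the
    constructor forwards keyword arguments to initialize()."""

    def __init__(self, application, request, **kwargs):
        self.application = application
        self.request = request
        self.initialize(**kwargs)

    def initialize(self):
        # Default hook: subclasses override it to accept their own kwargs.
        pass


class GreetingHandler(MiniRequestHandler):
    def initialize(self, greeting="Hello"):
        self.greeting = greeting


# A route like (r"/", GreetingHandler, dict(greeting="Hi")) yields
# target_kwargs == {"greeting": "Hi"}, stored as handler_kwargs and
# passed to the constructor on every request.
handler_kwargs = {"greeting": "Hi"}
h1 = GreetingHandler("app", "request-1", **handler_kwargs)
h2 = GreetingHandler("app", "request-2", **handler_kwargs)
print(h1.greeting, h1 is h2)  # Hi False
```

    Because each request gets its own instance, attributes set on self inside get() or post() are not shared between requests.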
    

    The key parts here are self.handler = self.handler_class(self.application, self.request, **self.handler_kwargs) and the call to self.handler._execute. Recall the find_handler function of _ApplicationRouter analysed above:

        def find_handler(self, request, **kwargs):
            for rule in self.rules:
                target_params = rule.matcher.match(request)
                if target_params is not None:
                    if rule.target_kwargs:
                        target_params['target_kwargs'] = rule.target_kwargs
    
                    delegate = self.get_target_delegate(
                        rule.target, request, **target_params)
    
                    if delegate is not None:
                        return delegate
    
            return None
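
    The loop above is "first match wins". It also has a detail that is easy to miss: an empty dict from match() still counts as a match, because the check is "is not None" rather than truthiness. The standalone model below shows both points. Rule, find_handler and make_delegate are simplified or hypothetical stand-ins, and this Rule only supports positional groups.

```python
import re


class Rule:
    """Minimal stand-in for tornado.routing.Rule (illustration only)."""

    def __init__(self, pattern, target, target_kwargs=None):
        if not pattern.endswith("$"):
            pattern += "$"
        self.regex = re.compile(pattern)
        self.target = target
        self.target_kwargs = target_kwargs

    def match(self, path):
        m = self.regex.match(path)
        if m is None:
            return None
        if not self.regex.groups:
            return {}  # a match with nothing to pass on
        return {"path_args": list(m.groups()), "path_kwargs": {}}


def find_handler(rules, path, get_target_delegate):
    """Simplified model of RuleRouter.find_handler."""
    for rule in rules:
        target_params = rule.match(path)
        if target_params is not None:  # {} still counts as a match
            if rule.target_kwargs:
                target_params["target_kwargs"] = rule.target_kwargs
            delegate = get_target_delegate(rule.target, path, **target_params)
            if delegate is not None:
                return delegate
    return None


def make_delegate(target, path, **params):
    # Hypothetical factory: bundle what a delegate would be built from.
    return (target, params)


rules = [
    Rule(r"/user/([0-9]+)", "UserHandler", {"db": "fake-db"}),
    Rule(r"/user/.*", "FallbackUserHandler"),
    Rule(r"/", "IndexHandler"),
]
print(find_handler(rules, "/user/42", make_delegate))
# ('UserHandler', {'path_args': ['42'], 'path_kwargs': {}, 'target_kwargs': {'db': 'fake-db'}})
print(find_handler(rules, "/user/abc", make_delegate))  # ('FallbackUserHandler', {})
print(find_handler(rules, "/nope", make_delegate))      # None
```

    Since the first matching rule wins, more specific patterns must come before broader ones. If r"/user/.*" were listed first, it would also capture "/user/42".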
    

    Because delegate comes from self.get_target_delegate, it is an instance of _HandlerDelegate. Its handler_class argument is rule.target, i.e. the IndexHandler class we defined, found via the router. self.handler._execute is defined as follows:

        @gen.coroutine
        def _execute(self, transforms, *args, **kwargs):
            """Executes this request with the given output transforms."""
            self._transforms = transforms
            try:
                if self.request.method not in self.SUPPORTED_METHODS:
                    raise HTTPError(405)
                self.path_args = [self.decode_argument(arg) for arg in args]
                self.path_kwargs = dict((k, self.decode_argument(v, name=k))
                                        for (k, v) in kwargs.items())
                # If XSRF cookies are turned on, reject form submissions without
                # the proper cookie
                if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
                        self.application.settings.get("xsrf_cookies"):
                    self.check_xsrf_cookie()
    
                result = self.prepare()
                if result is not None:
                    result = yield result
                if self._prepared_future is not None:
                    # Tell the Application we've finished with prepare()
                    # and are ready for the body to arrive.
                    future_set_result_unless_cancelled(self._prepared_future, None)
                if self._finished:
                    return
    
                if _has_stream_request_body(self.__class__):
                    # In streaming mode request.body is a Future that signals
                    # the body has been completely received.  The Future has no
                    # result; the data has been passed to self.data_received
                    # instead.
                    try:
                        yield self.request.body
                    except iostream.StreamClosedError:
                        return
    
                method = getattr(self, self.request.method.lower())
                result = method(*self.path_args, **self.path_kwargs)
                if result is not None:
                    result = yield result
                if self._auto_finish and not self._finished:
                    self.finish()
            except Exception as e:
                try:
                    self._handle_request_exception(e)
                except Exception:
                    app_log.error("Exception in exception handler", exc_info=True)
                finally:
                    # Unset result to avoid circular references
                    result = None
                if (self._prepared_future is not None and
                        not self._prepared_future.done()):
                    # In case we failed before setting _prepared_future, do it
                    # now (to unblock the HTTP server).  Note that this is not
                    # in a finally block to avoid GC issues prior to Python 3.4.
                    self._prepared_future.set_result(None)
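
    Stripped of coroutines, XSRF checking and streaming, the control flow of _execute can be approximated synchronously:

    1. Reject unknown methods with 405.
    2. Run prepare().
    3. Dispatch by method name with getattr.
    4. Auto-finish.
    5. Turn exceptions into an error status.

    The sketch below is an illustration under those assumptions. MiniHandler, send_error's body and the status bookkeeping are hypothetical simplifications. One thing it does reflect from RequestHandler: the base get/post/... methods raise HTTPError(405), so an unimplemented method yields 405 rather than a crash.

```python
class HTTPError(Exception):
    def __init__(self, status_code):
        super().__init__(status_code)
        self.status_code = status_code


class MiniHandler:
    """Synchronous sketch of RequestHandler._execute's control flow."""

    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PATCH", "PUT", "OPTIONS")

    def __init__(self, request_method):
        self.request_method = request_method
        self.status = 200
        self.output = []
        self._finished = False

    def _unimplemented(self, *args, **kwargs):
        raise HTTPError(405)

    # Like RequestHandler, every HTTP method defaults to "405 Method Not Allowed".
    get = head = post = delete = patch = put = options = _unimplemented

    def prepare(self):
        pass  # hook that runs before the HTTP-method function

    def write(self, text):
        self.output.append(text)

    def finish(self):
        self._finished = True

    def send_error(self, status):
        # Rough stand-in for what _handle_request_exception ends up doing.
        self.status = status
        self.finish()

    def _execute(self, *path_args, **path_kwargs):
        try:
            if self.request_method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            self.prepare()
            if self._finished:  # prepare() may finish the request early
                return
            # Dispatch by name: "GET" -> self.get, "POST" -> self.post, ...
            method = getattr(self, self.request_method.lower())
            method(*path_args, **path_kwargs)
            if not self._finished:  # the auto-finish behaviour
                self.finish()
        except HTTPError as e:
            self.send_error(e.status_code)
        except Exception:
            self.send_error(500)


class IndexHandler(MiniHandler):
    def get(self, name="friendly user"):
        self.write("Hello, " + name + "!")


ok = IndexHandler("GET")
ok._execute("Tornado")
print(ok.status, ok.output)    # 200 ['Hello, Tornado!']
bad = IndexHandler("POST")
bad._execute()
print(bad.status, bad.output)  # 405 []
weird = IndexHandler("BREW")
weird._execute()
print(weird.status)            # 405
```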
    

    The key lines are:

     method = getattr(self, self.request.method.lower())
     result = method(*self.path_args, **self.path_kwargs)
    

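    In getattr(self, self.request.method.lower()), self is an instance of self.handler_class, i.e. rule.target, which here is IndexHandler. For a GET request this resolves to IndexHandler.get. At this point, Tornado's request flow has reached the handler method we wrote ourselves.
    To summarize, the flow is: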

    tcpserver(listen) -> httpserver(request_delegate = delegate.start_request(self, conn)) -> Application -> _RoutingDelegate -> ret = yield conn.read_response(request_delegate) -> _HandlerDelegate -> RequestHandler subclass (get/post/...)
    

    How the response data is written back to the socket, and where Tornado's high concurrency comes from, will be analysed next time.


      Article link: https://www.haomeiwen.com/subject/bnydzftx.html