先上代码例子:
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)
class IndexHandler(tornado.web.RequestHandler):
def get(self):
greeting = self.get_argument('greeting', 'Hello')
self.write(greeting + ', friendly user!')
if __name__ == "__main__":
tornado.options.parse_command_line()
app = tornado.web.Application(handlers=[(r"/", IndexHandler)])
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()
关于tornado,它既是web服务器(看成nginx服务),又是web框架(看成一个wsgi程序),不过官方推荐两个一起用才能发挥最大功效。所以不管是tornado服务器,加上其他的wsgi程序(比如django), 还是nginx + WSGI + tornado框架都不能很好的发挥tornado的能力。
基于上面的代码,程序都是按照顺序执行,那么入口一般都是最后一个,所以这里也就是start(),先了解start前的准备工作,start之前最近的准备工作listen()函数,那先看listen做了什么?
def listen(self, port, address=""):
"""Starts accepting connections on the given port.
This method may be called more than once to listen on multiple ports.
`listen` takes effect immediately; it is not necessary to call
`TCPServer.start` afterwards. It is, however, necessary to start
the `.IOLoop`.
"""
sockets = bind_sockets(port, address=address)
self.add_sockets(sockets)
bind_sockets函数作用其实就是socket编程过程中的前2步组合(创建socket,绑定socket到对应的端口),然后再看self.add_sockets(sockets)
def add_sockets(self, sockets):
"""Makes this server start accepting connections on the given sockets.
The ``sockets`` parameter is a list of socket objects such as
those returned by `~tornado.netutil.bind_sockets`.
`add_sockets` is typically used in combination with that
method and `tornado.process.fork_processes` to provide greater
control over the initialization of a multi-process server.
"""
for sock in sockets:
self._sockets[sock.fileno()] = sock
self._handlers[sock.fileno()] = add_accept_handler(
sock, self._handle_connection)
这里重点关注self._handlers[sock.fileno()] = add_accept_handler(sock, self._handle_connection),其实他的作用就是把这个socket加入ioloop的循环队列,当ioloop监听到这个socket有新的连接请求的时候(READ事件)就调用回调函数self._handle_connection去处理。add_accept_handler函数的源码如下:
def add_accept_handler(sock, callback):
"""Adds an `.IOLoop` event handler to accept new connections on ``sock``.
When a connection is accepted, ``callback(connection, address)`` will
be run (``connection`` is a socket object, and ``address`` is the
address of the other end of the connection). Note that this signature
is different from the ``callback(fd, events)`` signature used for
`.IOLoop` handlers.
A callable is returned which, when called, will remove the `.IOLoop`
event handler and stop processing further incoming connections.
.. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed.
.. versionchanged:: 5.0
A callable is returned (``None`` was returned before).
"""
io_loop = IOLoop.current()
removed = [False]
def accept_handler(fd, events):
# More connections may come in while we're handling callbacks;
# to prevent starvation of other tasks we must limit the number
# of connections we accept at a time. Ideally we would accept
# up to the number of connections that were waiting when we
# entered this method, but this information is not available
# (and rearranging this method to call accept() as many times
# as possible before running any callbacks would have adverse
# effects on load balancing in multiprocess configurations).
# Instead, we use the (default) listen backlog as a rough
# heuristic for the number of connections we can reasonably
# accept at once.
for i in xrange(_DEFAULT_BACKLOG):
if removed[0]:
# The socket was probably closed
return
try:
connection, address = sock.accept()
except socket.error as e:
# _ERRNO_WOULDBLOCK indicate we have accepted every
# connection that is available.
if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
return
# ECONNABORTED indicates that there was a connection
# but it was closed while still in the accept queue.
# (observed on FreeBSD).
if errno_from_exception(e) == errno.ECONNABORTED:
continue
raise
set_close_exec(connection.fileno())
callback(connection, address)
def remove_handler():
io_loop.remove_handler(sock)
removed[0] = True
io_loop.add_handler(sock, accept_handler, IOLoop.READ)
return remove_handler
重点先关注这一句:io_loop.add_handler(sock, accept_handler, IOLoop.READ),也就是将监听了端口的socket加到ioloop的循环队列中,监听事件为IOLoop.READ,并有可读事件发生(比如http请求)时执行回调accept_handler,而accept_handler函数的作用就是处理socket.accept接收到的socket,然后用回调函数callback处理socket,那么这里的callback函数是啥呢?就是add_accept_handler函数的参数,那再回到上面分析的add_sockets函数中self._handlers[sock.fileno()] = add_accept_handler(sock, self._handle_connection),所以callback其实就是self._handle_connection啦,也就是该端口的所有的socket请求都是由self._handle_connection去处理的。简化其处理流程,其实就是:
tornado/tcpserver.py中的TcpServer(代码中的HttpServer继承了TcpServer)类的listen函数将监听端口的socket加到了ioloop循环中,监听事件为可读,当有读的事件发生时,执行self._handle_connection回调。
接下来看回调函数self._handle_connection。
def _handle_connection(self, connection, address):
if self.ssl_options is not None:
assert ssl, "Python 2.6+ and OpenSSL required for SSL"
try:
connection = ssl_wrap_socket(connection,
self.ssl_options,
server_side=True,
do_handshake_on_connect=False)
except ssl.SSLError as err:
if err.args[0] == ssl.SSL_ERROR_EOF:
return connection.close()
else:
raise
except socket.error as err:
# If the connection is closed immediately after it is created
# (as in a port scan), we can get one of several errors.
# wrap_socket makes an internal call to getpeername,
# which may return either EINVAL (Mac OS X) or ENOTCONN
# (Linux). If it returns ENOTCONN, this error is
# silently swallowed by the ssl module, so we need to
# catch another error later on (AttributeError in
# SSLIOStream._do_ssl_handshake).
# To test this behavior, try nmap with the -sT flag.
# https://github.com/tornadoweb/tornado/pull/750
if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
return connection.close()
else:
raise
try:
if self.ssl_options is not None:
stream = SSLIOStream(connection,
max_buffer_size=self.max_buffer_size,
read_chunk_size=self.read_chunk_size)
else:
stream = IOStream(connection,
max_buffer_size=self.max_buffer_size,
read_chunk_size=self.read_chunk_size)
future = self.handle_stream(stream, address)
if future is not None:
IOLoop.current().add_future(gen.convert_yielded(future),
lambda f: f.result())
except Exception:
app_log.error("Error in connection callback", exc_info=True)
其实就是创建读取socket数据的stream,然后调用self.handle_stream函数去处理stream。TcpServer类中的self.handle_stream是一个虚函数,需要继承它的类自己去实现,其代码如下:
def handle_stream(self, stream, address):
"""Override to handle a new `.IOStream` from an incoming connection.
This method may be a coroutine; if so any exceptions it raises
asynchronously will be logged. Accepting of incoming connections
will not be blocked by this coroutine.
If this `TCPServer` is configured for SSL, ``handle_stream``
may be called before the SSL handshake has completed. Use
`.SSLIOStream.wait_for_handshake` if you need to verify the client's
certificate or use NPN/ALPN.
.. versionchanged:: 4.2
Added the option for this method to be a coroutine.
"""
raise NotImplementedError()
这样设计也更加灵活,不同的上层协议有自己不同的处理,比如http或者ftp。开头的代码例子中用的是HttpServer,所以这里关注HttpServer类的handle_stream函数
def handle_stream(self, stream, address):
context = _HTTPRequestContext(stream, address,
self.protocol,
self.trusted_downstream)
conn = HTTP1ServerConnection(
stream, self.conn_params, context)
self._connections.add(conn)
conn.start_serving(self)
这里的处理其实也就是,根据获取到的数据新建上下文信息,创建一个新的http连接服务,然后将连接记录到httpserver对象中,以便在关闭服务或者连接的时候找到这个连接对象。后续的操作都是对应这个HTTP1ServerConnection对象,一个http请求会有一个这样的HTTP1ServerConnection对象。接下来看看conn.start_serving(self)都做了什么。
def start_serving(self, delegate):
"""Starts serving requests on this connection.
:arg delegate: a `.HTTPServerConnectionDelegate`
"""
assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
self._serving_future = self._server_request_loop(delegate)
# Register the future on the IOLoop so its errors get logged.
self.stream.io_loop.add_future(self._serving_future,
lambda f: f.result())
简单来说就是根据HTTP1ServerConnection的信息,生成一个HTTP1Connection,并在循环中处理这个HTTP1Connection,这个循环用一个新的协程去处理,然后不断的接收stream的数据,然后处理stream的数据。具体见self._server_request_loop代码
@gen.coroutine
def _server_request_loop(self, delegate):
try:
while True:
conn = HTTP1Connection(self.stream, False,
self.params, self.context)
request_delegate = delegate.start_request(self, conn)
try:
ret = yield conn.read_response(request_delegate)
except (iostream.StreamClosedError,
iostream.UnsatisfiableReadError):
return
except _QuietException:
# This exception was already logged.
conn.close()
return
except Exception:
gen_log.error("Uncaught exception", exc_info=True)
conn.close()
return
if not ret:
return
yield gen.moment
finally:
delegate.on_close(self)
关键点在于delegate.start_request函数。这个delegate是HTTPServer类中handle_stream函数中的conn.start_serving(self)中的self,所以self也就是HTTPServer类,所以看HTTPServer中的start_request就好了。
def start_request(self, server_conn, request_conn):
if isinstance(self.request_callback, httputil.HTTPServerConnectionDelegate):
delegate = self.request_callback.start_request(server_conn, request_conn)
else:
delegate = _CallableAdapter(self.request_callback, request_conn)
if self.xheaders:
delegate = _ProxyAdapter(delegate, request_conn)
return delegate
start_request函数中的self.request_callback就是例子中Application,这个在HTTPServer的init函数中可以看到。Application是Application(ReversibleRouter),继承自Router(httputil.HTTPServerConnectionDelegate),Router继承自httputil.HTTPServerConnectionDelegate,所以这里会走delegate = self.request_callback.start_request(server_conn, request_conn)这里,所以也就是走Application类的start_request函数。tornado/web.py中的Application类是没有start_request函数的,所以这里去它的父类中寻找,在Router类中可以找到
class Router(httputil.HTTPServerConnectionDelegate):
"""Abstract router interface."""
def find_handler(self, request, **kwargs):
# type: (httputil.HTTPServerRequest, typing.Any)->httputil.HTTPMessageDelegate
"""Must be implemented to return an appropriate instance of `~.httputil.HTTPMessageDelegate`
that can serve the request.
Routing implementations may pass additional kwargs to extend the routing logic.
:arg httputil.HTTPServerRequest request: current HTTP request.
:arg kwargs: additional keyword arguments passed by routing implementation.
:returns: an instance of `~.httputil.HTTPMessageDelegate` that will be used to
process the request.
"""
raise NotImplementedError()
def start_request(self, server_conn, request_conn):
return _RoutingDelegate(self, server_conn, request_conn)
那么到这里,再回看_server_request_loop函数,request_delegate = delegate.start_request(self, conn)调用链已经返回,request_delegate就是_RoutingDelegate类的实例。
@gen.coroutine
def _server_request_loop(self, delegate):
try:
while True:
conn = HTTP1Connection(self.stream, False,
self.params, self.context)
request_delegate = delegate.start_request(self, conn)
try:
ret = yield conn.read_response(request_delegate)
except (iostream.StreamClosedError,
iostream.UnsatisfiableReadError):
return
except _QuietException:
# This exception was already logged.
conn.close()
return
except Exception:
gen_log.error("Uncaught exception", exc_info=True)
conn.close()
return
if not ret:
return
yield gen.moment
finally:
delegate.on_close(self)
函数中的request_delegate = delegate.start_request(self, conn),就表明了request_delegate 是一个_RoutingDelegate对象,接下来分析ret = yield conn.read_response(request_delegate),在这之前,先看一下_RoutingDelegate类的定义。
_RoutingDelegate类的定义如下:
class _RoutingDelegate(httputil.HTTPMessageDelegate):
def __init__(self, router, server_conn, request_conn):
self.server_conn = server_conn
self.request_conn = request_conn
self.delegate = None
self.router = router # type: Router
def headers_received(self, start_line, headers):
request = httputil.HTTPServerRequest(
connection=self.request_conn,
server_connection=self.server_conn,
start_line=start_line, headers=headers)
self.delegate = self.router.find_handler(request) #这里的delegate是_HandlerDelegate
if self.delegate is None:
app_log.debug("Delegate for %s %s request not found",
start_line.method, start_line.path)
self.delegate = _DefaultMessageDelegate(self.request_conn)
return self.delegate.headers_received(start_line, headers)
def data_received(self, chunk):
return self.delegate.data_received(chunk)
def finish(self):
self.delegate.finish()
def on_connection_close(self):
self.delegate.on_connection_close()
在了解了request_delegate就是_RoutingDelegate实例,所以接下来看看conn.read_response对request_delegate做了什么?
def read_response(self, delegate):
"""Read a single HTTP response.
Typical client-mode usage is to write a request using `write_headers`,
`write`, and `finish`, and then call ``read_response``.
:arg delegate: a `.HTTPMessageDelegate`
Returns a `.Future` that resolves to None after the full response has
been read.
"""
if self.params.decompress:
delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
return self._read_message(delegate)
_GzipMessageDelegate类其实就是对delegate,也就是_RoutingDelegate类的封装。不管怎么样,看_read_message(delegate)做了什么处理就好了。
@gen.coroutine
def _read_message(self, delegate):
need_delegate_close = False
try:
header_future = self.stream.read_until_regex(
b"\r?\n\r?\n",
max_bytes=self.params.max_header_size)
if self.params.header_timeout is None:
header_data = yield header_future
else:
try:
header_data = yield gen.with_timeout(
self.stream.io_loop.time() + self.params.header_timeout,
header_future,
quiet_exceptions=iostream.StreamClosedError)
except gen.TimeoutError:
self.close()
raise gen.Return(False)
start_line, headers = self._parse_headers(header_data)
if self.is_client:
start_line = httputil.parse_response_start_line(start_line)
self._response_start_line = start_line
else:
start_line = httputil.parse_request_start_line(start_line)
self._request_start_line = start_line
self._request_headers = headers
self._disconnect_on_finish = not self._can_keep_alive(
start_line, headers)
need_delegate_close = True
with _ExceptionLoggingContext(app_log):
header_future = delegate.headers_received(start_line, headers)
if header_future is not None:
yield header_future
if self.stream is None:
# We've been detached.
need_delegate_close = False
raise gen.Return(False)
skip_body = False
if self.is_client:
if (self._request_start_line is not None and
self._request_start_line.method == 'HEAD'):
skip_body = True
code = start_line.code
if code == 304:
# 304 responses may include the content-length header
# but do not actually have a body.
# http://tools.ietf.org/html/rfc7230#section-3.3
skip_body = True
if code >= 100 and code < 200:
# 1xx responses should never indicate the presence of
# a body.
if ('Content-Length' in headers or
'Transfer-Encoding' in headers):
raise httputil.HTTPInputError(
"Response code %d cannot have body" % code)
# TODO: client delegates will get headers_received twice
# in the case of a 100-continue. Document or change?
yield self._read_message(delegate)
else:
if (headers.get("Expect") == "100-continue" and
not self._write_finished):
self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
if not skip_body:
body_future = self._read_body(
start_line.code if self.is_client else 0, headers, delegate)
if body_future is not None:
if self._body_timeout is None:
yield body_future
else:
try:
yield gen.with_timeout(
self.stream.io_loop.time() + self._body_timeout,
body_future,
quiet_exceptions=iostream.StreamClosedError)
except gen.TimeoutError:
gen_log.info("Timeout reading body from %s",
self.context)
self.stream.close()
raise gen.Return(False)
self._read_finished = True
if not self._write_finished or self.is_client:
need_delegate_close = False
with _ExceptionLoggingContext(app_log):
delegate.finish()
# If we're waiting for the application to produce an asynchronous
# response, and we're not detached, register a close callback
# on the stream (we didn't need one while we were reading)
if (not self._finish_future.done() and
self.stream is not None and
not self.stream.closed()):
self.stream.set_close_callback(self._on_connection_close)
yield self._finish_future
if self.is_client and self._disconnect_on_finish:
self.close()
if self.stream is None:
raise gen.Return(False)
except httputil.HTTPInputError as e:
gen_log.info("Malformed HTTP message from %s: %s",
self.context, e)
if not self.is_client:
yield self.stream.write(b'HTTP/1.1 400 Bad Request\r\n\r\n')
self.close()
raise gen.Return(False)
finally:
if need_delegate_close:
with _ExceptionLoggingContext(app_log):
delegate.on_connection_close()
header_future = None
self._clear_callbacks()
raise gen.Return(True)
这个函数内容很多,在这里面我们可以找到_read_message(delegate)中对delegate.headers_received和delegate.finish的调用,前面提到这里的delegate是_RoutingDelegate,再次回顾一下_RoutingDelegate的源码:
class _RoutingDelegate(httputil.HTTPMessageDelegate):
def __init__(self, router, server_conn, request_conn):
self.server_conn = server_conn
self.request_conn = request_conn
self.delegate = None
self.router = router # type: Router
def headers_received(self, start_line, headers):
request = httputil.HTTPServerRequest(
connection=self.request_conn,
server_connection=self.server_conn,
start_line=start_line, headers=headers)
self.delegate = self.router.find_handler(request) #这里的delegate是_HandlerDelegate
if self.delegate is None:
app_log.debug("Delegate for %s %s request not found",
start_line.method, start_line.path)
self.delegate = _DefaultMessageDelegate(self.request_conn)
return self.delegate.headers_received(start_line, headers)
def data_received(self, chunk):
return self.delegate.data_received(chunk)
def finish(self):
self.delegate.finish()
def on_connection_close(self):
self.delegate.on_connection_close()
_RoutingDelegate中的headers_received函数会将收到的请求信息封装成一个request对象,并且在headers_received函数在最后会调用通过 self.router.find_handler(request)返回的对象self.delegate,进而调用self.delegate的headers_received函数和data_received函数。那么这里的self.router.find_hander函数返回的self.delegate到底是什么呢?接着往下面看。
然后关注self.delegate = self.router.find_handler(request),这里的self.router其实就是Application,所以看看Application的find_handler函数做了什么?
def find_handler(self, request, **kwargs):
route = self.default_router.find_handler(request)
if route is not None:
return route
if self.settings.get('default_handler_class'):
return self.get_handler_delegate(
request,
self.settings['default_handler_class'],
self.settings.get('default_handler_args', {}))
return self.get_handler_delegate(
request, ErrorHandler, {'status_code': 404})
其实就是从Application的self.default_router中找出处理request请求的handler,这里的self.default_router在Application的init函数中有实现:
self.default_router = _ApplicationRouter(self, [
Rule(AnyMatches(), self.wildcard_router)
])
但是从_ApplicationRouter类中并没有找到find_handler函数,所以去它的父类中找,在class RuleRouter(Router):中可以找到find_handler函数的定义:
def find_handler(self, request, **kwargs):
for rule in self.rules:
target_params = rule.matcher.match(request)
if target_params is not None:
if rule.target_kwargs:
target_params['target_kwargs'] = rule.target_kwargs
delegate = self.get_target_delegate(
rule.target, request, **target_params)
if delegate is not None:
return delegate
return None
首先这里的self.rules是什么?self.rules是经过self.add_rules(rules)处理的,self.rules = [ self.wildcard_router],所以 rule.target = self.wildcard_router。此时的rule.matcher是AnyMachers,所以target_params是空{}不等于None,所以执行delegate = self.get_target_delegate,这时候的self.get_target_delegate函数是class _ApplicationRouter(ReversibleRuleRouter):类中的get_target_delegate函数,而不是父类RuleRouter中的(继承的时候,子类同名函数覆盖父类的),所以看class _ApplicationRouter(ReversibleRuleRouter)类中的get_target_delegate函数。
def get_target_delegate(self, target, request, **target_params):
if isclass(target) and issubclass(target, RequestHandler):
return self.application.get_handler_delegate(request, target, **target_params)
return super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params)
这里的target是self.wildcard_router,是一个_ApplicationRouter类的实例,所以走了super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params),也就是调用RuleRouter的get_target_delegate
def get_target_delegate(self, target, request, **target_params):
"""Returns an instance of `~.httputil.HTTPMessageDelegate` for a
Rule's target. This method is called by `~.find_handler` and can be
extended to provide additional target types.
:arg target: a Rule's target.
:arg httputil.HTTPServerRequest request: current request.
:arg target_params: additional parameters that can be useful
for `~.httputil.HTTPMessageDelegate` creation.
"""
if isinstance(target, Router):
return target.find_handler(request, **target_params)
elif isinstance(target, httputil.HTTPServerConnectionDelegate):
return target.start_request(request.server_connection, request.connection)
elif callable(target):
return _CallableAdapter(
partial(target, **target_params), request.connection
)
return None
这里的target是self.wildcard_router = _ApplicationRouter(self, handlers),也还是_ApplicationRouter,他是Router的子类,所以直接走了第一个target.find_handler(request, **target_params),这里的target.find_handler还是上面的_ApplicationRouter的find_handler。但是self.wildcard_router = _ApplicationRouter(self, handlers)不一样的是,他的参数rules不再是Rule对象,而是一个用户自己定义的RequestHandler列表(就是用户自定义的路由),所以在_ApplicationRouter的父类中class RuleRouter(Router):中,用户自定义的路由handlers经过self.add_rules(rules)处理的得到self.rules(上面的self.rules = [Rule(AnyMatches(), self.wildcard_router)]也是一样)。
class RuleRouter(Router):
"""Rule-based router implementation."""
def __init__(self, rules=None):
"""Constructs a router from an ordered list of rules::
RuleRouter([
Rule(PathMatches("/handler"), Target),
# ... more rules
])
You can also omit explicit `Rule` constructor and use tuples of arguments::
RuleRouter([
(PathMatches("/handler"), Target),
])
`PathMatches` is a default matcher, so the example above can be simplified::
RuleRouter([
("/handler", Target),
])
In the examples above, ``Target`` can be a nested `Router` instance, an instance of
`~.httputil.HTTPServerConnectionDelegate` or an old-style callable,
accepting a request argument.
:arg rules: a list of `Rule` instances or tuples of `Rule`
constructor arguments.
"""
self.rules = [] # type: typing.List[Rule]
if rules:
self.add_rules(rules)
def add_rules(self, rules):
"""Appends new rules to the router.
:arg rules: a list of Rule instances (or tuples of arguments, which are
passed to Rule constructor).
"""
for rule in rules:
if isinstance(rule, (tuple, list)):
assert len(rule) in (2, 3, 4)
if isinstance(rule[0], basestring_type):
rule = Rule(PathMatches(rule[0]), *rule[1:])
else:
rule = Rule(*rule)
self.rules.append(self.process_rule(rule))
在add_rules函数里面,这时候的rules就是我们定义的一个路由handlers,而for循环的rule是一个元组,即开头给的代码样例中的app = tornado.web.Application(handlers=[(r"/", IndexHandler)])中的(r"/", IndexHandler),rule[0]是一个路径字符串,所以最后的执行rule = Rule(PathMatches(rule[0]), *rule[1:]),这时候rule.targe就成了*rule[1:],也就是自己写的IndexHandler这个基于RequestHandler的类。这时候再回到_ApplicationRouter的find_handler函数,这一次的rule.matcher是PatchMatches,所以看看PatchMatches的match函数。
class PathMatches(Matcher):
"""Matches requests with paths specified by ``path_pattern`` regex."""
def __init__(self, path_pattern):
if isinstance(path_pattern, basestring_type):
if not path_pattern.endswith('$'):
path_pattern += '$'
self.regex = re.compile(path_pattern)
else:
self.regex = path_pattern
assert len(self.regex.groupindex) in (0, self.regex.groups), \
("groups in url regexes must either be all named or all "
"positional: %r" % self.regex.pattern)
self._path, self._group_count = self._find_groups()
def match(self, request):
match = self.regex.match(request.path)
if match is None:
return None
if not self.regex.groups:
return {}
path_args, path_kwargs = [], {}
# Pass matched groups to the handler. Since
# match.groups() includes both named and
# unnamed groups, we want to use either groups
# or groupdict but not both.
if self.regex.groupindex:
path_kwargs = dict(
(str(k), _unquote_or_none(v))
for (k, v) in match.groupdict().items())
else:
path_args = [_unquote_or_none(s) for s in match.groups()]
return dict(path_args=path_args, path_kwargs=path_kwargs)
总结来说,就是根据patch制定的正则表达式,然后从request里面的url去匹配,没有匹配上返回None,find_handler函数执行下一个循环继续找,直到找到或者找完。这里假如找到,那么此时的targe就是RequestHandler的子类Indexhandler,所以再从_ApplicationRouter的get_target_delegate类中就知道了这次返回的target是self.application.get_handler_delegate(request, target, **target_params)的返回值。
def get_target_delegate(self, target, request, **target_params):
if isclass(target) and issubclass(target, RequestHandler):
return self.application.get_handler_delegate(request, target, **target_params)
return super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params)
self.application.get_handler_delegate调用的就是Application的get_handler_delegate,而Application的get_handler_delegate函数定义如下:
def get_handler_delegate(self, request, target_class, target_kwargs=None,
path_args=None, path_kwargs=None):
"""Returns `~.httputil.HTTPMessageDelegate` that can serve a request
for application and `RequestHandler` subclass.
:arg httputil.HTTPServerRequest request: current HTTP request.
:arg RequestHandler target_class: a `RequestHandler` class.
:arg dict target_kwargs: keyword arguments for ``target_class`` constructor.
:arg list path_args: positional arguments for ``target_class`` HTTP method that
will be executed while handling a request (``get``, ``post`` or any other).
:arg dict path_kwargs: keyword arguments for ``target_class`` HTTP method.
"""
return _HandlerDelegate(
self, request, target_class, target_kwargs, path_args, path_kwargs)
它返回的是_HandlerDelegate类。也就是说self.router.find_handler(request)返回的self.delegate 就是_HandlerDelegate。再次回顾一下_RoutingDelegate的源码:
class _RoutingDelegate(httputil.HTTPMessageDelegate):
def __init__(self, router, server_conn, request_conn):
self.server_conn = server_conn
self.request_conn = request_conn
self.delegate = None
self.router = router # type: Router
def headers_received(self, start_line, headers):
request = httputil.HTTPServerRequest(
connection=self.request_conn,
server_connection=self.server_conn,
start_line=start_line, headers=headers)
self.delegate = self.router.find_handler(request) #这里的delegate是_HandlerDelegate
if self.delegate is None:
app_log.debug("Delegate for %s %s request not found",
start_line.method, start_line.path)
self.delegate = _DefaultMessageDelegate(self.request_conn)
return self.delegate.headers_received(start_line, headers)
def data_received(self, chunk):
return self.delegate.data_received(chunk)
def finish(self):
self.delegate.finish()
def on_connection_close(self):
self.delegate.on_connection_close()
得到_read_message(delegate)函数内部的调用链关系是:_read_message调用了_RoutingDelegate的headers_received和delegate.finish,也就调用了_HandlerDelegate的headers_received和delegate.finish。
先直接看_HandlerDelegate类的定义,重点关注handler_class,headers_received和data_received函数:
class _HandlerDelegate(httputil.HTTPMessageDelegate):
def __init__(self, application, request, handler_class, handler_kwargs,
path_args, path_kwargs):
self.application = application
self.connection = request.connection
self.request = request
self.handler_class = handler_class
self.handler_kwargs = handler_kwargs or {}
self.path_args = path_args or []
self.path_kwargs = path_kwargs or {}
self.chunks = []
self.stream_request_body = _has_stream_request_body(self.handler_class)
def headers_received(self, start_line, headers):
if self.stream_request_body:
self.request.body = Future()
return self.execute()
def data_received(self, data):
if self.stream_request_body:
return self.handler.data_received(data)
else:
self.chunks.append(data)
def finish(self):
if self.stream_request_body:
future_set_result_unless_cancelled(self.request.body, None)
else:
self.request.body = b''.join(self.chunks)
self.request._parse_body()
self.execute()
这里关注delegate.excute也就是_HandlerDelegate类的excute函数,他是通往调用tornado框架中用户自定义RequestHandler的调用入口了。
def execute(self):
# If template cache is disabled (usually in the debug mode),
# re-compile templates and reload static files on every
# request so you don't need to restart to see changes
if not self.application.settings.get("compiled_template_cache", True):
with RequestHandler._template_loader_lock:
for loader in RequestHandler._template_loaders.values():
loader.reset()
if not self.application.settings.get('static_hash_cache', True):
StaticFileHandler.reset()
self.handler = self.handler_class(self.application, self.request,
**self.handler_kwargs)
transforms = [t(self.request) for t in self.application.transforms]
if self.stream_request_body:
self.handler._prepared_future = Future()
# Note that if an exception escapes handler._execute it will be
# trapped in the Future it returns (which we are ignoring here,
# leaving it to be logged when the Future is GC'd).
# However, that shouldn't happen because _execute has a blanket
# except handler, and we cannot easily access the IOLoop here to
# call add_future (because of the requirement to remain compatible
# with WSGI)
self.handler._execute(transforms, *self.path_args,
**self.path_kwargs)
# If we are streaming the request body, then execute() is finished
# when the handler has prepared to receive the body. If not,
# it doesn't matter when execute() finishes (so we return None)
return self.handler._prepared_future
这里重点关注self.handler = self.handler_class(self.application, self.request,**self.handler_kwargs)和self.handler._execute函数。刚刚上面分析了_ApplicationRouter类的find_handler函数如下:
def find_handler(self, request, **kwargs):
for rule in self.rules:
target_params = rule.matcher.match(request)
if target_params is not None:
if rule.target_kwargs:
target_params['target_kwargs'] = rule.target_kwargs
delegate = self.get_target_delegate(
rule.target, request, **target_params)
if delegate is not None:
return delegate
return None
因为delegate = self.get_target_delegate,所以delegate为_HandlerDelegate的实例,他的初始化参数handler_class=rule.targe,也就是根据路由Router获取到的自己定义的IndexHandler类。而self.handler._execute的定义如下:
@gen.coroutine
def _execute(self, transforms, *args, **kwargs):
"""Executes this request with the given output transforms."""
self._transforms = transforms
try:
if self.request.method not in self.SUPPORTED_METHODS:
raise HTTPError(405)
self.path_args = [self.decode_argument(arg) for arg in args]
self.path_kwargs = dict((k, self.decode_argument(v, name=k))
for (k, v) in kwargs.items())
# If XSRF cookies are turned on, reject form submissions without
# the proper cookie
if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
self.application.settings.get("xsrf_cookies"):
self.check_xsrf_cookie()
result = self.prepare()
if result is not None:
result = yield result
if self._prepared_future is not None:
# Tell the Application we've finished with prepare()
# and are ready for the body to arrive.
future_set_result_unless_cancelled(self._prepared_future, None)
if self._finished:
return
if _has_stream_request_body(self.__class__):
# In streaming mode request.body is a Future that signals
# the body has been completely received. The Future has no
# result; the data has been passed to self.data_received
# instead.
try:
yield self.request.body
except iostream.StreamClosedError:
return
method = getattr(self, self.request.method.lower())
result = method(*self.path_args, **self.path_kwargs)
if result is not None:
result = yield result
if self._auto_finish and not self._finished:
self.finish()
except Exception as e:
try:
self._handle_request_exception(e)
except Exception:
app_log.error("Exception in exception handler", exc_info=True)
finally:
# Unset result to avoid circular references
result = None
if (self._prepared_future is not None and
not self._prepared_future.done()):
# In case we failed before setting _prepared_future, do it
# now (to unblock the HTTP server). Note that this is not
# in a finally block to avoid GC issues prior to Python 3.4.
self._prepared_future.set_result(None)
重点关注
method = getattr(self, self.request.method.lower())
result = method(*self.path_args, **self.path_kwargs)
getattr(self, self.request.method.lower())中的self就是self.handler_class=rule.targe的实例,到了这里,也就是IndexHandler类,到此,tornado的请求流程就到了我们自己写的函数处理了啦。
总结一下流程就是:
tcpserver(listen) ->httpserver(request_delegate = delegate.start_request(self, conn)) -> Application -> _RoutingDelegate -> ret = yield conn.read_response(request_delegate) -> _HandlerDelegate -> requestHandler(get post xxx)
请求响应的数据怎么写回socket以及tornado中的高并发现在哪儿,下次再接着分析。
网友评论