tornado简介
tornado是python的一个异步web框架,能撑起10K的连接,tornado这个库有以下部分组成:
- web框架,主要通过
RequestHandler
建立web应用。 - http客户端和服务端(
HTTPServer
andAsyncHTTPClient
). - 异步网络库,包括这些类
IOLoop
andIOStream
- 协程库 (
tornado.gen
)
Tornado Web框架和HTTP服务器一起提供了WSGI的完整堆栈,虽然可以将Tornado HTTP服务器用作其他WSGi框架(wsGicontainer)的容器,但这种组合有局限性,要充分利用Tornado,您需要将Tornado的Web框架和HTTP服务器一起使用。
异步非阻塞IO
实时Web功能要求每个用户都有一个长期的、空闲的连接。在传统的同步Web服务器中,这意味着每个用户一个线程,代价很大。
为了最小化并发连接的成本,Tornado使用了单线程事件循环。这意味着所有应用程序代码都应该以异步和非阻塞为目标,因为一次只能有一个操作处于活动状态。
异步和非阻塞这两个术语是密切相关的,通常可以互换使用,但它们并不完全相同。
- 同步异步主要是针对客户端而言
同步:客户端发出一个功能调用时,在没有得到结果之前,该调用就不返回。例:
from tornado.httpclient import HTTPClient
def synchronous_fetch(url):
http_client = HTTPClient()
response = http_client.fetch(url)
return response.body
异步:客户端发出一个功能调用,调用者不能立刻得到结果,但会立刻返回,不影响后续代码运行。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。例如:
from tornado.httpclient import AsyncHTTPClient
def asynchronous_fetch(url):
http_client = AsyncHTTPClient()
response = http_client.fetch(url)
- 阻塞非阻塞是针对服务端而言
阻塞:调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,cpu不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回。
非阻塞:非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。
协程
在tornado中,推荐以协程的方式写异步代码
async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
python3.5后用关键字async和await的是原生协程,为了兼容以前的版本,可使用装饰器tornado.gen.coroutine
写协程。tornado推荐使用原生协程。
原生协程和装饰器协程,有些区别:
- 原生协程通常更加快。
- 可以使用async for 和 async with语句。
- 装饰器协程返回一个future对象,原生协程返回的是awaitable 对象,不是future,在tornado中,两者大部分时候可以互换。
- 装饰器协程对concurrent.futures包有集成,而对应的原生协程是使用
IOLoop.run_in_executor
的 - 装饰器协程支持yield后面有list和dict对象,原生协程使用这个
tornado.gen.multi
代替
一些例子
协程调用的方式:
- yield或者await
- 使用IOLoop.spawn_callback
- 使用run_sync
协程调用方式yield或者await:
async def divide(x, y):
return x / y
async def good_call():
# await will unwrap the object returned by divide() and raise
# the exception.
await divide(1, 0)
- 如果想快速执行协程且不需直到结果,那么可以使用IOLoop.spawn_callback,不会等待协程完成,且没有返回值。
from tornado import gen # 引入协程库
from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient
import asyncio
import time
async def test():
r = await asyncio.sleep(3)
print(r)
def func_normal():
start = time.time()
print('开始调用协程')
IOLoop.current().spawn_callback(test)
print('结束协程调用',time.time()-start)
func_normal()
输出结果:
开始调用协程
结束协程调用 0.003989696502685547
spawn_callback调用后,IOloop会在合适的时候调用test协程。
- 当然,如果IOloop还未启动,想要调用协程的话,可以使用run_sync方法,是阻塞的。
from tornado import gen # 引入协程库
from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient
import asyncio
import time
async def test():
r = await asyncio.sleep(3)
print(r)
def func_normal():
start = time.time()
print('开始调用协程')
IOLoop.current().run_sync(lambda: test())
print('结束协程调用',time.time()-start)
func_normal()
输出结果:
开始调用协程
None
结束协程调用 3.0050225257873535
- 如果在IOloop中想调用一个阻塞函数,可以使用run_in_executor,这个方法返回一个future,可以使用await等待结果返回的
from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient
import asyncio
import time
async def call_blocking():
await IOLoop.current().run_in_executor(None, time.sleep, 3)
def func_normal():
start = time.time()
print('开始调用协程')
IOLoop.current().run_sync(lambda:call_blocking())
print('结束协程调用',time.time()-start)
func_normal()
输出结果:
开始调用协程
结束协程调用 3.0072133541107178
-
multi
方法可以接受一个list或者dict,返回futures。
from tornado.gen import multi
async def parallel_fetch(url1, url2):
resp1, resp2 = await multi([http_client.fetch(url1),
http_client.fetch(url2)])
async def parallel_fetch_many(urls):
responses = await multi ([http_client.fetch(url) for url in urls])
# responses is a list of HTTPResponses in the same order
async def parallel_fetch_dict(urls):
responses = await multi({url: http_client.fetch(url)
for url in urls})
# responses is a dict {url: HTTPResponse}
用装饰器协程的话:
@gen.coroutine
def parallel_fetch_decorated(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
- 有时保存一个future而不是立即yield是很有用的,这时可以开始另一个操作的。
from tornado.gen import convert_yielded
async def get(self):
# convert_yielded() starts the native coroutine in the background.
# This is equivalent to asyncio.ensure_future() (both work in Tornado).
fetch_future = convert_yielded(self.fetch_next_chunk())
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = convert_yielded(self.fetch_next_chunk())
yield self.flush()
装饰器协程想实现类似功能,可以:
@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
- 循环
原生协程可以使用async for语法。 - 后台运行
PeriodicCallback
定时器通常不与协程一起使用。协程可以包含while true:loop并使用tornado.gen.sleep实现如下60s定时器:
async def minute_loop():
while True:
await do_something()
await gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
上面实现其实是实现了60+n的定时器,n是do_something运行时间。为了精准的实现60s,可以使用下面方法:
async def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
await do_something() # Run while the clock is ticking.
await nxt # Wait for the timer to run out.
tornado.queues
tornado.queues模块实现了异步生产者消费者模型
Queue.get
语句会被yield,直到队列里有元素。如果这个队列有设置最大值,直到有空间存放新的值。才会yield Queue.put
语句。队列中维持未完成的任务,开始是0,put是增加数量,task_done是减少数量。
以web爬虫举例,队列中原始url有base_url,当一个worker获取一个页面时,它解析链接并将新的链接放入队列中。然后调用 task_done
减少数量。最终,worker会获取一个其URL都已经被解析过的页面,并且队列中也没有剩余的工作。因此,work调用task_done将计数器递减为零。主协程等待join().
#!/usr/bin/env python3
import time
from datetime import timedelta
from html.parser import HTMLParser
from urllib.parse import urljoin, urldefrag
from tornado import gen, httpclient, ioloop, queues
base_url = "http://www.tornadoweb.org/en/stable/"
concurrency = 10
async def get_links_from_url(url):
"""Download the page at `url` and parse it for links.
Returned links have had the fragment after `#` removed, and have been made
absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes
'http://www.tornadoweb.org/en/stable/gen.html'.
"""
response = await httpclient.AsyncHTTPClient().fetch(url)
print("fetched %s" % url)
html = response.body.decode(errors="ignore")
return [urljoin(url, remove_fragment(new_url)) for new_url in get_links(html)]
def remove_fragment(url):
pure_url, frag = urldefrag(url)
return pure_url
def get_links(html):
class URLSeeker(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.urls = []
def handle_starttag(self, tag, attrs):
href = dict(attrs).get("href")
if href and tag == "a":
self.urls.append(href)
url_seeker = URLSeeker()
url_seeker.feed(html)
return url_seeker.urls
async def main():
q = queues.Queue()
start = time.time()
fetching, fetched = set(), set()
async def fetch_url(current_url):
if current_url in fetching:
return
print("fetching %s" % current_url)
fetching.add(current_url)
urls = await get_links_from_url(current_url)
fetched.add(current_url)
for new_url in urls:
# Only follow links beneath the base URL
if new_url.startswith(base_url):
await q.put(new_url)
async def worker():
async for url in q:
if url is None:
return
try:
await fetch_url(url)
except Exception as e:
print("Exception: %s %s" % (e, url))
finally:
q.task_done()
await q.put(base_url)
# Start workers, then wait for the work queue to be empty.
workers = gen.multi([worker() for _ in range(concurrency)])
await q.join(timeout=timedelta(seconds=300))
assert fetching == fetched
print("Done in %d seconds, fetched %s URLs." % (time.time() - start, len(fetched)))
# Signal all the workers to exit.
for _ in range(concurrency):
await q.put(None)
await workers
if __name__ == "__main__":
io_loop = ioloop.IOLoop.current()
io_loop.run_sync(main)
- 每个put对应一个task_done,task_done调用一次,就代表完成一个任务。
- join等待所有任务完成。
下面看一个简单的生产消费者例子:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
q = Queue(maxsize=10)
async def consumer():
async for item in q:
if item is None:
return
try:
print('Doing work on %s' % item)
await gen.sleep(0.01)
finally:
q.task_done()
async def producer():
for item in range(15):
await q.put(item)
print('Put %s' % item)
async def main():
# Start consumer without waiting (since it never finishes).
workers = gen.multi([consumer() for _ in range(3)])
await producer() # Wait for producer to put all tasks.
await q.join() # Wait for consumer to finish all tasks.
print('Done')
for _ in range(3):
await q.put(None)
await workers
IOLoop.current().run_sync(main)
tornado web应用
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
def make_app():
return tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
Application对象是针对全局配置的,包括请求的路由表映射。路由表是一个list,list元素是 URLSpec
对象或者tuple,每个元素至少有路由规则和handler类,如果tuple的第三个元素是一个dict,这个dict会作为 RequestHandler.initialize
的参数,另外,URLSpec
也可能有name,用于RequestHandler.reverse_url
.
例如:
import tornado.ioloop
from tornado.web import RequestHandler, Application,url
class MainHandler(RequestHandler):
def get(self):
self.write('<a href="%s">link to story 1</a>' %
self.reverse_url("story", "1"))
class StoryHandler(RequestHandler):
def initialize(self, db):
self.db = db
def get(self, story_id):
self.write("this is story %s" % story_id)
if __name__ == "__main__":
db = 'test'
app = Application([
url(r"/", MainHandler),
url(r"/story/([0-9]+)", StoryHandler, dict(db=db), name="story")
])
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
Application能接受很多关键字参数,参考 Application.settings
RequestHandler子类
很多tornado应用都是在RequestHandler子类中完成的,可以在子类中写http方法,get,post等,在一个handler中,调用方法 RequestHandler.render
or RequestHandler.write
产生回应response。render()是基于模板的,write不是,它接受参数类型可以是strings, bytes, and dict.
处理请求
当前请求可以使用这个代表 self.request,可以在HTTPServerRequest
中看完整的属性。可以通过get_query_arguments
and get_body_arguments
获取get和post的参数。
文件上传的数据在 self.request.files。
为了统一格式处理请求,我们可以使用prepare。在处理get,post等之前调用。
def prepare(self):
if self.request.headers.get("Content-Type", "").startswith("application/json"):
self.json_args = json.loads(self.request.body)
else:
self.json_args = None
重载RequestHandler的方法
在每个请求来到时:
- 每个request会新建一个RequestHandler对象
-
initialize()
方法的参数来自Application
,initialize方法通常只将接受参数赋给成员属性,而不输出东西,或者调用方法。 -
prepare()
的调用,可能会产生输出,另外,如果这里调用了finish或者redirect等,这个词请求就会结束了。 - 当请求结束时,会调用
on_finish()
所有可以重载的方法都可以在RequestHandler
文档中看到,下面是一些常见的:
-
write_error
用于错误页面的输出。 -
get_current_user
参考User authentication. -
set_default_headers
可用于自定义响应headers
错误处理
如果handler抛出异常,tornado将会调用RequestHandler.write_error
生成一个错误页面。 tornado.web.HTTPError
能用于生成指定错误码。
如果想自定义错误页面,重写RequestHandler.write_error
这个方法,对于404错误,可以使用 default_handler_class
Application setting
,也可使用self.set_status(404)进行设置。
重定向
有两种方式重定向:
app = tornado.web.Application([
url(r"/app", tornado.web.RedirectHandler,
dict(url="http://itunes.apple.com/my-app-id")),
])
#或者
app = tornado.web.Application([
url(r"/photos/(.*)", MyPhotoHandler),
url(r"/pictures/(.*)", tornado.web.RedirectHandler,
dict(url=r"/photos/{0}")),
])
- RedirectHandler中调用redirect方法
异步handle
例如下面使用协程方式:
class MainHandler(tornado.web.RequestHandler):
async def get(self):
http = tornado.httpclient.AsyncHTTPClient()
response = await http.fetch("http://friendfeed-api.com/v2/feed/bret")
json = tornado.escape.json_decode(response.body)
self.write("Fetched " + str(len(json["entries"])) + " entries "
"from the FriendFeed API")
更多的例子可以参考 chat example application
参考
tornado官方文档
网友评论