美文网首页
tornado 系列讲解之一 (整体介绍 上)

tornado 系列讲解之一 (整体介绍 上)

作者: 落羽归尘 | 来源:发表于2019-09-20 21:29 被阅读0次

    tornado简介

    tornado是python的一个异步web框架,能撑起10K的连接,tornado这个库有以下部分组成:

    • web框架,主要通过RequestHandler建立web应用。
    • http客户端和服务端(HTTPServer and AsyncHTTPClient).
    • 异步网络库,包括这些类IOLoop and IOStream
    • 协程库 (tornado.gen)
      Tornado Web框架和HTTP服务器一起提供了WSGI的完整堆栈,虽然可以将Tornado HTTP服务器用作其他WSGi框架(wsGicontainer)的容器,但这种组合有局限性,要充分利用Tornado,您需要将Tornado的Web框架和HTTP服务器一起使用。

    异步非阻塞IO

    实时Web功能要求每个用户都有一个长期的、空闲的连接。在传统的同步Web服务器中,这意味着每个用户一个线程,代价很大。
    为了最小化并发连接的成本,Tornado使用了单线程事件循环。这意味着所有应用程序代码都应该以异步和非阻塞为目标,因为一次只能有一个操作处于活动状态。
    异步和非阻塞这两个术语是密切相关的,通常可以互换使用,但它们并不完全相同。

    • 同步异步主要是针对客户端而言
      同步:客户端发出一个功能调用时,在没有得到结果之前,该调用就不返回。例:
    from tornado.httpclient import HTTPClient
    
    def synchronous_fetch(url):
        http_client = HTTPClient()
        response = http_client.fetch(url)
        return response.body
    

    异步:客户端发出一个功能调用,调用者不能立刻得到结果,但会立刻返回,不影响后续代码运行。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。例如:

    from tornado.httpclient import AsyncHTTPClient
    
    def asynchronous_fetch(url):
        http_client = AsyncHTTPClient()
        response = http_client.fetch(url)
    
    • 阻塞非阻塞是针对服务端而言
      阻塞:调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,cpu不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回。
      非阻塞:非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。

    协程

    在tornado中,推荐以协程的方式写异步代码

    async def fetch_coroutine(url):
        http_client = AsyncHTTPClient()
        response = await http_client.fetch(url)
        return response.body
    

    python3.5后用关键字async和await的是原生协程,为了兼容以前的版本,可使用装饰器tornado.gen.coroutine
    写协程。tornado推荐使用原生协程。

    原生协程和装饰器协程,有些区别:

    • 原生协程通常更加快。
    • 可以使用async for 和 async with语句。
    • 装饰器协程返回一个future对象,原生协程返回的是awaitable 对象,不是future,在tornado中,两者大部分时候可以互换。
    • 装饰器协程对concurrent.futures包有集成,而对应的原生协程是使用IOLoop.run_in_executor
    • 装饰器协程支持yield后面有list和dict对象,原生协程使用这个tornado.gen.multi代替

    一些例子

    协程调用的方式:

    • yield或者await
    • 使用IOLoop.spawn_callback
    • 使用run_sync
      协程调用方式yield或者await:
    async def divide(x, y):
        return x / y
    async def good_call():
        # await will unwrap the object returned by divide() and raise
        # the exception.
        await divide(1, 0)
    
    • 如果想快速执行协程且不需直到结果,那么可以使用IOLoop.spawn_callback,不会等待协程完成,且没有返回值。
    from tornado import gen # 引入协程库
    from tornado.ioloop import IOLoop
    from tornado.httpclient import AsyncHTTPClient
    import asyncio
    import time
    
    async def test():
        r = await asyncio.sleep(3)
        print(r)
    
    def func_normal():
        start = time.time()
        print('开始调用协程')
        IOLoop.current().spawn_callback(test)
        print('结束协程调用',time.time()-start)
    func_normal()
    输出结果:
    开始调用协程
    结束协程调用 0.003989696502685547
    

    spawn_callback调用后,IOloop会在合适的时候调用test协程。

    • 当然,如果IOloop还未启动,想要调用协程的话,可以使用run_sync方法,是阻塞的。
    from tornado import gen # 引入协程库
    from tornado.ioloop import IOLoop
    from tornado.httpclient import AsyncHTTPClient
    import asyncio
    import time
    
    async def test():
        r = await asyncio.sleep(3)
        print(r)
    
    def func_normal():
        start = time.time()
        print('开始调用协程')
        IOLoop.current().run_sync(lambda: test())
        print('结束协程调用',time.time()-start)
    func_normal()
    输出结果:
    开始调用协程
    None
    结束协程调用 3.0050225257873535
    
    • 如果在IOloop中想调用一个阻塞函数,可以使用run_in_executor,这个方法返回一个future,可以使用await等待结果返回的
    from tornado.ioloop import IOLoop
    from tornado.httpclient import AsyncHTTPClient
    import asyncio
    import time
    
    async def call_blocking():
        await IOLoop.current().run_in_executor(None, time.sleep, 3)
    
    def func_normal():
        start = time.time()
        print('开始调用协程')
        IOLoop.current().run_sync(lambda:call_blocking())
        print('结束协程调用',time.time()-start)
    func_normal()
    输出结果:
    开始调用协程
    结束协程调用 3.0072133541107178
    
    • multi方法可以接受一个list或者dict,返回futures。
    from tornado.gen import multi
    
    async def parallel_fetch(url1, url2):
        resp1, resp2 = await multi([http_client.fetch(url1),
                                    http_client.fetch(url2)])
    
    async def parallel_fetch_many(urls):
        responses = await multi ([http_client.fetch(url) for url in urls])
        # responses is a list of HTTPResponses in the same order
    
    async def parallel_fetch_dict(urls):
        responses = await multi({url: http_client.fetch(url)
                                 for url in urls})
        # responses is a dict {url: HTTPResponse}
    

    用装饰器协程的话:

    @gen.coroutine
    def parallel_fetch_decorated(url1, url2):
        resp1, resp2 = yield [http_client.fetch(url1),
                              http_client.fetch(url2)]
    
    • 有时保存一个future而不是立即yield是很有用的,这时可以开始另一个操作的。
    from tornado.gen import convert_yielded
    
    async def get(self):
        # convert_yielded() starts the native coroutine in the background.
        # This is equivalent to asyncio.ensure_future() (both work in Tornado).
        fetch_future = convert_yielded(self.fetch_next_chunk())
        while True:
            chunk = yield fetch_future
            if chunk is None: break
            self.write(chunk)
            fetch_future = convert_yielded(self.fetch_next_chunk())
            yield self.flush()
    

    装饰器协程想实现类似功能,可以:

    @gen.coroutine
    def get(self):
        fetch_future = self.fetch_next_chunk()
        while True:
            chunk = yield fetch_future
            if chunk is None: break
            self.write(chunk)
            fetch_future = self.fetch_next_chunk()
            yield self.flush()
    
    • 循环
      原生协程可以使用async for语法。
    • 后台运行
      PeriodicCallback定时器通常不与协程一起使用。协程可以包含while true:loop并使用tornado.gen.sleep实现如下60s定时器:
    async def minute_loop():
        while True:
            await do_something()
            await gen.sleep(60)
    
    # Coroutines that loop forever are generally started with
    # spawn_callback().
    IOLoop.current().spawn_callback(minute_loop)
    

    上面实现其实是实现了60+n的定时器,n是do_something运行时间。为了精准的实现60s,可以使用下面方法:

    async def minute_loop2():
        while True:
            nxt = gen.sleep(60)   # Start the clock.
            await do_something()  # Run while the clock is ticking.
            await nxt             # Wait for the timer to run out.
    

    tornado.queues

    tornado.queues模块实现了异步生产者消费者模型
    Queue.get语句会被yield,直到队列里有元素。如果这个队列有设置最大值,直到有空间存放新的值。才会yield Queue.put语句。队列中维持未完成的任务,开始是0,put是增加数量,task_done是减少数量。
    以web爬虫举例,队列中原始url有base_url,当一个worker获取一个页面时,它解析链接并将新的链接放入队列中。然后调用 task_done减少数量。最终,worker会获取一个其URL都已经被解析过的页面,并且队列中也没有剩余的工作。因此,work调用task_done将计数器递减为零。主协程等待join().

    #!/usr/bin/env python3
    
    import time
    from datetime import timedelta
    
    from html.parser import HTMLParser
    from urllib.parse import urljoin, urldefrag
    
    from tornado import gen, httpclient, ioloop, queues
    
    base_url = "http://www.tornadoweb.org/en/stable/"
    concurrency = 10
    
    
    async def get_links_from_url(url):
        """Download the page at `url` and parse it for links.
    
        Returned links have had the fragment after `#` removed, and have been made
        absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes
        'http://www.tornadoweb.org/en/stable/gen.html'.
        """
        response = await httpclient.AsyncHTTPClient().fetch(url)
        print("fetched %s" % url)
    
        html = response.body.decode(errors="ignore")
        return [urljoin(url, remove_fragment(new_url)) for new_url in get_links(html)]
    
    
    def remove_fragment(url):
        pure_url, frag = urldefrag(url)
        return pure_url
    
    
    def get_links(html):
        class URLSeeker(HTMLParser):
            def __init__(self):
                HTMLParser.__init__(self)
                self.urls = []
    
            def handle_starttag(self, tag, attrs):
                href = dict(attrs).get("href")
                if href and tag == "a":
                    self.urls.append(href)
    
        url_seeker = URLSeeker()
        url_seeker.feed(html)
        return url_seeker.urls
    
    
    async def main():
        q = queues.Queue()
        start = time.time()
        fetching, fetched = set(), set()
    
        async def fetch_url(current_url):
            if current_url in fetching:
                return
    
            print("fetching %s" % current_url)
            fetching.add(current_url)
            urls = await get_links_from_url(current_url)
            fetched.add(current_url)
    
            for new_url in urls:
                # Only follow links beneath the base URL
                if new_url.startswith(base_url):
                    await q.put(new_url)
    
        async def worker():
            async for url in q:
                if url is None:
                    return
                try:
                    await fetch_url(url)
                except Exception as e:
                    print("Exception: %s %s" % (e, url))
                finally:
                    q.task_done()
    
        await q.put(base_url)
    
        # Start workers, then wait for the work queue to be empty.
        workers = gen.multi([worker() for _ in range(concurrency)])
        await q.join(timeout=timedelta(seconds=300))
        assert fetching == fetched
        print("Done in %d seconds, fetched %s URLs." % (time.time() - start, len(fetched)))
    
        # Signal all the workers to exit.
        for _ in range(concurrency):
            await q.put(None)
        await workers
    
    
    if __name__ == "__main__":
        io_loop = ioloop.IOLoop.current()
        io_loop.run_sync(main)
    
    • 每个put对应一个task_done,task_done调用一次,就代表完成一个任务。
    • join等待所有任务完成。

    下面看一个简单的生产消费者例子:

    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.queues import Queue
    
    q = Queue(maxsize=10)
    
    async def consumer():
        async for item in q:
            if item is None:
                return
            try:
                print('Doing work on %s' % item)
                await gen.sleep(0.01)
            finally:
                q.task_done()
    
    async def producer():
        for item in range(15):
            await q.put(item)
            print('Put %s' % item)
    
    async def main():
        # Start consumer without waiting (since it never finishes).
        workers = gen.multi([consumer() for _ in range(3)])
        await producer()     # Wait for producer to put all tasks.
        await q.join()       # Wait for consumer to finish all tasks.
        print('Done')
        for _ in range(3):
            await q.put(None)
        await workers
    
    IOLoop.current().run_sync(main)
    

    tornado web应用

    import tornado.ioloop
    import tornado.web
    
    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            self.write("Hello, world")
    
    def make_app():
        return tornado.web.Application([
            (r"/", MainHandler),
        ])
    
    if __name__ == "__main__":
        app = make_app()
        app.listen(8888)
        tornado.ioloop.IOLoop.current().start()
    

    Application对象是针对全局配置的,包括请求的路由表映射。路由表是一个list,list元素是 URLSpec对象或者tuple,每个元素至少有路由规则和handler类,如果tuple的第三个元素是一个dict,这个dict会作为 RequestHandler.initialize的参数,另外,URLSpec也可能有name,用于RequestHandler.reverse_url.
    例如:

    import tornado.ioloop
    from tornado.web import RequestHandler, Application,url
    
    class MainHandler(RequestHandler):
        def get(self):
            self.write('<a href="%s">link to story 1</a>' %
                       self.reverse_url("story", "1"))
    
    class StoryHandler(RequestHandler):
        def initialize(self, db):
            self.db = db
    
        def get(self, story_id):
            self.write("this is story %s" % story_id)
    
    
    
    if __name__ == "__main__":
        db = 'test'
        app = Application([
            url(r"/", MainHandler),
            url(r"/story/([0-9]+)", StoryHandler, dict(db=db), name="story")
        ])
        app.listen(8888)
        tornado.ioloop.IOLoop.current().start()
    

    Application能接受很多关键字参数,参考 Application.settings

    RequestHandler子类

    很多tornado应用都是在RequestHandler子类中完成的,可以在子类中写http方法,get,post等,在一个handler中,调用方法 RequestHandler.render or RequestHandler.write产生回应response。render()是基于模板的,write不是,它接受参数类型可以是strings, bytes, and dict.

    处理请求

    当前请求可以使用这个代表 self.request,可以在HTTPServerRequest中看完整的属性。可以通过get_query_arguments and get_body_arguments获取get和post的参数。
    文件上传的数据在 self.request.files。
    为了统一格式处理请求,我们可以使用prepare。在处理get,post等之前调用。

    def prepare(self):
        if self.request.headers.get("Content-Type", "").startswith("application/json"):
            self.json_args = json.loads(self.request.body)
        else:
            self.json_args = None
    

    重载RequestHandler的方法

    在每个请求来到时:

    • 每个request会新建一个RequestHandler对象
    • initialize()方法的参数来自Application,initialize方法通常只将接受参数赋给成员属性,而不输出东西,或者调用方法。
    • prepare()的调用,可能会产生输出,另外,如果这里调用了finish或者redirect等,这个词请求就会结束了。
    • 当请求结束时,会调用on_finish()

    所有可以重载的方法都可以在RequestHandler文档中看到,下面是一些常见的:

    错误处理

    如果handler抛出异常,tornado将会调用RequestHandler.write_error生成一个错误页面。 tornado.web.HTTPError能用于生成指定错误码。
    如果想自定义错误页面,重写RequestHandler.write_error这个方法,对于404错误,可以使用 default_handler_class Application setting,也可使用self.set_status(404)进行设置。

    重定向

    有两种方式重定向:

    app = tornado.web.Application([
        url(r"/app", tornado.web.RedirectHandler,
            dict(url="http://itunes.apple.com/my-app-id")),
        ])
    #或者
    app = tornado.web.Application([
        url(r"/photos/(.*)", MyPhotoHandler),
        url(r"/pictures/(.*)", tornado.web.RedirectHandler,
            dict(url=r"/photos/{0}")),
        ])
    
    • RedirectHandler中调用redirect方法

    异步handle

    例如下面使用协程方式:

    class MainHandler(tornado.web.RequestHandler):
        async def get(self):
            http = tornado.httpclient.AsyncHTTPClient()
            response = await http.fetch("http://friendfeed-api.com/v2/feed/bret")
            json = tornado.escape.json_decode(response.body)
            self.write("Fetched " + str(len(json["entries"])) + " entries "
                       "from the FriendFeed API")
    

    更多的例子可以参考 chat example application

    参考

    tornado官方文档

    相关文章

      网友评论

          本文标题:tornado 系列讲解之一 (整体介绍 上)

          本文链接:https://www.haomeiwen.com/subject/dgvzectx.html