Tornado异步非阻塞详解

作者: 不爱去冒险的少年y | 来源:发表于2020-03-01 22:03 被阅读0次

    前言:鉴于Google了大片关于Tornado框架关于其异步非阻塞的实现方法和缘由结果都不尽理想,在此写一篇个人了解的博客来向诸位解释Tornado的异步非阻塞的原理和实现方法,在此感谢前人栽树!

    异步非阻塞是针对另一请求来说的,本次的请求该是阻塞的仍然是阻塞的,这跟Python里面的异步是不一样的,Python里面的异步是指异步的代码段独立执行,原代码中会持续执行异步代码段下面的代码

    系统:MAC
    python:3.6
    Tornado:6.1
    接口测试:ab
    

    一、异步实现

    1.使用 gen.coroutine 异步编程

    在 Tornado 中两个装饰器:

    • tornado.web.asynchronous :长连接装饰器 python2中,Python3中取消了
    • tornado.gen.coroutine :协程模式装饰器 跟python3中 async await功能一致
      asynchronous 装饰器是让请求变成长连接的方式,必须手动调用 self.finish() 才会响应
    class MainHandler(tornado.web.RequestHandler):
        @tornado.web.asynchronous
        def get(self):
            # bad 
            self.write("Hello, world")
    
    

    asynchronous 装饰器不会自动调用self.finish() ,如果没有没有指定结束,该长连接会一直保持直到 pending 状态。

    peding

    所以正确是使用方式是使用了 asynchronous 需要手动 finish

    class MainHandler(tornado.web.RequestHandler):
        @tornado.web.asynchronous
        def get(self):
            self.write("Hello, world")
            self.finish()
    

    coroutine 装饰器是指定改请求为协程模式,说明白点就是能使用 yield 配合 Tornado 编写异步程序。

    Tronado 为协程实现了一套自己的协议,不能使用 Python 普通的生成器。

    在使用协程模式编程之前要知道如何编写 Tornado 中的异步函数,Tornado 提供了多种的异步编写形式:回调、Future、协程等,其中以协程模式最是简单和用的最多。

    编写一个基于协程的异步函数同样需要 coroutine 装饰器

    class SleepHandler(BaseHandler):
        """
        异步的延时10秒的接口
        """
        @gen.coroutine
        def get(self):
            yield gen.sleep(10)
            self.write("when i sleep 5s")
    

    使用 coroutine 方式有个很明显是缺点就是严重依赖第三方库的实现,如果库本身不支持 Tornado 的异步操作再怎么使用协程也是白搭依然会是阻塞的,放个例子感受一下。

    class SyncSleepHandler(BaseHandler):
        """
        同步的方式,一个延时10s的接口
        """
        def get(self):
            print(3)
            time.sleep(10)
            print(4)
            self.write("when i sleep 10s")
    
    
    class SleepHandler(BaseHandler):
        """
        异步的延时10秒的接口,gen.sleep支持Tornado异步
        """
        @gen.coroutine
        def get(self):
            print(1)
            yield gen.sleep(10)
            print(2)
            self.write("when i sleep 10s")
    
    
    class NoSleepHandler(BaseHandler):
        """
        time库不支持Tornado异步
        """
        @gen.coroutine
        def get(self):
            print(5)
            yield time.sleep(5)
            print(6)
            self.write("when i sleep 10s")
    

    1 .运行SleepHandler接口,执行异步非阻塞,执行100个并发,通过打印发现一个线程在一个请求在执行gen.sleep(10)并没有阻塞其他请求,正因为如此所以它是异步非阻塞的。

    image.png
    image.png
    2 .运行SyncSleepHandler接口,执行同步阻塞,执行10个并发(由于同步100个等待时间太长,减少到10个,并不影响),通过打印发现一个线程在一个请求执行time.sleep(10)的时候会阻塞其他请求,直到该请求完毕后才会执行下一个请求
    image.png image.png
    3 . 运行NoSleepHandler接口,执行不支持Tornado异步的协程接口,5个并发(理由如上),通过ab发现5个并发数耗时25秒,没有实现异步,在第二图中打印出来发现接口存在任务调度,执行第二个接口后没等接口执行完毕,系统线程任务调度暂停第二个接口,执行第三个接口以此类推,这就表明使用@gen.coroutine如果库本身不支持Tornado 的异步操作再怎么使用协程也是白搭依然会是阻塞的,
    image.png
    image.png
    2.基于线程的异步编程

    使用 gen.coroutine 装饰器编写异步函数,如果库本身不支持异步,那么响应任然是阻塞的。

    在 Tornado 中有个装饰器能使用 ThreadPoolExecutor 来让阻塞过程变成非阻塞,其原理是在 Tornado 本身这个线程之外另外启动一个线程来执行阻塞的程序,从而让 Tornado 变得阻塞。

    futures 在 Python3 是标准库,但是在 Python2 中需要手动安装
    pip install futures

    class ThreadSleepHandler(BaseHandler):
        """
        time库不支持Tornado异步
        """
        # 必须定义一个executor的属性,然后run_on_executor 注解才管用。
        executor = ThreadPoolExecutor(max_workers=4)
    
        @gen.coroutine
        def get(self):
            print(5)
            yield self.sleep_fun()
            print(6)
            self.write("when i sleep 10s")
    
        @run_on_executor
        def sleep_fun(self):
            time.sleep(5)
    

    通过下图发现5个并发只需10秒,实现了异步非阻塞
    但是与之而来的问题是,如果大量使用线程化的异步函数做一些高负载的活动,会导致该 Tornado 进程性能低下响应缓慢,这只是从一个问题到了另一个问题而已。

    所以在处理一些小负载的工作,是能起到很好的效果,让 Tornado 异步非阻塞的跑起来。

    但是明明知道这个函数中做的是高负载的工作,那么你应该采用另一种方式,使用 Tornado 结合 Celery 来实现异步非阻塞。


    image.png image.png
    3.基于 Celery 的异步编程

    先编写一个异步任务

    import time
    
    from celery import Celery
    
    app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
    app.conf.CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672"
    
    @app.task
    def sleep_fun(second):
        time.sleep(second)
        return 'ok'
    
    if __name__ == "__main__":
        app.start()
    

    然后启动celery celery -A apps.foo.tasks.app worker --loglevel=info

    class CelerySleepHandler(tornado.web.RequestHandler):
        @gen.coroutine
        def get(self):
            print(7)
            response = yield gen.Task(tasks.sleep_fun.apply_async, args=[5])
            print(8)
    
            self.write("when i sleep 10s")
    

    调用该接口就会发现结过跟异步非阻塞一致

    Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的任务队列,同时也支持任务调度。

    Celery 并不是唯一选择,你可选择其他的任务队列来实现,但是 Celery 是 Python 所编写,能很快的上手,同时 Celery 提供了优雅的接口,易于与 Python Web 框架集成等特点。

    与 Tornado 的配合可以使用 tornado-celery ,该包已经把 Celery 封装到 Tornado 中,可以直接使用。

    实际测试中,由于 tornado-celery 很久没有更新,导致请求会一直阻塞,不会返回
    解决办法是:

    1. 把 celery 降级到 3.1 pip install celery==3.1
    2. 把 pika 降级到 0.9.14 pip install pika==0.9.14
    4.python的原生协程关键字:Async和Await

    它们的底层基于生成器函数,使得协程的实现更加方便。

    Async 用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是sleep(5))消失后,也就是5秒到了再回来执行。

    Await 用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序
    首先我们先来看一个程序(其中 asyncio 库是支持异步的)

    class AsynchronousSleepHandler(BaseHandler):
    
    
        async def get(self):
            print(5)
            await asyncio.sleep(5)
            print(6)
            self.write("when i sleep 10s")
    

    通过打印可知是异步非阻塞的,想问一下为什么是10s呢 ,5个并发休眠5秒不应该是5秒吗?为何第一个执行完毕才开始并发执行下面的?


    image.png
    image.png
    参考文献:

    真正的 Tornado 异步非阻塞

    欢迎参观个人博客:

    不爱去冒险的少年y

    相关文章

      网友评论

        本文标题:Tornado异步非阻塞详解

        本文链接:https://www.haomeiwen.com/subject/vczphhtx.html