美文网首页爬虫相关
利用tornado使请求实现异步非阻塞

利用tornado使请求实现异步非阻塞

作者: 蒋狗 | 来源:发表于2017-04-01 17:06 被阅读2095次
    基本IO模型

    网上搜了很多关于同步异步,阻塞非阻塞的说法,理解还是不能很透彻,有必要买书看下。
    参考:使用异步 I/O 大大提高应用程序的性能
    怎样理解阻塞非阻塞与同步异步的区别?

    1. 同步和异步:主要关注消息通信机制(重点在B?)。
      同步:A调用B,B处理直到获得结果,才返回给A。
      异步:A调用B,B直接返回。无需等待结果,B通过状态,通知等来通知A或回调函数来处理。

    2. 阻塞和非阻塞:主要关注程序等待(重点在A?)。
      阻塞:A调用B,A被挂起直到B返回结果给A,A继续执行。
      非阻塞:A调用B,A不会被挂起,A可以执行其他操作(但可能A需要轮询检查B是否返回)。

    3. 同步阻塞:A调用B,A挂起,B处理直到获得结果,返回给A,A继续执行。

    4. 同步非阻塞:A调用B,A继续执行,B处理直到获得结果,处理的同时A轮询检查B是否返回结果。

    5. 异步阻塞:异步阻塞 I/O 模型的典型流程 (select)。

    6. 异步非阻塞:A调用B,B立即返回,A继续执行,B得到结果后通过状态,通知等通知A或回调函数处理。

    tornado实现异步非阻塞

    参考:

    1. 使用tornado让你的请求异步非阻塞
    2. 官方文档
    利用异步方法(回调)和@tornado.web.asynchronous

    @tornado.web.asynchronous 并不能将一个同步方法变成异步,所以修饰在同步方法上是无效的,只是告诉框架,这个方法是异步的,且只能适用于HTTP verb方法(get、post、delete、put等)。@tornado.web.asynchronous 装饰器适用于callback-style的异步方法,如果是协程则可以用@tornado.gen.coroutine来修饰。对于用@tornado.web.asynchronous 修饰的异步方法,需要主动self.finish()来结束该请求,普通的方法(get()等)会自动结束请求在方法返回的时候。
    最基本的使用callback-style的例子,直接使用异步方法,并定义callback方法。

    # sync blocking
    class SleepHandler(BaseHandler):
    
        # no effective
        @tornado.web.asynchronous
        def get(self):
            time.sleep(5)
            self.write('sleep for 5s')
    
    
    class SleepHandler(BaseHandler):
    
        @tornado.web.asynchronous
        def get(self):
            tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 5, callback=self.on_response)
    
        def on_response(self):
            self.write('sleep for 5s')
            self.finish()
    
    
    # call back
    class MyRequestHandler(BaseHandler):
        @tornado.web.asynchronous
        def get(self):
            http = httpclient.AsyncHTTPClient()
            http.fetch('http://www.baidu.com', self._on_download)
    
        def _on_download(self, response):
            self.write(response.body)
            self.finish()
    

    利用ThreadPoolExecutor

    利用ThreadPoolExecutor的submit和future对象的add_done_callback 方法,将一个同步方法变成异步。如下例所示,若没有无参可以不用partial()

    class SleepHandler(BaseHandler):
        @tornado.web.asynchronous
        def get(self):
            sleep_time = 5
    
            def callback(future):
                self.write(future.result())
                self.finish()
    
            EXECUTOR.submit(partial(self.get_sleep, sleep_time)).add_done_callback(
                lambda future: tornado.ioloop.IOLoop.instance().add_callback(
                    partial(callback, future)))
    
        def get_sleep(self, sleep_time):
            time.sleep(sleep_time)
            return "Awake! %s" % time.time()
    

    分解一下:

    1. future = EXECUTOR.submit(partial(self.get_sleep, sleep_time))返回了一个future对象。
    2. future.add_done_callback()future添加一个完成回调函数callback_func
    3. 实际上这个回调函数是
    callback_func = lambda future: tornado.ioloop.IOLoop.instance().add_callback(partial(callback, future))``` 
    4. 结合起来就是:
    

    class SleepHandler(BaseHandler):
    @tornado.web.asynchronous
    def get(self):
    sleep_time = 5

        def final_callback(future_param):
            self.write(future_param.result())
            self.finish()
    
        def executor_callback(future_param):
            tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future_param))
    
        future = EXECUTOR.submit(partial(self.get_sleep, sleep_time))
        future.add_done_callback(executor_callback)
    
    def get_sleep(self, sleep_time):
        time.sleep(sleep_time)
        return "Awake! %s" % time.time()
    
    5. 没看文档,源码前本认为为什么要多此一举进行两次callback,直接
    

    tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future))

    不就ok了吗,为何还要再加上一层
    

    future.add_done_callback(executor_callback)

    但发现直接这样是会阻塞IOLoop的。查阅相关文档后发现,```
    tornado.ioloop.IOLoop.instance().add_callback
    ``` 这个方法会将控制权从其他线程转到IOLoop线程上,直接用
    

    tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future))
    时,final_callback()中获取future_param.result()仍然会阻塞,所以需要future.add_done_callback()```,在该线程完成获取结果后在回callback给IOLoop。


    另一种类似的写法,将上例的方法写成了一个装饰器,见下例,但个人认为利用这种方式异步时没有必要单独写一个装饰器吧,而且也不通用吧?

    def unblock(f):
        @tornado.web.asynchronous
        @wraps(f)
        def wrapper(*args, **kwargs):
            self = args[0]
    
            def callback(future):
                self.write(future.result())
                self.finish()
            EXECUTOR.submit(
                partial(f, *args, **kwargs)
            ).add_done_callback(
                lambda future: tornado.ioloop.IOLoop.instance().add_callback(
                    partial(callback, future)))
        return wrapper
    
    
    class SleepHandler(BaseHandler):
        @unblock
        def get(self):
            time.sleep(5)
            return "Awake! %s" % time.time()
    

    使用tornado.concurrent.run_on_executor简化

    上述的两例都相当于开启了新的线程池?
    除此之外还可以利用@run_on_executor装饰器将同步阻塞函数变成异步(或者说被tornado的装饰器理解和识别)。首先阅读一下@run_on_executor源码:

    def run_on_executor(*args, **kwargs):
        """Decorator to run a synchronous method asynchronously on an executor.
    
        The decorated method may be called with a ``callback`` keyword
        argument and returns a future.
    
        The `.IOLoop` and executor to be used are determined by the ``io_loop``
        and ``executor`` attributes of ``self``. To use different attributes,
        pass keyword arguments to the decorator::
    
            @run_on_executor(executor='_thread_pool')
            def foo(self):
                pass
    
        .. versionchanged:: 4.2
           Added keyword arguments to use alternative attributes.
        """
        def run_on_executor_decorator(fn):
            executor = kwargs.get("executor", "executor")
            io_loop = kwargs.get("io_loop", "io_loop")
    
            @functools.wraps(fn)
            def wrapper(self, *args, **kwargs):
                callback = kwargs.pop("callback", None)
                future = getattr(self, executor).submit(fn, self, *args, **kwargs)
                if callback:
                    getattr(self, io_loop).add_future(
                        future, lambda future: callback(future.result()))
                return future
            return wrapper
        if args and kwargs:
            raise ValueError("cannot combine positional and keyword args")
        if len(args) == 1:
            return run_on_executor_decorator(args[0])
        elif len(args) != 0:
            raise ValueError("expected 1 argument, got %d", len(args))
        return run_on_executor_decorator
    

    可以很快发现在不存在callback关键字参数时,该装饰器返回了一个future对象,由此并结合上述两例子可以很快的利用@run_on_executor写出,感觉相当于简化了上述两例的代码并使之变得通用:

    class SleepHandler(BaseHandler):
    
        executor = ThreadPoolExecutor(2)
    
        @tornado.web.asynchronous
        def get(self):
            def callback(future_param):
                self.write('sleep %ss' % future_param.result())
                self.finish()
    
            future = self.sleep()
            future.add_done_callback(lambda f: tornado.ioloop.IOLoop.instance().add_callback(
                    partial(callback, f)))
    
        @run_on_executor
        def sleep(self):
            time.sleep(5)
            return 5
    

    当然也可以利用callback参数:

    class SleepHandler(BaseHandler):
        executor = ThreadPoolExecutor(2)
        io_loop = tornado.ioloop.IOLoop.instance()
    
        @tornado.web.asynchronous
        def get(self):
            def callback(res):
                self.write('sleep %ss' % res)
                self.finish()
    
            self.sleep(callback=callback)
    
        @run_on_executor
        def sleep(self):
            time.sleep(5)
            return 5
    

    协程方式:@tornado.gen.coroutineyield

    除了上述的用利用@tornado.web.asynchronous和callback的方式,还可以用@tornado.gen.coroutineyield,如下例子:

    class GenRequestHandler(BaseHandler):
        @tornado.gen.coroutine
        def get(self):
            http = httpclient.AsyncHTTPClient()
            res = yield http.fetch('http://www.baidu.com')
            self.write(res.body)
    

    参考官方文档

    Most asynchronous functions in Tornado return a Future
    ; yielding this object returns its result
    .

    利用tornado.gen.Task修饰一个callback型的异步函数使其能够与yield使用。

    gen.Task is now a function that returns a Future

    看一下例子(对于sleep可以直接用yield tornado.gen.sleep(5)):

    # gen.Task
    class SleepHandler(BaseHandler):
    
        @tornado.gen.coroutine
        def get(self):
            yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 5)
            # yield tornado.gen.sleep(5)
            self.write('sleep for 5s')
    

    还可以用结合celery使用(但并不是最好的方案)。


    Last

    https://github.com/tornadoweb/tornado/wiki/Links tornado的wiki上有很多异步相关支持的库。

    相关文章

      网友评论

        本文标题:利用tornado使请求实现异步非阻塞

        本文链接:https://www.haomeiwen.com/subject/qbfpottx.html