美文网首页
Tornado应用笔记04-浅析源码

Tornado应用笔记04-浅析源码

作者: simplue | 来源:发表于2017-06-14 20:51 被阅读0次

    索引

    本节内容将分析Tornado中利用协程实现异步原理, 主要分析的是装饰器@gen.coroutine, 包括源码分析和异常捕获等问题, 另外也包括了对@asynchronous, Future等相关对象的分析.

    "未来的坑" Future

    在介绍两个重要的装饰器之前, 先来说说Future, 它是实现异步的一个重要对象. Future就像它的名字一样, 装载的是"未来"(未完成操作的结果), 文档描述它是"异步操作结果的占位符". 在Tornado中, 常见用法有下面两种:

    # 在`IOLoop`注册`future`
    tornado.ioloop.IOLoop.add_future(future, future_done_callback_func)
    
    # `@gen.coroutine`内部结合`yield`使用
    @gen.coroutine
    def foo():
        result = yield future
    

    Tornado中内置的Future(tornado.concurrent.Future)与futures包中的Future(concurrent.futures.Future)很相似, 不过Tornado的Future不是"线程安全"的, 因为Tornado本身是单线程, 所以用起来并没什么不妥, 而且速度更快

    Tornado 4.0以前, Tornado的Future实际上还是引用的"线程安全"的concurrent.futures.Future, 只有在没有安装future包时才会使用"非线程安全"的Tornado 内置Future. Tornado 4.0以后的版本, 所有的Future都变成内置的, 并为其加入了exc_info方法. 这两种Futrue基本上是"兼容"的, 不过这里所谓的"兼容"只是在"调用"层面上的, 部分操作不一定会生效或执行.

    Tornado 4.1后, 如果Future中的异常没有被触发(比如调用result(),exception()exc_info()), 那在Future被垃圾回收时, 会log异常信息. 如果你既想"发现"异常, 又不想让它log, 可以这么做future.add_done_callback(lambda future: future.exception())

    下面介绍Future中最主要三个方法:

    class Future(object):
    
        def result(self, timeout=None):
            # 返回future的值(future._result), 如果有执行异常, 那么将会触发异常
            self._clear_tb_log()
            if self._result is not None:
                return self._result
            if self._exc_info is not None:
                raise_exc_info(self._exc_info)
            self._check_done()
            return self._result
    
        def add_done_callback(self, fn):
            # 为future添加回调到回调列表中, 在`.set_result`后执行,
            # 不过如果future已经完成了, 那么会直接执行这个回调, 不放入回调列表
            if self._done:
                fn(self)
            else:
                self._callbacks.append(fn)
    
        def set_result(self, result):
            # 为future设置值, 然后执行回调列表中的所有回调, 
            # 回调传入的唯一参数就是future本身
            self._result = result
            self._set_done()
    
    

    Future对于刚开始接触这个问题的开发者来说, 可能是一个不容易理解的对象, 是需要一定时间的去消化. 虽然你可能在之前已经借助gen.coroutine@asynchronous写过一些异步代码, 但是Future都是被封装到里边的, 你并不清楚其中的原理. 当你看到一些更灵活的异步应用时, 你可能就没有办法理解其中的逻辑. 所以Tornado作者建议大家都用Future练习写异步代码, 以便更好理解其所以然.

    下面的例子实现了异步HTTP请求, 一个用@gen.coroutine实现, 一个用较原始的Future实现, 对比其中的不同, 或者动手改改, 但愿能帮助你理解Future.

    # @gen.coroutine 实现
    class AsyncFetch(tornado.web.RequestHandler):
        @gen.coroutine
        def get(self, *args, **kwargs):
            client = tornado.httpclient.AsyncHTTPClient()
            response = yield client.fetch('http://www.baidu.com', request_timeout=2)
            self.finish(response.body)
    
    # Future 实现
    class AsyncFetch(tornado.web.RequestHandler):
        @asynchronous
        def get(self, *args, **kwargs):
            client = tornado.httpclient.AsyncHTTPClient()
            fetch_future = client.fetch('http://www.baidu.com', request_timeout=2)
            tornado.ioloop.IOLoop.current().add_future(fetch_future, callback=self.on_response)
    
        def on_response(self, future):
            response = future.result()
            self.finish(response .body)
    
    异步装饰器 @asynchronous

    这个装饰器适合处理回调式的异步操作, 如果你想使用协程实现异步, 那么应该单独使用@gen.coroutine. 考虑到某些历史遗留问题, 同时使用 @gen.coroutine@asynchronous也是可以的, 但是 @asynchronous 必须放在 @gen.coroutine的前面, 否则@asynchronous将被忽略.

    注意, 这个装饰器能且只能用在get post一类方法上, 用在其他任意方法都是无意义的. 同时装饰器并不会"真正"使一个请求变为异步, 而仅仅是"告诉"tornado这个请求是异步的, 要使请求异步化, 则必须要在请求内完成一些异步操作, 里面的阻塞操作是会阻塞整个线程的, 不会响应新的请求, 如果你在里面sleep了, 那线程就sleep了.

    另外, 用了这个装饰器以后, 请求并不会在return后结束(因为这个请求是异步的, Tornado"不知道"何时会完成, 所以会一直保持与客户端的连接), 需要显式调用 self.finish() 才会结束请求

    附: Tornado 作者对 @gen.coroutine@asynchronous 一起使用的回答:

    Order matters because @asynchronous looks at the Future returned by @gen.coroutine, and calls finish for you when the coroutine returns. Since Tornado 3.1, the combination of @asynchronous and @gen.coroutine has been unnecessary and discouraged; in most cases you should use @gen.coroutine alone.

    @gen.coroutine@asynchronous共用需要注意顺序是因为, @asynchronous监控着@gen.coroutine 返回的 Future 然后在Future完成的时候自动调用 finish.自tornado 3.1开始, 两者就可以独立使用且并不鼓励共用, 实际上在绝大多数情况下,只需要使用 @gen.coroutine

    源码注释:
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
    
        # 关闭自动finish, 需要显式调用self.finish()
        self._auto_finish = False
        with stack_context.ExceptionStackContext(
                self._stack_context_handle_exception):
    
            # 执行method内的函数, 并将结果转换成future, 
            # 使用`add_future`将回调函数`future_complete`注册到`ioloop`中,
            # 回调做了两件事, 一是通过调用`future.result()`检查异常
            # 二是自动finish请求, 无需在请求内显式finish
            result = method(self, *args, **kwargs)
            if result is not None:
                result = gen.convert_yielded(result)
                def future_complete(f):
                    f.result()
                    if not self._finished:
                        self.finish()
                IOLoop.current().add_future(result, future_complete)
                return None
            return result
    return wrapper
    
    
    协程装饰器 @gen.coroutine

    在理解这个装饰器前, 需要你已经了解生成器的工作方式, 比如看懂下面这段代码和执行结果. 如果你对此还不了解, 那么建议你先看看这篇文章, 然后再往下读.

    >>> def echo(value=None):
    ...   while 1:
    ...     value = (yield value)
    ...     print("The value is", value)
    ...     if value:
    ...       value += 1
    ...
    >>> g = echo(1)
    >>> next(g)
    1
    >>> g.send(2)
    The value is 2
    3
    >>> g.send(5)
    The value is 5
    6
    >>> next(g)
    The value is None
    

    Py 3.3以前的版本, 使用了这个装饰器的生成器(含yield的函数)都不能直接使用return来返回值, 需要触发一种特殊的异常gen.Return来达到return的效果, 不过在任意版本中均可通过不带参数的return提前退出生成器.

    装饰器返回的是一个Future对象, 如果调用时设置了回调函数callback, 那么callback将会在Futureset_result后调用, 若协程执行失败, callback也不会执行. 需要注意的是, callback并不需要作为被修饰函数的"可见"参数, 因为callback是被gen.coroutine处理的(具体用法见上一节线程池处理阻塞操作部分).

    需要特别注意的是其中的异常处理. 执行发生异常时, 异常信息会存储在.Future 对象内. 所以必须检查.Future 对象的结果, 否则潜在的异常将被忽略. 在一个@gen.coroutine内调用另外一个@gen.coroutine, 官方文档推荐两种方式

    # 在顶层使用下面的方式调用
    tornado.ioloop.IOLoop.run_sync(coroutine_task_func)
    
    # 使用`add_future`
    tornado.ioloop.IOLoop.add_future(future, callback)
    

    其实实际上只要调用了futrueresult方法, 那么异常就会被触发, 所以也可以使用下面两种方式

    # 使用了`@gen.coroutine`的生成器, 靠`Runner`调用`future.result`触发异常, 下面会分析`Runner`源码
    yield tornado.gen.Task(coroutine_task_func)
    yield the_coroutine_task(callback=my_callback_func)
    
    异常捕获
    @tornado.gen.coroutine
    def catch_exc():
        r = yield tornado.gen.sleep(0.1)
        raise KeyError
    
    
    @tornado.gen.coroutine
    def uncatch_exc():
        # 需要注意的是, 这里的阻塞操作, 也是会阻塞整个线程的
        time.sleep(0.1)
        raise KeyError
    
    
    class CoroutineCatchExc(tornado.web.RequestHandler):
        @tornado.gen.coroutine
        def get(self):
            # 直接调用 `catch_exc` 也是可以触发异常的, 不过无法在这里捕获
            # 因为里面有, 在 `Runner` 中对生成器 `send` 操作的时候会触发
            # 不过如果只是想丢到`后台`执行, 这样做也是可以的, 异常都交给任务自身处理
            catch_exc()
    
            # 如果单独使用下面的调用是会彻底忽略掉协程执行中的异常的, 不会输出任何信息,
            uncatch_exc()
    
            # 下面的用法也会触发异常, 不过同样的, 并没有办法在这里捕获
            # gen.coroutine 在调用 callback 时自动传入 future.result(), 抛出异常
            uncatch_exc(callback=lambda future_result: -1)
    
            # 捕获并处理异常的方法
    
            # 方法1
            # 需要注意的是使用`ioloop`回调传入的是`future`, 不是`future.result()`
            # 所以, 在回调里面不调用`future.result()`也是白搭
            def foo(future):
                fu = future  # 这样做也是触发不了异常的
                try:
                    future_result = fu.result()  # 这样才可以
                except:
                    import traceback
                    print 'catch exc in callback, the exc info is:'
                    print traceback.format_exc()
                else:
                    print 'future completed and the result is %s' % future_result
            
            fu = uncatch_exc()
            tornado.ioloop.IOLoop.current().add_future(fu, callback=foo)
    
            # 方法2
            # 使用 yield 后, 就成了生成器, 在`gen.coroutine`中会调用`Runner`
            # 驱动生成器, `Runner`内部有调用`future.result()`
            try:
                future_result = yield uncatch_exc('catch exc')
            except:
                import traceback
                print 'catch exc in yield, the exc info is:'
                print traceback.format_exc()
            else:
                print 'future completed and the result is %s' % future_result
    
            self.finish("coroutine catch exc test")
    
    
    源码注释:
    def coroutine(func, replace_callback=True):
        # `coroutine`的功能实际上由`_make_coroutine_wrapper`实现
        return _make_coroutine_wrapper(func, replace_callback=True)
    
    def _make_coroutine_wrapper(func, replace_callback):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 创建一个 `future`
            future = TracebackFuture()
    
            # 如果调用时设置了`callback`, 则在`IOLoop`注册`future`及其回调事件
            # 因为被修饰的函数没有`callback`这个"可见"参数, 所以需要`pop`掉, 以免报错
            if replace_callback and 'callback' in kwargs:
                callback = kwargs.pop('callback')
                IOLoop.current().add_future(
                    future, lambda future: callback(future.result()))
    
            # 执行被修饰函数, 获取结果
            # 抛出的执行异常将被`set_exc_info`进`Future`内, 在执行`future.result()`时, 异常会被触发,
            # 对于`Return`和`StopIteration`, 这类特殊的异常, 将返回函数的返回值
            # 不过在`Python 3.3+`中`StopIteration`才会有`value`属性, 也就是可以直接使用`return`返回
            try:
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = _value_from_stopiteration(e)
            except Exception:
                future.set_exc_info(sys.exc_info())
                return future
    
            # 这里使用的`else`只有在`try`正常结束时执行
            # 如果被修饰的是一个`生成器`, 获取生成器生成的第一个结果, 异常处理与上面一致
            # 如果只是普通的"同步"函数(不是生成器), 那就跳过这步, 避免创建`Runner`浪费资源
            # 将第一个`yield`的结果, `生成器`(函数本身)和上面新建的`Future`一同传入`Runner`
            # `Runner`是实现协程异步的关键, 下面接着分析其中的代码
            else:
                if isinstance(result, GeneratorType):
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        yielded = next(result)
                        if stack_context._state.contexts is not orig_stack_contexts:
                            yielded = TracebackFuture()
                            yielded.set_exception(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        future.set_result(_value_from_stopiteration(e))
                    except Exception:
                        future.set_exc_info(sys.exc_info())
                    else:
                        Runner(result, future, yielded)
                    try:
                        # 生成器, 经过`Runner`, 已经`set_result`, 直接返回`future`
                        return future
                    finally:
                        future = None
            # 非生成器, 没经过`Runner`, `set_result`后返回
            future.set_result(result)
            return future
        return wrapper
    
    # `Runner`主要看`run`和`handle_yield`两个函数
    class Runner(object):
        def __init__(self, gen, result_future, first_yielded):
            self.gen = gen
            self.result_future = result_future
            self.future = _null_future
            # 将结果转换成future, 然后判断状态, 择机进入run
            if self.handle_yield(first_yielded):
                self.run()
    
        # `run`实际上就是一个生成器驱动器, 与`IOLoop.add_future`配合, 利用协程实现异步
        # `run`内部虽然是个死循环, 但是因为调用了`gen.send`, 
        # 所以在`gen.send`时可以暂时离开循环, 返回到生成器中(即yield的`断点`), 使得生成器得以继续工作
        # 当生成器返回一个新的`future`时, 再次调用`handle_yield`, 
        # 若`future`完成了就进入下一次`yield`, 
        # 没完成就等到完成以后在进入到`run`进入下一次`yield`
       
        # 简化的`run`可表示成下面的样子
        # def run(self):
        #    future = self.gen.send(self.next)
        #    def callback(f):
        #        self.next = f.result()
        #        self.run()
        #    future.add_done_callback(callback)
    
        def run(self):
            # 各种运行状态判断, 异常处理
            if self.running or self.finished:
                return
            try:
                self.running = True
                while True:
                    future = self.future
                    if not future.done():
                        return
                    self.future = None
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        exc_info = None
    
                        # 查异常, 有则抛出
                        try:
                            value = future.result()
                        except Exception:
                            self.had_exception = True
                            exc_info = sys.exc_info()
    
                        if exc_info is not None:
                            yielded = self.gen.throw(*exc_info)
                            exc_info = None
    
                        # 正常情况, 无异常
                        else:
                            # 驱动生成器运行, 恢复到`yield`断点继续执行, 是整个函数的关键
                            yielded = self.gen.send(value)
    
                        if stack_context._state.contexts is not orig_stack_contexts:
                            self.gen.throw(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
    
                    # 生成器被掏空, 结束
                    except (StopIteration, Return) as e:
                        self.finished = True
                        self.future = _null_future
                        if self.pending_callbacks and not self.had_exception:
                            raise LeakedCallbackError(
                                "finished without waiting for callbacks %r" %
                                self.pending_callbacks)
                        self.result_future.set_result(_value_from_stopiteration(e))
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
    
                    # 其他异常
                    except Exception:
                        self.finished = True
                        self.future = _null_future
                        self.result_future.set_exc_info(sys.exc_info())
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    # 配合`handle_yield`, 使用`IOLoop`注册事件
                    if not self.handle_yield(yielded):
                        return
            finally:
                self.running = False
    
        def handle_yield(self, yielded):
            # 省略部分无关代码
            # 先将传入的第一个生成器结果转换为`Future`对象
            # 如果`Future`还没有执行完毕, 或者是`moment`(一种内置的特殊`Future`, 这里可以忽视)
            # 那就等待`Future`执行完毕后执行`run`
            # 其余情况则直接执行`run`
            ...
            if ...:
                ...
            else:
                try:
                    self.future = convert_yielded(yielded)
                except BadYieldError:
                    self.future = TracebackFuture()
                    self.future.set_exc_info(sys.exc_info())
    
            if not self.future.done() or self.future is moment:
                self.io_loop.add_future(
                    self.future, lambda f: self.run())
                return False
            return True
    
    特殊函数gen.Task

    gen.Task的操作就是将回调式异步函数的输出转换成future类型并返回, 目的是方便被yield. 函数会自动为执行函数设置回调, 回调的工作是将操作的返回值传递给内部创建的future. 其代码可以简化为:

    def Task(func, *args, **kwargs):
        future = Future()
        callback = lambda func_result: future.set_result(func_result)
        func(*args, callback=callback, **kwargs)
        return future
    

    本节内容就是这些, 下节内容将讨论Tornado内置的异步HTTP客户端.

    NEXT ===> Tornado应用笔记05-异步客户端

    相关文章

      网友评论

          本文标题:Tornado应用笔记04-浅析源码

          本文链接:https://www.haomeiwen.com/subject/ktpeqxtx.html