asyncio源码-01

作者: 霡霂976447044 | 来源:发表于2019-07-01 16:25 被阅读3次

这篇文章从简单的几行代码开始,一步步跟进asyncio的执行过程。

测试代码


import asyncio

from concurrent import futures
from asyncio.unix_events import _UnixDefaultEventLoopPolicy
from asyncio.base_events import BaseEventLoop
from asyncio.tasks import Task
from asyncio.events import Handle
from asyncio.futures import Future

async def test():
    await asyncio.sleep(3)
    print('hello')


loop = asyncio.get_event_loop()

import asyncio.futures
import asyncio.coroutines
import asyncio.tasks

print(asyncio.futures.isfuture(test()))  # False
print(asyncio.coroutines.iscoroutine(test()))  # True


loop.run_until_complete(test())

asyncio.get_event_loop()

初始化Loop,是查看源码的第一步

def get_event_loop():
    """Return an asyncio event loop.

    When called from a coroutine or a callback (e.g. scheduled with call_soon
    or similar API), this function will always return the running event loop.

    If there is no running event loop set, the function will return
    the result of `get_event_loop_policy().get_event_loop()` call.
    """
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    return get_event_loop_policy().get_event_loop()
  1. return get_event_loop_policy().get_event_loop()
    get_event_loop_policy()是_UnixDefaultEventLoopPolicy()
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    _loop_factory = _UnixSelectorEventLoop
    def set_event_loop(self, loop):
        super().set_event_loop(loop)

        if self._watcher is not None and \
            isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)
     def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

get_event_loop里面调用了set_event_loop,set_event_loop调用new_event_loop,new_event_loop调用了_loop_factory
2 . _UnixSelectorEventLoop继承关系

class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop)
class BaseSelectorEventLoop(base_events.BaseEventLoop)
class BaseEventLoop(events.AbstractEventLoop):
class AbstractEventLoop

loop.run_until_complete(test())

将test()封装为Future,或者说Task,存到消息队列里面去,下一次循环执行这个test(),由于是一个类生成器函数,需要Task来控制这个生成器的恢复执行开始执行,一个Future执行完后(set_result),会添加它之前设置好的callback到Loop里面。这个回调通常用来恢复test()协程函数。

loop是_UnixSelectorEventLoop,此方法在BaseEventLoop里

    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

1.调用 future = tasks.ensure_future(future, loop=self)

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not coro_or_future._loop:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif compat.PY35 and inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

会走到第二个判断逻辑,执行 task = loop.create_task(coro_or_future),create_task函数如下,创建task实例,然后返回。

    def create_task(self, coro):
        """Schedule a coroutine object.

        Return a task object.
        """
        self._check_closed()
        if self._task_factory is None:
            task = tasks.Task(coro, loop=self)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        return task

在创建Task实例的时候,构造函数做了如下

def __init__():
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

call_soon方法是BaseEventLoop的方法,它内部调用_call_soon,做了如下

    def _call_soon(self, callback, args):
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

把回调(Task的_setup方法)封装为一个handler,添加到_ready容器里面去,接下来再看Loop的轮询检查这个容器,提取回调,并且执行。

2 开始轮询run_forever()
run_forever会调用_run_once(),下面是run_once()的一个片段

...
       ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()

不断的从_ready读取需要执行的handle(回调,也就是Task),也就是说,执行Task的_setup函数。可以说这个_setup函数是用来执行一个协程的。

class Task(futures.Future):
     try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)

可以看到,协程不过就是生成器,coro.send(None)来执行生成器,当生成器没有yield了,调用Future的set_result。set_result又会造成一系列连串反应,再次调用Loop的call_soon将后续要做的事情方法回调添加到_ready里面,下一次Loop轮询就会执行到。这样,一个Future的执行使命就完成了,同时,他还会返回执行好的结果给yield from/await的左值,其源码在Future的__iter__里面:

    def __iter__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too. 返回值

    if compat.PY35:
        __await__ = __iter__ # make compatible with 'await' expression

也可以看到,最后一行,为了兼容await语法,增加了__await__魔法函数。这样,你写的

res = await asyncio.sleep(3, result='test result')

res就能够顺利的得到值了。

能够得到信息

根据上面的简单源码分析,我们知道

  • 必须把异步函数加入Loop才能达到协程的效果,使用Loop.gather()、ensure_future()、create_task()等
  • Task继承自Future, 都可以调用add_done_callback(),一个Task里面维护了协程函数,Task调度这个函数
  • Loop的call_soon只是将回调添加进队列里面,下次循环就会取出来执行,类似的方法还有call_at,call_later
  • 每一次yield,会交出执行权,每一次send,会得到执行权。
  • Future和Task的_setp方法是尤为重要的实现

相关文章

网友评论

    本文标题:asyncio源码-01

    本文链接:https://www.haomeiwen.com/subject/xjbwcctx.html