这篇文章从简单的几行代码开始,一步步跟进asyncio的执行过程。
测试代码
import asyncio
from concurrent import futures
from asyncio.unix_events import _UnixDefaultEventLoopPolicy
from asyncio.base_events import BaseEventLoop
from asyncio.tasks import Task
from asyncio.events import Handle
from asyncio.futures import Future
async def test():
await asyncio.sleep(3)
print('hello')
loop = asyncio.get_event_loop()
import asyncio.futures
import asyncio.coroutines
import asyncio.tasks
print(asyncio.futures.isfuture(test())) # False
print(asyncio.coroutines.iscoroutine(test())) # True
loop.run_until_complete(test())
asyncio.get_event_loop()
初始化Loop,是查看源码的第一步
def get_event_loop():
"""Return an asyncio event loop.
When called from a coroutine or a callback (e.g. scheduled with call_soon
or similar API), this function will always return the running event loop.
If there is no running event loop set, the function will return
the result of `get_event_loop_policy().get_event_loop()` call.
"""
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
return get_event_loop_policy().get_event_loop()
- return get_event_loop_policy().get_event_loop()
get_event_loop_policy()是_UnixDefaultEventLoopPolicy()
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory = _UnixSelectorEventLoop
def set_event_loop(self, loop):
super().set_event_loop(loop)
if self._watcher is not None and \
isinstance(threading.current_thread(), threading._MainThread):
self._watcher.attach_loop(loop)
def get_event_loop(self):
"""Get the event loop.
This may be None or an instance of EventLoop.
"""
if (self._local._loop is None and
not self._local._set_called and
isinstance(threading.current_thread(), threading._MainThread)):
self.set_event_loop(self.new_event_loop())
if self._local._loop is None:
raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)
return self._local._loop
get_event_loop里面调用了set_event_loop,set_event_loop调用new_event_loop,new_event_loop调用了_loop_factory
2 . _UnixSelectorEventLoop继承关系
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop)
class BaseSelectorEventLoop(base_events.BaseEventLoop)
class BaseEventLoop(events.AbstractEventLoop):
class AbstractEventLoop
loop.run_until_complete(test())
将test()封装为Future,或者说Task,存到消息队列里面去,下一次循环执行这个test(),由于是一个类生成器函数,需要Task来控制这个生成器的恢复执行开始执行,一个Future执行完后(set_result),会添加它之前设置好的callback到Loop里面。这个回调通常用来恢复test()协程函数。
loop是_UnixSelectorEventLoop,此方法在BaseEventLoop里
def run_until_complete(self, future):
"""Run until the Future is done.
If the argument is a coroutine, it is wrapped in a Task.
WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.
Return the Future's result, or raise its exception.
"""
self._check_closed()
new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
# An exception is raised if the future didn't complete, so there
# is no need to log the "destroy pending task" message
future._log_destroy_pending = False
future.add_done_callback(_run_until_complete_cb)
try:
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
# The coroutine raised a BaseException. Consume the exception
# to not log a warning, the caller doesn't have access to the
# local task.
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')
return future.result()
1.调用 future = tasks.ensure_future(future, loop=self)
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
"""
if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
if loop is None:
loop = events.get_event_loop()
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
elif compat.PY35 and inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
else:
raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
'required')
会走到第二个判断逻辑,执行 task = loop.create_task(coro_or_future),create_task函数如下,创建task实例,然后返回。
def create_task(self, coro):
"""Schedule a coroutine object.
Return a task object.
"""
self._check_closed()
if self._task_factory is None:
task = tasks.Task(coro, loop=self)
if task._source_traceback:
del task._source_traceback[-1]
else:
task = self._task_factory(self, coro)
return task
在创建Task实例的时候,构造函数做了如下
def __init__():
self._loop.call_soon(self.__step, context=self._context)
_register_task(self)
call_soon方法是BaseEventLoop的方法,它内部调用_call_soon,做了如下
def _call_soon(self, callback, args):
handle = events.Handle(callback, args, self)
if handle._source_traceback:
del handle._source_traceback[-1]
self._ready.append(handle)
return handle
把回调(Task的_setup方法)封装为一个handler,添加到_ready容器里面去,接下来再看Loop的轮询检查这个容器,提取回调,并且执行。
2 开始轮询run_forever()
run_forever会调用_run_once(),下面是run_once()的一个片段
...
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
不断的从_ready读取需要执行的handle(回调,也就是Task),也就是说,执行Task的_setup函数。可以说这个_setup函数是用来执行一个协程的。
class Task(futures.Future):
try:
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
self.set_exception(futures.CancelledError())
else:
self.set_result(exc.value)
可以看到,协程不过就是生成器,coro.send(None)来执行生成器,当生成器没有yield了,调用Future的set_result。set_result又会造成一系列连串反应,再次调用Loop的call_soon将后续要做的事情方法回调添加到_ready里面,下一次Loop轮询就会执行到。这样,一个Future的执行使命就完成了,同时,他还会返回执行好的结果给yield from/await的左值,其源码在Future的__iter__
里面:
def __iter__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too. 返回值
if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression
也可以看到,最后一行,为了兼容await语法,增加了__await__
魔法函数。这样,你写的
res = await asyncio.sleep(3, result='test result')
res就能够顺利的得到值了。
能够得到信息
根据上面的简单源码分析,我们知道
- 必须把异步函数加入Loop才能达到协程的效果,使用Loop.gather()、ensure_future()、create_task()等
- Task继承自Future, 都可以调用add_done_callback(),一个Task里面维护了协程函数,Task调度这个函数
- Loop的call_soon只是将回调添加进队列里面,下次循环就会取出来执行,类似的方法还有call_at,call_later
- 每一次yield,会交出执行权,每一次send,会得到执行权。
- Future和Task的_setp方法是尤为重要的实现
网友评论