Happy asyncio Learning

Author: 宝宝家的隔壁老王 | Published 2018-02-10 02:40

Actually, learning asyncio is not happy at all.

一、Multithreading and Multiprocessing in Python

To understand asynchronous programming with asyncio, you first need a basic grasp of Python's multithreading and multiprocessing.

  • 1. Multithreading

Python has the GIL (Global Interpreter Lock), so its multithreading is not true parallelism; threads only overlap usefully while one of them is sleeping or waiting on I/O.

  • 1.1 Sleeping
    • time.sleep()
    • threading.Lock
    • the other synchronization objects in the threading module
  • 1.2 I/O
    • requests
    • open
  • 1.3 Periodic release of the GIL
    • the Python 2 interpreter releases the GIL roughly every 1000 bytecode instructions
    • the Python 3 interpreter releases it roughly every 15 ms
  • 1.4 The GIL (Global Interpreter Lock)
    • One thread runs Python while the other N sleep or wait on I/O (this guarantees that only one thread accesses shared state at any given moment).
  • 1.5 How the GIL works
      /* s.connect((host, port)) method */
      static PyObject *
      sock_connect(PySocketSockObject *s, PyObject *addro)
      {
          sock_addr_t addrbuf;
          int addrlen;
          int res;
       
          /* convert (host, port) tuple to C address */
          getsockaddrarg(s, addro, SAS2SA(&addrbuf), &addrlen);
       
          Py_BEGIN_ALLOW_THREADS
          res = connect(s->sock_fd, SAS2SA(&addrbuf), addrlen);
          Py_END_ALLOW_THREADS
       
          /* error handling and so on .... */
      }
    
    • Py_BEGIN_ALLOW_THREADS releases the GIL
    • Py_END_ALLOW_THREADS reacquires the GIL; a thread blocks here while waiting for another thread to release the lock, and once that happens the waiting thread grabs the lock back and resumes executing Python code
    • In short: while N threads are blocked on network I/O or waiting to reacquire the GIL, one thread runs Python
  • 1.6 Examples
    • Blocking on sleep
      import time
      from threading import Thread
      from datetime import datetime
      
      def write(i):
          print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i))
          time.sleep(4)
          print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i))
      
      def fun():
          print('start ...')
          for i in range(3):
              Thread(target=write, args=(i,), daemon=False).start()
          print('end ...')
      fun()
      # Output
      start ...
      2018-02-09 23:58:25 start write --> 0
      2018-02-09 23:58:25 start write --> 1
      2018-02-09 23:58:25 start write --> 2
      end ...
      2018-02-09 23:58:29 end write --> 0
      2018-02-09 23:58:29 end write --> 1
      2018-02-09 23:58:29 end write --> 2
    
    • CPU-bound work
      import time
      from threading import Thread
      from datetime import datetime
      
      def write(n):
          print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n))
          l, sum_ = list(range(100000000)), 0
          for i in l:
              sum_ += i
          print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n))
      
      def fun():
          print('start ...')
          for i in range(3):
              Thread(target=write, args=(i,), daemon=False).start()
          print('end ...')
      fun()
      # Output
      start ...
      2018-02-10 00:13:55 start write --> 0
      2018-02-10 00:13:58 start write --> 1
      2018-02-10 00:14:02 start write --> 2
      end ...
      2018-02-10 00:14:27 end write --> 0
      2018-02-10 00:14:32 end write --> 1
      2018-02-10 00:14:35 end write --> 2
    
    • Summary
      • For sleeping and I/O-bound operations, multithreading helps a lot and clearly reduces the total elapsed time.
      • For CPU-bound operations, multithreading can actually increase the total elapsed time, because the threads keep contending for the GIL.
  • 2. Multiprocessing

Multiprocessing in Python is the same multi-process concept as in other languages, so it is not elaborated on here; a minimal sketch based on the CPU-bound example above follows.
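For comparison with the CPU-bound threading example above, here is a minimal multiprocessing sketch (the function name cpu_sum is illustrative, assuming Python 3.4+):

from multiprocessing import Pool
from datetime import datetime

def cpu_sum(n):
    # the same CPU-bound loop as in the threading example
    print('{} start --> {}'.format(datetime.now().strftime('%H:%M:%S'), n))
    total = 0
    for i in range(100000000):
        total += i
    print('{} end --> {}'.format(datetime.now().strftime('%H:%M:%S'), n))
    return total

if __name__ == '__main__':
    # each worker process has its own interpreter and its own GIL,
    # so the three sums really do run in parallel
    with Pool(3) as pool:
        pool.map(cpu_sum, range(3))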

二、The Idea Behind Asynchronous Programming
  • 1. Coroutine
  • 2. Task
  • 3. Event loop (Loop)
1. A Task object holds two attributes: a coroutine (coro) and the time at which that coroutine should next be called;
2. The Loop object keeps its Task objects in a heap and always runs the coro of the Task with the smallest call time. If that coro is not yet exhausted, it is wrapped into a new Task and pushed back onto the Loop's heap; a toy sketch of this idea follows.
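The following toy sketch only illustrates the heap-based scheduling idea described above; it is not how asyncio is actually implemented, and the Task/Loop classes here are simplified stand-ins:

import heapq
import time

class Task:
    def __init__(self, coro, when):
        self.coro, self.when = coro, when      # the two attributes described above
    def __lt__(self, other):                   # lets the heap order Tasks by call time
        return self.when < other.when

class Loop:
    def __init__(self):
        self._heap = []
    def create_task(self, coro, delay=0):
        heapq.heappush(self._heap, Task(coro, time.time() + delay))
    def run(self):
        while self._heap:
            task = heapq.heappop(self._heap)              # Task with the smallest call time
            time.sleep(max(0, task.when - time.time()))
            try:
                delay = next(task.coro)                   # drive the coroutine one step
            except StopIteration:
                continue                                  # coro finished, drop it
            self.create_task(task.coro, delay)            # not finished: push a new Task

def countdown(name, n):
    for i in range(n, 0, -1):
        print(name, i)
        yield 0.1          # "sleep" 0.1s by telling the loop when to call us again

loop = Loop()
loop.create_task(countdown('a', 3))
loop.create_task(countdown('b', 2))
loop.run()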

# A minimal invocation example
import asyncio

@asyncio.coroutine
def coro_fun():
    for i in range(10):
        yield from asyncio.sleep(0.1)  # hand control back to the event loop on each pass

loop = asyncio.get_event_loop()

loop.run_until_complete(coro_fun()) 
# or
tasks = [asyncio.ensure_future(coro_fun())]
loop.run_until_complete(asyncio.wait(tasks))

三、Source Code Analysis
  • About __code__.co_flags
    # Every function or method has a __code__ attribute (a code object) with a co_flags value
    # In CPython:
    1. a generator function is flagged with CO_GENERATOR, i.e. 0x20
    2. a native coroutine function is flagged with CO_COROUTINE, i.e. 0x80
    3. CO_ITERABLE_COROUTINE is 0x100, so the mask 0x180 matches either kind of coroutine
    
    # Bitwise-AND a function's __code__.co_flags with the relevant flag: a truthy result means the function is a generator function or a coroutine function
    
    def gen_fun():
        yield from range(10)
    >>> gen_fun.__code__.co_flags  # 99
    >>> 99 & 0x20  # 32, True
    >>> 99 & 0x180  # 0, False
    
    async def asy_fun():
        await asyncio.sleep(4)
    >>> asy_fun.__code__.co_flags  # 195
    >>> 195 & 0x20  # 0, False
    >>> 195 & 0x180  # 128, True
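    A small sketch of the same checks using the named constants that the inspect module exposes (assuming Python 3.5+, where CO_COROUTINE and CO_ITERABLE_COROUTINE exist):
    
    import inspect
    
    def describe_flags(fn):
        # print which of the three flags discussed above are set on fn.__code__
        flags = fn.__code__.co_flags
        for name in ('CO_GENERATOR', 'CO_COROUTINE', 'CO_ITERABLE_COROUTINE'):
            print(name, bool(flags & getattr(inspect, name)))
    
    async def native():
        pass
    
    describe_flags(native)   # only CO_COROUTINE is expected to be True here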
    
  • 关于类型判断
    from collections.abc import Iterable, Awaitable
    # Checking for iterable and Awaitable objects
    class A:
        def __iter__(self):
            return iter([1,2,3,4,5])
        def __await__(self):
            return iter([1,2,3,4,5])
    a = A()
    >>> isinstance(a, Iterable)  # True, because A defines __iter__
    >>> isinstance(a, Awaitable)  # True, because A defines __await__
    
    # Check whether an object is a coroutine, etc.
    import inspect
    async def asy_fun():
        await a
    >>> inspect.iscoroutine(asy_fun())  # True
    
  • @asyncio.coroutine
    def coroutine(func):
        # Decorator that marks a generator as a coroutine; if the coroutine is never yielded from before it is destroyed, an error is logged
    
        # inspect.iscoroutinefunction checks the CO_COROUTINE flag, so functions defined with the async def syntax are detected here and returned unchanged
        if _inspect_iscoroutinefunction(func):
            return func
    
        # inspect.isgeneratorfunction uses co_flags to check whether func is a generator function
        if inspect.isgeneratorfunction(func):
            coro = func
        else:
            @functools.wraps(func)
            def coro(*args, **kw):
                res = func(*args, **kw)
                
                # Check whether res is a Future, a generator, or a CoroWrapper instance
                if isinstance(res, futures.Future) or inspect.isgenerator(res) or \
                        isinstance(res, CoroWrapper):
                    res = yield from res
    
                elif _AwaitableABC is not None:
                    # The Awaitable ABC only exists on Python 3.5+
                    try:
                        # Look for an __await__ attribute; __await__ must return an iterator that is not a coroutine
                        await_meth = res.__await__
                    except AttributeError:
                        pass
                    else:
                        # res is an Awaitable object
                        if isinstance(res, _AwaitableABC):
                            # Delegate to the iterator returned by __await__ via yield from
                            res = yield from await_meth()
                return res
    
        # Wrap coro with types.coroutine (stacking @types.coroutine more than once has no extra effect: an already-flagged function is returned as-is)
        if not _DEBUG:
            if _types_coroutine is None:
                wrapper = coro
            else:
                wrapper = _types_coroutine(coro)
        else:
            @functools.wraps(func)
            def wrapper(*args, **kwds):
              
                # In debug mode, wrap the coroutine in CoroWrapper
                w = CoroWrapper(coro(*args, **kwds), func=func)
                if w._source_traceback:
                    del w._source_traceback[-1]
                # Copy the original function's name attributes onto the wrapper
                w.__name__ = getattr(func, '__name__', None)
                w.__qualname__ = getattr(func, '__qualname__', None)
                return w
        
        # So that asyncio.iscoroutinefunction() reports True for this wrapper elsewhere
        wrapper._is_coroutine = True  # For iscoroutinefunction().
        return wrapper
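    A quick check of the decorator's visible effect (a sketch, assuming the pre-3.8 asyncio API where @asyncio.coroutine is not yet deprecated):
    
    import asyncio
    
    @asyncio.coroutine
    def gen_coro():
        yield from asyncio.sleep(0.1)
        return 'hello'
    
    print(asyncio.iscoroutinefunction(gen_coro))   # True, thanks to wrapper._is_coroutine
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(gen_coro()))     # hello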
    
  • @types.coroutine
    def coroutine(func):
      # Turn a regular generator function into a coroutine
    
      if not callable(func):
          raise TypeError('types.coroutine() expects a callable')
    
      if (func.__class__ is FunctionType and
          getattr(func, '__code__', None).__class__ is CodeType):
    
          # Read the function's co_flags
          co_flags = func.__code__.co_flags
    
          # Already a coroutine function (CO_COROUTINE or CO_ITERABLE_COROUTINE): return it unchanged
          if co_flags & 0x180:
              return func
    
          # A generator function: OR its co_flags with 0x100 (CO_ITERABLE_COROUTINE) to mark it as an awaitable coroutine
          if co_flags & 0x20:
              # TODO: Implement this in C.
              co = func.__code__
              func.__code__ = CodeType(
                  co.co_argcount, co.co_kwonlyargcount, co.co_nlocals,
                  co.co_stacksize,
                  co.co_flags | 0x100,  # 0x100 == CO_ITERABLE_COROUTINE
                  co.co_code,
                  co.co_consts, co.co_names, co.co_varnames, co.co_filename,
                  co.co_name, co.co_firstlineno, co.co_lnotab, co.co_freevars,
                  co.co_cellvars)
              return func
    
      # Fall back to supporting generator-like objects
    
      @_functools.wraps(func)
      def wrapped(*args, **kwargs):
          coro = func(*args, **kwargs)
    
          # A coroutine object, or a generator object whose code already carries CO_ITERABLE_COROUTINE: return it directly
          if (coro.__class__ is CoroutineType or
              coro.__class__ is GeneratorType and coro.gi_code.co_flags & 0x100):
              return coro
          if (isinstance(coro, _collections_abc.Generator) and
              not isinstance(coro, _collections_abc.Coroutine)):
              # Implements the Generator ABC but not the Coroutine ABC: wrap it with the generator wrapper
              return _GeneratorWrapper(coro)
          # A Coroutine ABC instance or any other object: return it as-is
          return coro
    
      return wrapped
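    A short usage sketch (assuming Python 3.5+): decorating a plain generator with @types.coroutine is exactly what makes it awaitable from an async def coroutine, via the CO_ITERABLE_COROUTINE flag set above.
    
    import asyncio
    import types
    
    @types.coroutine
    def legacy_sleep(delay):
        # a generator-based coroutine; its code object gains CO_ITERABLE_COROUTINE
        yield from asyncio.sleep(delay)
    
    async def main():
        await legacy_sleep(0.1)    # allowed only because of that flag
        return 'done'
    
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(main()))   # done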
    
  • asyncio.get_event_loop
    • 1. DefaultEventLoopPolicy().get_event_loop()
      # DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy

      class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
          # UNIX event loop policy; it also watches child processes
          _loop_factory = _UnixSelectorEventLoop

          def __init__(self):
              super().__init__()
              self._watcher = None

          def _init_watcher(self):
              with events._lock:
                  if self._watcher is None:  # pragma: no branch
                      self._watcher = SafeChildWatcher()
                      if isinstance(threading.current_thread(),
                                    threading._MainThread):
                          self._watcher.attach_loop(self._local._loop)

          def set_event_loop(self, loop):
              # If the child watcher has already been set up, calling
              # .set_event_loop() from the main thread also calls
              # .attach_loop(loop) on that watcher
              super().set_event_loop(loop)

              if self._watcher is not None and \
                      isinstance(threading.current_thread(), threading._MainThread):
                  self._watcher.attach_loop(loop)

          def get_child_watcher(self):
              """
              Get the watcher for child processes.
              If not yet set, a SafeChildWatcher object is automatically created.
              """
              if self._watcher is None:
                  self._init_watcher()

              return self._watcher

          def set_child_watcher(self, watcher):
              """Set the watcher for child processes."""

              assert watcher is None or isinstance(watcher, AbstractChildWatcher)

              if self._watcher is not None:
                  self._watcher.close()

              self._watcher = watcher
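      A minimal check of the watcher behaviour described above (a sketch, assuming Unix and Python 3.4 to 3.7, where SafeChildWatcher is the lazily created default):
      
      import asyncio
      
      loop = asyncio.get_event_loop()
      watcher = asyncio.get_child_watcher()   # created lazily via _init_watcher()
      print(type(watcher).__name__)           # expected: SafeChildWatcher
      watcher.attach_loop(loop)               # tie the watcher to this loop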
      
    • 2. The parent class BaseDefaultEventLoopPolicy().get_event_loop()
      class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):

          # Under this policy each thread has its own event loop, but by default a
          # loop is only ever created for the main thread; other threads get none.
          # Other policies may use different rules (e.g. one global event loop, or
          # a loop automatically created per thread, or loops tied to some other
          # notion of context).

          _loop_factory = None

          class _Local(threading.local):
              _loop = None
              _set_called = False

          def __init__(self):
              self._local = self._Local()

          def get_event_loop(self):
              # Only if this is the main thread, self._local._loop is None and
              # self._local._set_called is False do we create a loop on demand
              if (self._local._loop is None and
                      not self._local._set_called and
                      isinstance(threading.current_thread(), threading._MainThread)):
                  self.set_event_loop(self.new_event_loop())
              if self._local._loop is None:
                  raise RuntimeError('There is no current event loop in thread %r.'
                                     % threading.current_thread().name)
              return self._local._loop

          def set_event_loop(self, loop):
              # Install the event loop for the current thread
              self._local._set_called = True
              assert loop is None or isinstance(loop, AbstractEventLoop)
              self._local._loop = loop

          def new_event_loop(self):
              # Callers normally pass the result to set_event_loop(), since that
              # also records _local._set_called
              return self._loop_factory()
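      A small sketch of the per-thread rule in this policy (assuming Python 3.5 to 3.9 behaviour): in a non-main thread, get_event_loop() raises RuntimeError until set_event_loop() is called.
      
      import asyncio
      import threading
      
      def worker():
          try:
              asyncio.get_event_loop()      # no loop is auto-created outside the main thread
          except RuntimeError as exc:
              print('worker:', exc)
          loop = asyncio.new_event_loop()
          asyncio.set_event_loop(loop)      # explicitly give this thread its own loop
          print('worker now has', loop)
          loop.close()
      
      threading.Thread(target=worker).start()
      print(asyncio.get_event_loop())       # the main thread gets a loop on demand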
      
    • 3. _UnixSelectorEventLoop, the event loop factory class
      class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
         """Unix event loop.

         Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
         """
      
         def __init__(self, selector=None):
             super().__init__(selector)
             self._signal_handlers = {}
      
         def _socketpair(self):
             return socket.socketpair()
      
         def close(self):
             super().close()
             for sig in list(self._signal_handlers):
                 self.remove_signal_handler(sig)
      
         def _process_self_data(self, data):
             for signum in data:
                 if not signum:
                     # ignore null bytes written by _write_to_self()
                     continue
                 self._handle_signal(signum)
      
         def add_signal_handler(self, sig, callback, *args):
             """Add a handler for a signal.  UNIX only.
      
             Raise ValueError if the signal number is invalid or uncatchable.
             Raise RuntimeError if there is a problem setting up the handler.
             """
             if (coroutines.iscoroutine(callback)
             or coroutines.iscoroutinefunction(callback)):
                 raise TypeError("coroutines cannot be used "
                                 "with add_signal_handler()")
             self._check_signal(sig)
             self._check_closed()
             try:
                 # set_wakeup_fd() raises ValueError if this is not the
                 # main thread.  By calling it early we ensure that an
                 # event loop running in another thread cannot add a signal
                 # handler.
                 signal.set_wakeup_fd(self._csock.fileno())
             except (ValueError, OSError) as exc:
                 raise RuntimeError(str(exc))
      
             handle = events.Handle(callback, args, self)
             self._signal_handlers[sig] = handle
      
             try:
                 # Register a dummy signal handler to ask Python to write the signal
                  # number in the wakeup file descriptor. _process_self_data() will
                 # read signal numbers from this file descriptor to handle signals.
                 signal.signal(sig, _sighandler_noop)
      
                 # Set SA_RESTART to limit EINTR occurrences.
                 signal.siginterrupt(sig, False)
             except OSError as exc:
                 del self._signal_handlers[sig]
                 if not self._signal_handlers:
                     try:
                         signal.set_wakeup_fd(-1)
                     except (ValueError, OSError) as nexc:
                         logger.info('set_wakeup_fd(-1) failed: %s', nexc)
      
                 if exc.errno == errno.EINVAL:
                     raise RuntimeError('sig {} cannot be caught'.format(sig))
                 else:
                     raise
      
         def _handle_signal(self, sig):
             """Internal helper that is the actual signal handler."""
             handle = self._signal_handlers.get(sig)
             if handle is None:
                 return  # Assume it's some race condition.
             if handle._cancelled:
                 self.remove_signal_handler(sig)  # Remove it properly.
             else:
                 self._add_callback_signalsafe(handle)
      
         def remove_signal_handler(self, sig):
             """Remove a handler for a signal.  UNIX only.
      
             Return True if a signal handler was removed, False if not.
             """
             self._check_signal(sig)
             try:
                 del self._signal_handlers[sig]
             except KeyError:
                 return False
      
             if sig == signal.SIGINT:
                 handler = signal.default_int_handler
             else:
                 handler = signal.SIG_DFL
      
             try:
                 signal.signal(sig, handler)
             except OSError as exc:
                 if exc.errno == errno.EINVAL:
                     raise RuntimeError('sig {} cannot be caught'.format(sig))
                 else:
                     raise
      
             if not self._signal_handlers:
                 try:
                     signal.set_wakeup_fd(-1)
                 except (ValueError, OSError) as exc:
                     logger.info('set_wakeup_fd(-1) failed: %s', exc)
      
             return True
      
         def _check_signal(self, sig):
             """Internal helper to validate a signal.
      
             Raise ValueError if the signal number is invalid or uncatchable.
             Raise RuntimeError if there is a problem setting up the handler.
             """
             if not isinstance(sig, int):
                 raise TypeError('sig must be an int, not {!r}'.format(sig))
      
             if not (1 <= sig < signal.NSIG):
                 raise ValueError(
                     'sig {} out of range(1, {})'.format(sig, signal.NSIG))
      
         def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                       extra=None):
             return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
      
         def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                        extra=None):
             return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
      
         @coroutine
         def _make_subprocess_transport(self, protocol, args, shell,
                                        stdin, stdout, stderr, bufsize,
                                        extra=None, **kwargs):
             with events.get_child_watcher() as watcher:
                 waiter = self.create_future()
                 transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                                   stdin, stdout, stderr, bufsize,
                                                   waiter=waiter, extra=extra,
                                                   **kwargs)
      
                 watcher.add_child_handler(transp.get_pid(),
                                           self._child_watcher_callback, transp)
                 try:
                     yield from waiter
                 except Exception as exc:
                     # Workaround CPython bug #23353: using yield/yield-from in an
                     # except block of a generator doesn't clear properly
                     # sys.exc_info()
                     err = exc
                 else:
                     err = None
      
                 if err is not None:
                     transp.close()
                     yield from transp._wait()
                     raise err
      
             return transp
      
         def _child_watcher_callback(self, pid, returncode, transp):
             self.call_soon_threadsafe(transp._process_exited, returncode)
      
         @coroutine
         def create_unix_connection(self, protocol_factory, path, *,
                                    ssl=None, sock=None,
                                    server_hostname=None):
             assert server_hostname is None or isinstance(server_hostname, str)
             if ssl:
                 if server_hostname is None:
                     raise ValueError(
                         'you have to pass server_hostname when using ssl')
             else:
                 if server_hostname is not None:
                     raise ValueError('server_hostname is only meaningful with ssl')
      
             if path is not None:
                 if sock is not None:
                     raise ValueError(
                         'path and sock can not be specified at the same time')
      
                 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
                 try:
                     sock.setblocking(False)
                     yield from self.sock_connect(sock, path)
                 except:
                     sock.close()
                     raise
      
             else:
                 if sock is None:
                     raise ValueError('no path and sock were specified')
                 sock.setblocking(False)
      
             transport, protocol = yield from self._create_connection_transport(
                 sock, protocol_factory, ssl, server_hostname)
             return transport, protocol
      
         @coroutine
         def create_unix_server(self, protocol_factory, path=None, *,
                                sock=None, backlog=100, ssl=None):
             if isinstance(ssl, bool):
                 raise TypeError('ssl argument must be an SSLContext or None')
      
             if path is not None:
                 if sock is not None:
                     raise ValueError(
                         'path and sock can not be specified at the same time')
      
                 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      
                 try:
                     sock.bind(path)
                 except OSError as exc:
                     sock.close()
                     if exc.errno == errno.EADDRINUSE:
                         # Let's improve the error message by adding
                         # with what exact address it occurs.
                         msg = 'Address {!r} is already in use'.format(path)
                         raise OSError(errno.EADDRINUSE, msg) from None
                     else:
                         raise
                 except:
                     sock.close()
                     raise
             else:
                 if sock is None:
                     raise ValueError(
                         'path was not specified, and no sock specified')
      
                 if sock.family != socket.AF_UNIX:
                     raise ValueError(
                         'A UNIX Domain Socket was expected, got {!r}'.format(sock))
      
             server = base_events.Server(self, [sock])
             sock.listen(backlog)
             sock.setblocking(False)
             self._start_serving(protocol_factory, sock, ssl, server)
             return server
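      A brief usage sketch of the signal API listed above (Unix only; the handler name on_sigint is illustrative):
      
      import asyncio
      import signal
      
      loop = asyncio.get_event_loop()
      
      def on_sigint():
          print('got SIGINT, stopping the loop')
          loop.stop()
      
      loop.add_signal_handler(signal.SIGINT, on_sigint)   # registered through the code above
      try:
          loop.run_forever()        # press Ctrl+C to trigger the handler
      finally:
          loop.remove_signal_handler(signal.SIGINT)
          loop.close()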
      
  • BaseEventLoop().run_until_complete
    def run_until_complete(self, future):
          """ 
          1、参数为协程,使用asyncio.wait()值也是协程
          2、会将协程包装成期物
          3、运行直到期物状态为 done
          """
    
          # Make sure the loop has not been closed
          self._check_closed()
    
          new_task = not isinstance(future, futures.Future)
          future = tasks.ensure_future(future, loop=self)
          if new_task:
              # An exception is raised if the future didn't complete, so there
              # is no need to log the "destroy pending task" message
              future._log_destroy_pending = False
    
          # Add a done callback that will stop the loop
          future.add_done_callback(_run_until_complete_cb)
          try:
              self.run_forever()
          except:
              if new_task and future.done() and not future.cancelled():
                  # The coroutine raised a BaseException. Consume the exception
                  # to not log a warning, the caller doesn't have access to the
                  # local task.
                  future.exception()
              raise
          future.remove_done_callback(_run_until_complete_cb)
          if not future.done():
              raise RuntimeError('Event loop stopped before Future completed.')
    
          return future.result()
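    A tiny usage sketch of the behaviour described above (assuming the pre-3.8 @asyncio.coroutine style used throughout this article): run_until_complete() wraps the coroutine in a Task and hands back its result.
    
    import asyncio
    
    @asyncio.coroutine
    def compute():
        yield from asyncio.sleep(0.1)
        return 42
    
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(compute()))   # 42: the future's result is returned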
    
  • asyncio.ensure_future
    def ensure_future(coro_or_future, *, loop=None):
      
      # Wrap a coroutine or an awaitable object in a Future
      if isinstance(coro_or_future, futures.Future):
          if loop is not None and loop is not coro_or_future._loop:
              raise ValueError('loop argument must agree with Future')
          # Already a Future: return it unchanged
          return coro_or_future
    
      # coroutines.iscoroutine: a generator-based coroutine, a CoroWrapper, or a native coroutine object
      elif coroutines.iscoroutine(coro_or_future):
          if loop is None:
              loop = events.get_event_loop()
          # Create a Task for it on the loop
          task = loop.create_task(coro_or_future)
          if task._source_traceback:
              del task._source_traceback[-1]
          return task
      
      # On Python 3.5+, also accept any awaitable object
      elif compat.PY35 and inspect.isawaitable(coro_or_future):
          return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
      else:
          raise TypeError('A Future, a coroutine or an awaitable is required')
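    A quick sketch of the two branches above (assuming Python 3.5.2+ for loop.create_future()): a Future passes through unchanged, while a coroutine is wrapped into a Task.
    
    import asyncio
    
    loop = asyncio.get_event_loop()
    
    fut = loop.create_future()
    print(asyncio.ensure_future(fut) is fut)   # True: a Future is returned unchanged
    
    @asyncio.coroutine
    def coro():
        yield from asyncio.sleep(0)
    
    task = asyncio.ensure_future(coro())       # a coroutine becomes a Task on the loop
    print(isinstance(task, asyncio.Task))      # True
    loop.run_until_complete(task)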
    
  • asyncio.wait
    @coroutine
    def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
      """
      fs 参数是 futures 或者 coroutines 序列,且不能为空
      return two sets of Future: (done, pending).
      用法:
          done, pending = yield from asyncio.wait(fs)
      """
      if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
          raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
      if not fs:
          raise ValueError('Set of coroutines/Futures is empty.')
      if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
          raise ValueError('Invalid return_when value: {}'.format(return_when))
    
      if loop is None:
          loop = events.get_event_loop()
    
      # Convert every element of fs into a Future and collect them in a set
      fs = {ensure_future(f, loop=loop) for f in set(fs)}
    
      return (yield from _wait(fs, timeout, return_when, loop))
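    A short usage sketch (the timeout value is chosen for illustration): with a timeout, asyncio.wait() splits the futures into done and pending sets.
    
    import asyncio
    
    @asyncio.coroutine
    def work(delay):
        yield from asyncio.sleep(delay)
        return delay
    
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(work(d)) for d in (0.1, 0.5)]
    
    done, pending = loop.run_until_complete(asyncio.wait(tasks, timeout=0.2))
    print(len(done), len(pending))      # expected: 1 1
    for t in pending:
        t.cancel()                      # tidy up the task that did not finish in time
    loop.run_until_complete(asyncio.wait(pending))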
    
  • asyncio._wait
    @coroutine
    def _wait(fs, timeout, return_when, loop):
      # fs must be a non-empty set
      assert fs, 'Set of Futures is empty.'
      
      # Create a waiter future attached to the given loop
      waiter = loop.create_future()
      timeout_handle = None
    
      if timeout is not None:
          # Schedule a timeout callback that releases the waiter after the given time
          timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
      counter = len(fs)
    
      def _on_completion(f):
          # counter comes from the enclosing scope
          nonlocal counter
          counter -= 1
          """ 
          以下三种状态均表示 waiter 期物运行结束
          1、所有的期物都运行完成 
          2、return_when 值为 FIRST_COMPLETED
          3、return_when 值为 FIRST_EXCEPTION 并且期物没有被取消,且 f.exception()值存在
          """
          if (counter <= 0 or
              return_when == FIRST_COMPLETED or
              return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                  f.exception() is not None)):
              # Cancel the timeout callback if there is one
              if timeout_handle is not None:
                  timeout_handle.cancel()
              # Mark the waiter future as done
              if not waiter.done():
                  waiter.set_result(None)
    
      for f in fs:
          # Attach the completion callback to every future
          f.add_done_callback(_on_completion)
    
      try:
          yield from waiter
      finally:
          if timeout_handle is not None:
              timeout_handle.cancel()
    
      done, pending = set(), set()
      for f in fs:
          # Remove _on_completion from the future's done-callback list
          f.remove_done_callback(_on_completion)
          if f.done():
              done.add(f)
          else:
              pending.add(f)
      return done, pending
    
