快乐的 asyncio 学习

作者: 宝宝家的隔壁老王 | 来源:发表于2018-02-10 02:40 被阅读80次

    其实 asyncio 的学习一点也不快乐

    一、python 的多线程和多进程

    要想理解 asyncio 的异步编程,需要简单了解一下 python 的多线程和多进程知识

    • 1、多线程

    python 有 GIL 机制,因此,python 的多线程操作并非真正意义的多线程,而仅仅在线程处于睡眠或者等待 I/O 时,才会发挥真正的多线程功能。

    • 1.1、睡眠
      • time.sleep()
      • threading.Lock
      • 线程模块其他同步对象
    • 1.2、I/O
      • requests
      • open
    • 1.3、定期放弃GIL
      • py2 解释器每执行 1000 字节码释放 GIL
      • py3 解释器每执行 15ms 释放 GIL
    • 1.4、GIL 全局解释器锁
      • 一个线程运行 Python,而其他 N 个睡眠或者等待 I/O。(保证同一时刻只有一个线程对共享资源进行存取)
    • 1.5、GIL 原理
        /* s.connect((host, port)) method */
        static PyObject *
        sock_connect(PySocketSockObject *s, PyObject *addro)
        {
            sock_addr_t addrbuf;
            int addrlen;
            int res;
         
            /* convert (host, port) tuple to C address */
            getsockaddrarg(s, addro, SAS2SA(&addrbuf), &addrlen);
         
            Py_BEGIN_ALLOW_THREADS
            res = connect(s->sock_fd, addr, addrlen);
            Py_END_ALLOW_THREADS
         
            /* error handling and so on .... */
        }
      
      • Py_BEGIN_ALLOW_THREADS 放弃 GIL
      • Py_END_ALLOW_THREADS 重新获取 GIL,一个线程会在这个位置阻塞,等待另一个线程释放锁;一旦出现这个情况,等待的线程会抢夺回锁,并恢复python代码的执行
      • 简而言之:当N个线程在网络 I/O 堵塞,或等待重新获取GIL,而一个线程运行Python
    • 1.6、示例
      • 睡眠阻塞
        import time
        from threading import Thread
        from datetime import datetime
        
        def write(i):
            print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i))
            time.sleep(4)
            print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i))
        
        def fun():
            print('start ...')
            for i in range(3):
                Thread(target=write, args=(i,), daemon=False).start()
            print('end ...')
        # 输出结果
        start ...
        2018-02-09 23:58:25 start write --> 0
        2018-02-09 23:58:25 start write --> 1
        2018-02-09 23:58:25 start write --> 2
        end ...
        2018-02-09 23:58:29 end write --> 0
        2018-02-09 23:58:29 end write --> 1
        2018-02-09 23:58:29 end write --> 2
      
      • CPU 阻塞
        import time
        from threading import Thread
        from datetime import datetime
        
        def write(n):
            print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n))
            l, sum_ = list(range(100000000)), 0
            for i in l:
                sum_ += i
            print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n))
        
        def fun():
            print('start ...')
            for i in range(3):
                Thread(target=write, args=(i,), daemon=False).start()
            print('end ...')
        # 输出结果
        start ...
        2018-02-10 00:13:55 start write --> 0
        2018-02-10 00:13:58 start write --> 1
        2018-02-10 00:14:02 start write --> 2
        end ...
        2018-02-10 00:14:27 end write --> 0
        2018-02-10 00:14:32 end write --> 1
        2018-02-10 00:14:35 end write --> 2
      
      • 总结
        • 对于睡眠操作或者 I/O 操作,多线程的作用非常明显,明显减少所消耗总时间;
        • 对于 CPU 计算型操作,多线程操作反而因为多线程间获取 GIL 而增加总的消耗时间。
    • 2、python 多进程

    python多进程即其他语言中的多进程概念,不再累述

    二、异步编程思想
    • 1、协程(coroutine)
    • 2、任务(Task)
    • 3、事件循环(loop)
    1、Task 对象包含 协程(coro) 和协程调用时间2个属性;
    2、Loop 对象使用堆承载多个 Task 对象,根据 Task 对象中最小调用时间去执行对应的 coro。如果 coro 没有迭代完,则将此 coro 生成新的 task,然后 push 到 Loop 对象的堆中。
    
    # 简单的调用示例
    import asyncio
    
    @asyncio.coroutine
    def coro_fun():
        yield from range(10)
    
    loop = asyncio.get_event_loop()
    
    loop.run_until_complete(coro_fun()) 
    # or
    tasks = [asyncio.ensure_future(coro_fun())]
    loop.run_until_complete(asyncio.wait(tasks))
    
    三、源代码分析
    • 关于 _ code _.co_flags
      # 每个函数或方法都有 __code__ 魔法方法 以及其对应的 co_flags 值
      # 在 Cpython 中,
      1、生成器函数的标识符为 CO_GENERATOR 即 0x20,
      2、协程函数的标识符为 CO_COROUTINE 即 0x180
      3、CO_ITERABLE_COROUTINE 即 0x100
      
      # 通过对函数对象的 __code__.co_flags 与 对应的标识符做位与运算,如果是真值,则表明函数对象属于生成器函数或协程函数
      
      def gen_fun():
          yield from range(10)
      >>> gen_fun.__code__.co_flags  # 99
      >>> 99 & 0x20  # 32, True
      >>> 99 & 0x180  # 0, False
      
      async def asy_fun():
          await sleep(4)
      >>> asy_fun.__code__.co_flags  # 227
      >>> 99 & 0x20  # 32, True
      >>> 99 & 0x180  # 128, True
      
    • 关于类型判断
      from collections import Iterator, Awaitable
      # 判断迭代器 和 Awaitable 对象
      class A:
          def __iter__(self):
              return iter([1,2,3,4,5])
          def __await__(self):
              return iter([1,2,3,4,5])
      a = A()
      >>> isinstance(a, Iterator)  # True
      >>> isinstance(a, Awaitable)  # True
      
      # 判断是否为协程等
      import inspect
      async def asy_fun():
          await a
      >>> inspect.iscoroutine(asy_fun())  # True
      
    • @asyncio.coroutine
      def coroutine(func):
          # 将一个生成器标记为协程,如果在destroyed前没有调用,则会记录错误
      
          # 这个方法是使用 inspect.iscoroutinefunction 方法判断是否为协程方法,使用 types.coroutine 装饰的生成器,或 async def 语法定义的函数都会返回 True
          if _inspect_iscoroutinefunction(func):
              return func
      
          # 使用 co_flags 判断是否为生成器
          if inspect.isgeneratorfunction(func):
              coro = func
          else:
              @functools.wraps(func)
              def coro(*args, **kw):
                  res = func(*args, **kw)
                  
                  # 判断 res 是否为期物,生成器 或 协程包装类 实例
                  if isinstance(res, futures.Future) or inspect.isgenerator(res) or \
                          isinstance(res, CoroWrapper):
                      res = yield from res
      
                  elif _AwaitableABC is not None:
                      # py 3.5 才会有 Awaitable 类
                      try:
                          # 如果有 __await__属性,__await__属性只会返回一个不是协程的迭代器
                          await_meth = res.__await__
                      except AttributeError:
                          pass
                      else:
                          # 如果是 Awaitable 对象
                          if isinstance(res, _AwaitableABC):
                              # 使用 yield from 处理其迭代器
                              res = yield from await_meth()
                  return res
      
          # 使用 types.coroutine 包装 coro(注意,多层 @types.coroutine 装饰不会影响,会直接return装饰的值)
          if not _DEBUG:
              if _types_coroutine is None:
                  wrapper = coro
              else:
                  wrapper = _types_coroutine(coro)
          else:
              @functools.wraps(func)
              def wrapper(*args, **kwds):
                
                  # 使用协程包装器处理
                  w = CoroWrapper(coro(*args, **kwds), func=func)
                  if w._source_traceback:
                      del w._source_traceback[-1]
                  # 如果是 py 3.5 则包装增加 协程 对象的属性,否则包装为 生成器 对象的属性
                  w.__name__ = getattr(func, '__name__', None)
                  w.__qualname__ = getattr(func, '__qualname__', None)
                  return w
          
          # 用以别处使用 asyncio.iscoroutinefunction() 判断为 True 的作用
          wrapper._is_coroutine = True  # For iscoroutinefunction().
          return wrapper
      
    • @types.coroutine
      def coroutine(func):
        # 将一个普通的生成器函数转化为协程
      
        if not callable(func):
            raise TypeError('types.coroutine() expects a callable')
      
        if (func.__class__ is FunctionType and
            getattr(func, '__code__', None).__class__ is CodeType):
      
            # 获取函数的 co_flags
            co_flags = func.__code__.co_flags
      
            # 检查是否为协程函数
            if co_flags & 0x180:
                return func
      
            # 检查是否为生成器函数,此步主要作用是将生成器的 co_flags 同 0x100 做位或运算,将其标识变更为协程标识
            if co_flags & 0x20:
                # TODO: Implement this in C.
                co = func.__code__
                func.__code__ = CodeType(
                    co.co_argcount, co.co_kwonlyargcount, co.co_nlocals,
                    co.co_stacksize,
                    co.co_flags | 0x100,  # 0x100 == CO_ITERABLE_COROUTINE
                    co.co_code,
                    co.co_consts, co.co_names, co.co_varnames, co.co_filename,
                    co.co_name, co.co_firstlineno, co.co_lnotab, co.co_freevars,
                    co.co_cellvars)
                return func
      
        # 用以支持类似生成器的对象
      
        @_functools.wraps(func)
        def wrapped(*args, **kwargs):
            coro = func(*args, **kwargs)
      
            # 协程或 co_flags 大于 256 的生成器对象,直接返回
            if (coro.__class__ is CoroutineType or
                coro.__class__ is GeneratorType and coro.gi_code.co_flags & 0x100):
                return coro
            if (isinstance(coro, _collections_abc.Generator) and
                not isinstance(coro, _collections_abc.Coroutine)):
                # 实现了生成器抽象类的方法,使用生成器包装器处理成生成器
                return _GeneratorWrapper(coro)
            # 协程抽象类实例或其他对象
            return coro
      
        return wrapped
      
    • asyncio.get_event_loop
      • 1、DefaultEventLoopPolicy().get_event_loop()
        # DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
        
        class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
        # 事件循环且监听子进程
        _loop_factory = _UnixSelectorEventLoop
        
        def __init__(self):
          super().__init__()
          self._watcher = None
        
        def _init_watcher(self):
          with events._lock:
              if self._watcher is None:  # pragma: no branch
                  self._watcher = SafeChildWatcher()
                  if isinstance(threading.current_thread(),
                                threading._MainThread):
                      self._watcher.attach_loop(self._local._loop)
        
        def set_event_loop(self, loop):
          # 如果子监听已经设置,那么在主线程中调用 .set_event_loop() 会在子监听中调用 .attach_loop(loop)
          super().set_event_loop(loop)
        
          if self._watcher is not None and \
              isinstance(threading.current_thread(), threading._MainThread):
              self._watcher.attach_loop(loop)
        
        def get_child_watcher(self):
          """
          Get the watcher for child processes.
          If not yet set, a SafeChildWatcher object is automatically created.
          """
          if self._watcher is None:
              self._init_watcher()
        
          return self._watcher
        
        def set_child_watcher(self, watcher):
          """Set the watcher for child processes."""
        
          assert watcher is None or isinstance(watcher, AbstractChildWatcher)
        
          if self._watcher is not None:
              self._watcher.close()
        
          self._watcher = watcher
        
      • 2、父类BaseDefaultEventLoopPolicy().get_event_loop()
        class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
        
        # 此 policy 下,每一个线程拥有独立的事件循环,然而我们仅会默认在主线程创建一个事件循环,其他线程默认没有事件循环
        # 其他 policy 下有不同的规则(e.g. 一个全局的事件循环,或者每一个线程自动创建一个事件循环,或者使用其他上下文将事件循环关联起来)
        
        _loop_factory = None
        
        class _Local(threading.local):
            _loop = None
            _set_called = False
        
        def __init__(self):
            self._local = self._Local()
        
        def get_event_loop(self):
            # 如果主线程 且 self._local._loop == None 且 self._local._set_called == False
            if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
                self.set_event_loop(self.new_event_loop())
            if self._local._loop is None:
                raise RuntimeError('There is no current event loop in thread %r.'
                                   % threading.current_thread().name)
            return self._local._loop
        
        def set_event_loop(self, loop):
            # 设置事件循环
            self._local._set_called = True
            assert loop is None or isinstance(loop, AbstractEventLoop)
            self._local._loop = loop
        
        def new_event_loop(self):
            # 调用 set_event_loop() 设置,因为还牵扯到 _local._set_called 的设置
            return self._loop_factory()
        
      • 3、_UnixSelectorEventLoop 事件循环工厂类
        class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
        """Unix event loop.
        
        Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
        """
        
           def __init__(self, selector=None):
               super().__init__(selector)
               self._signal_handlers = {}
        
           def _socketpair(self):
               return socket.socketpair()
        
           def close(self):
               super().close()
               for sig in list(self._signal_handlers):
                   self.remove_signal_handler(sig)
        
           def _process_self_data(self, data):
               for signum in data:
                   if not signum:
                       # ignore null bytes written by _write_to_self()
                       continue
                   self._handle_signal(signum)
        
           def add_signal_handler(self, sig, callback, *args):
               """Add a handler for a signal.  UNIX only.
        
               Raise ValueError if the signal number is invalid or uncatchable.
               Raise RuntimeError if there is a problem setting up the handler.
               """
               if (coroutines.iscoroutine(callback)
               or coroutines.iscoroutinefunction(callback)):
                   raise TypeError("coroutines cannot be used "
                                   "with add_signal_handler()")
               self._check_signal(sig)
               self._check_closed()
               try:
                   # set_wakeup_fd() raises ValueError if this is not the
                   # main thread.  By calling it early we ensure that an
                   # event loop running in another thread cannot add a signal
                   # handler.
                   signal.set_wakeup_fd(self._csock.fileno())
               except (ValueError, OSError) as exc:
                   raise RuntimeError(str(exc))
        
               handle = events.Handle(callback, args, self)
               self._signal_handlers[sig] = handle
        
               try:
                   # Register a dummy signal handler to ask Python to write the signal
                   # number in the wakup file descriptor. _process_self_data() will
                   # read signal numbers from this file descriptor to handle signals.
                   signal.signal(sig, _sighandler_noop)
        
                   # Set SA_RESTART to limit EINTR occurrences.
                   signal.siginterrupt(sig, False)
               except OSError as exc:
                   del self._signal_handlers[sig]
                   if not self._signal_handlers:
                       try:
                           signal.set_wakeup_fd(-1)
                       except (ValueError, OSError) as nexc:
                           logger.info('set_wakeup_fd(-1) failed: %s', nexc)
        
                   if exc.errno == errno.EINVAL:
                       raise RuntimeError('sig {} cannot be caught'.format(sig))
                   else:
                       raise
        
           def _handle_signal(self, sig):
               """Internal helper that is the actual signal handler."""
               handle = self._signal_handlers.get(sig)
               if handle is None:
                   return  # Assume it's some race condition.
               if handle._cancelled:
                   self.remove_signal_handler(sig)  # Remove it properly.
               else:
                   self._add_callback_signalsafe(handle)
        
           def remove_signal_handler(self, sig):
               """Remove a handler for a signal.  UNIX only.
        
               Return True if a signal handler was removed, False if not.
               """
               self._check_signal(sig)
               try:
                   del self._signal_handlers[sig]
               except KeyError:
                   return False
        
               if sig == signal.SIGINT:
                   handler = signal.default_int_handler
               else:
                   handler = signal.SIG_DFL
        
               try:
                   signal.signal(sig, handler)
               except OSError as exc:
                   if exc.errno == errno.EINVAL:
                       raise RuntimeError('sig {} cannot be caught'.format(sig))
                   else:
                       raise
        
               if not self._signal_handlers:
                   try:
                       signal.set_wakeup_fd(-1)
                   except (ValueError, OSError) as exc:
                       logger.info('set_wakeup_fd(-1) failed: %s', exc)
        
               return True
        
           def _check_signal(self, sig):
               """Internal helper to validate a signal.
        
               Raise ValueError if the signal number is invalid or uncatchable.
               Raise RuntimeError if there is a problem setting up the handler.
               """
               if not isinstance(sig, int):
                   raise TypeError('sig must be an int, not {!r}'.format(sig))
        
               if not (1 <= sig < signal.NSIG):
                   raise ValueError(
                       'sig {} out of range(1, {})'.format(sig, signal.NSIG))
        
           def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                         extra=None):
               return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
        
           def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                          extra=None):
               return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
        
           @coroutine
           def _make_subprocess_transport(self, protocol, args, shell,
                                          stdin, stdout, stderr, bufsize,
                                          extra=None, **kwargs):
               with events.get_child_watcher() as watcher:
                   waiter = self.create_future()
                   transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                                     stdin, stdout, stderr, bufsize,
                                                     waiter=waiter, extra=extra,
                                                     **kwargs)
        
                   watcher.add_child_handler(transp.get_pid(),
                                             self._child_watcher_callback, transp)
                   try:
                       yield from waiter
                   except Exception as exc:
                       # Workaround CPython bug #23353: using yield/yield-from in an
                       # except block of a generator doesn't clear properly
                       # sys.exc_info()
                       err = exc
                   else:
                       err = None
        
                   if err is not None:
                       transp.close()
                       yield from transp._wait()
                       raise err
        
               return transp
        
           def _child_watcher_callback(self, pid, returncode, transp):
               self.call_soon_threadsafe(transp._process_exited, returncode)
        
           @coroutine
           def create_unix_connection(self, protocol_factory, path, *,
                                      ssl=None, sock=None,
                                      server_hostname=None):
               assert server_hostname is None or isinstance(server_hostname, str)
               if ssl:
                   if server_hostname is None:
                       raise ValueError(
                           'you have to pass server_hostname when using ssl')
               else:
                   if server_hostname is not None:
                       raise ValueError('server_hostname is only meaningful with ssl')
        
               if path is not None:
                   if sock is not None:
                       raise ValueError(
                           'path and sock can not be specified at the same time')
        
                   sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
                   try:
                       sock.setblocking(False)
                       yield from self.sock_connect(sock, path)
                   except:
                       sock.close()
                       raise
        
               else:
                   if sock is None:
                       raise ValueError('no path and sock were specified')
                   sock.setblocking(False)
        
               transport, protocol = yield from self._create_connection_transport(
                   sock, protocol_factory, ssl, server_hostname)
               return transport, protocol
        
           @coroutine
           def create_unix_server(self, protocol_factory, path=None, *,
                                  sock=None, backlog=100, ssl=None):
               if isinstance(ssl, bool):
                   raise TypeError('ssl argument must be an SSLContext or None')
        
               if path is not None:
                   if sock is not None:
                       raise ValueError(
                           'path and sock can not be specified at the same time')
        
                   sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        
                   try:
                       sock.bind(path)
                   except OSError as exc:
                       sock.close()
                       if exc.errno == errno.EADDRINUSE:
                           # Let's improve the error message by adding
                           # with what exact address it occurs.
                           msg = 'Address {!r} is already in use'.format(path)
                           raise OSError(errno.EADDRINUSE, msg) from None
                       else:
                           raise
                   except:
                       sock.close()
                       raise
               else:
                   if sock is None:
                       raise ValueError(
                           'path was not specified, and no sock specified')
        
                   if sock.family != socket.AF_UNIX:
                       raise ValueError(
                           'A UNIX Domain Socket was expected, got {!r}'.format(sock))
        
               server = base_events.Server(self, [sock])
               sock.listen(backlog)
               sock.setblocking(False)
               self._start_serving(protocol_factory, sock, ssl, server)
               return server
        
    • BaseEventLoop().run_until_complete
      def run_until_complete(self, future):
            """ 
            1、参数为协程,使用asyncio.wait()值也是协程
            2、会将协程包装成期物
            3、运行直到期物状态为 done
            """
      
            # 检查 loop 是否 close
            self._check_closed()
      
            new_task = not isinstance(future, futures.Future)
            future = tasks.ensure_future(future, loop=self)
            if new_task:
                # An exception is raised if the future didn't complete, so there
                # is no need to log the "destroy pending task" message
                future._log_destroy_pending = False
      
            # 添加 done 回调
            future.add_done_callback(_run_until_complete_cb)
            try:
                self.run_forever()
            except:
                if new_task and future.done() and not future.cancelled():
                    # The coroutine raised a BaseException. Consume the exception
                    # to not log a warning, the caller doesn't have access to the
                    # local task.
                    future.exception()
                raise
            future.remove_done_callback(_run_until_complete_cb)
            if not future.done():
                raise RuntimeError('Event loop stopped before Future completed.')
      
            return future.result()
      
    • asyncio.ensure_future
      def ensure_future(coro_or_future, *, loop=None):
        
        # 把一个协程或者awaitable对象包装成期物(Future)
        if isinstance(coro_or_future, futures.Future):
            if loop is not None and loop is not coro_or_future._loop:
                raise ValueError('loop argument must agree with Future')
            # 直接返回
            return coro_or_future
      
        # 判断是否为生成器或协程包装器
        elif coroutines.iscoroutine(coro_or_future):
            if loop is None:
                loop = events.get_event_loop()
            # 创建 task 对象
            task = loop.create_task(coro_or_future)
            if task._source_traceback:
                del task._source_traceback[-1]
            return task
        
        # py 3.5下,判断对象是否为 awaitable 对象
        elif compat.PY35 and inspect.isawaitable(coro_or_future):
            return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
        else:
            raise TypeError('A Future, a coroutine or an awaitable is required')
      
    • asyncio.wait
      @coroutine
      def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
        """
        fs 参数是 futures 或者 coroutines 序列,且不能为空
        return two sets of Future: (done, pending).
        用法:
            done, pending = yield from asyncio.wait(fs)
        """
        if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
            raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
        if not fs:
            raise ValueError('Set of coroutines/Futures is empty.')
        if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
            raise ValueError('Invalid return_when value: {}'.format(return_when))
      
        if loop is None:
            loop = events.get_event_loop()
      
        # 把 fs 中的参数转化为期物 future 对象的集合
        fs = {ensure_future(f, loop=loop) for f in set(fs)}
      
        return (yield from _wait(fs, timeout, return_when, loop))
      
    • asyncio._wait
      @coroutine
      def _wait(fs, timeout, return_when, loop):
        # fs 必须是集合
        assert fs, 'Set of Futures is empty.'
        
        # 创建一个 future 对象,附加到当前 loop 上
        waiter = loop.create_future()
        timeout_handle = None
      
        if timeout is not None:
            # 在给定的时间,进行超时回调
            timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        counter = len(fs)
      
        def _on_completion(f):
            # 声明使用外层变量 counter
            nonlocal counter
            counter -= 1
            """ 
            以下三种状态均表示 waiter 期物运行结束
            1、所有的期物都运行完成 
            2、return_when 值为 FIRST_COMPLETED
            3、return_when 值为 FIRST_EXCEPTION 并且期物没有被取消,且 f.exception()值存在
            """
            if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                    f.exception() is not None)):
                # 如果超时回调函数存在,则取消
                if timeout_handle is not None:
                    timeout_handle.cancel()
                # 变更 waiter 期物的状态为结束
                if not waiter.done():
                    waiter.set_result(None)
      
        for f in fs:
            # 给期物添加运行完成后的回调
            f.add_done_callback(_on_completion)
      
        try:
            yield from waiter
        finally:
            if timeout_handle is not None:
                timeout_handle.cancel()
      
        done, pending = set(), set()
        for f in fs:
            # 将 _on_completion 从 done 回调列表移除
            f.remove_done_callback(_on_completion)
            if f.done():
                done.add(f)
            else:
                pending.add(f)
        return done, pending
      

    相关文章

      网友评论

        本文标题:快乐的 asyncio 学习

        本文链接:https://www.haomeiwen.com/subject/esxctftx.html