美文网首页程序员
python异步编程小抄

python异步编程小抄

作者: 小餐包 | 来源:发表于2022-09-04 17:52 被阅读0次

基础

异步调用的原理,是在单个线程中通过切换任务(就像单线程的函数切换,负担很小,性能很好)来达到并发的效果。相较于线程这种比较重的并发方式,异步调用可以大大提升IO密集型任务的执行效率,达到非常高的并发量。python中异步调用的最小单位是协程,随着异步编程在python3各个版本中不断的迭代,使用异步编程变得越来越简单,因此,我们有必要好好掌握python中异步编程相关的知识。

两个关键字

async用来声明一个协程函数(async def),调用协程函数的结果是返回一个协程对象(coroutine)。

await用来异步等待一个协程的返回,只能在协程函数内部使用。await时意味着将函数控制权交换给event loop。举个例子,当我们在g()函数内部遇到await f()时,就会将暂时挂起g()的执行直到 f()返回,与此同时,让event loop中的其他函数继续执行。

常用对象

Future:

Future对象是用来模仿concurrent.futures包中的Future对象的,除了一小部分API有差异外,他们的API基本上兼容。Future对象代表一个任务的结果,注意这里的结果可以是未执行的结果或者时一个执行异常。源代码中是这样描述这个对象的:

class Future(object):
    """
    This class is *almost* compatible with concurrent.futures.Future.
    
        Differences:
    
        - result() and exception() do not take a timeout argument and
          raise an exception when the future isn't done yet.
    
        - Callbacks registered with add_done_callback() are always called
          via the event loop's call_soon_threadsafe().
    
        - This class is not compatible with the wait() and as_completed()
          methods in the concurrent.futures package.
    """

Task:

一个和Future对象类似的协程对象,非线程安全,查看源代码可以看到TaskFuture的子类,因此 Future对象不一定是一个Task对象, 但Task对象一定是个Future对象

class Task(Future):
    """ A coroutine wrapped in a Future. """

EventLoop:

管理和分配不同Task的执行,Task需要注册到EventLoo以后才能被调度执行到。你可以把它看成是某个监控着协程空闲、可执行等运行状态,并且能根据某个协程等待的事件变为可执行时唤醒这些空闲的协程的While True的循环。Loop是可插拔(替换)的,也就是说,你可以自己实现一个事件循环来代替的默认的事件循环,比如Linux系统上非常著名的uvloop,使用下面代码即可替换EventLoop实现:

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

uvloop是基于libuv库(nodejs)使用Cython编写的,拥有比自带的event loop更高的性能,遗憾的是你只能在*nixpython3.5+的环境中使用它。

常用方法

asyncio.run()

在python3.7中引入的新方法,会自动创建event loop并且执行run_until_complete,同时在执行结束时会自动关闭event loop,在引入该方法之前,你可能需要使用如下代码来执行一个简单的协程:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

如果你需要对未完成的任务执行cancel()方法,那么还需要在另外写一些代码来处理它们。而asyncio.run()方法将这些代码范式进行了封装,使得调用协程变得不需要再写这些模板式的代码了。

asyncio.gather(*aws, loop=None, return_exceptions=False)

这个方法将协程(准确的说是awaitable对象,因此也可以是future对象)集合统一放到一个future对象里面,并且将协程的结果统一在一个列表中返回。如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

asyncio.ensure_future vs asyncio.create_task

asyncio.ensure_future虽然名字里带了future,但通常它返回的对象是一个Task对象(除非传入的obj对象本身就是一个Future对象),这是一个有点反直觉且经常容易混淆的点,看下面的例子:

import asyncio


async def foo():
    print("before foo await")
    await asyncio.sleep(1)
    print("after foo await")
    return "foo"


async def bar():
    print("before bar await")
    await asyncio.sleep(1)
    print("after bar await")
    return "bar"


async def popo():
    print("before popo await")
    await asyncio.sleep(1)
    print("after popo await")
    return "popo"


async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)
    # Set *value* as a result of *fut* Future.
    fut.set_result(value)


async def main():
    print("running main")
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())
    fut1 = asyncio.ensure_future(popo())
    loop = asyncio.get_running_loop()
    fut2 = loop.create_future()
    loop.create_task(
        set_after(fut2, 1, '... world'))
    print(isinstance(task1, asyncio.Future))
    print(isinstance(fut1, asyncio.Task))
    print(isinstance(fut2, asyncio.Task))
    print(isinstance(fut2, asyncio.Future))
    await task1
    await task2
    await fut1
    await fut2
    print("exiting main")


asyncio.run(main())

输出如下, 注意第三行和第四行的输出:

running main
True
True
False
True
before foo await
before bar await
before popo await
after foo await
after popo await
after bar await
exiting main

因此,python 3.7 及之后版本都推荐使用asyncio.create_task方法,这个方法限制了传入的对象必须是一个协程对象。

其他常用类

Asyncio.Queue

对于并发编程,经常需要使用队列来将负载分配到多个任务上,比如经典的生产者-消费者模式,asyncio包同样提了Queue对象来满足这类需求,参考官方的代码示例:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

其中的几个方法需要单独说明一下:

put_nowait(item): 将item放到队列中,非阻塞性操作,当队列满时,将抛出QueueFull的异常;

put(item):将item放到队列中,阻塞性操作, 当队列满时,将一直等待直到有空位;

get_nowait(): 从队列中获取一个item,非阻塞性操作,当队列为空时,将抛出QueueEmpty的异常。

get(): 从队列中获取一个item,阻塞性操作,当队列为空时,将一直等待直到有item可用;

task_done(): 该方法通常由消费者处理,用来表示从队列获取的任务已经完成。对于每一个通过get()从队列获取的任务,调用该方法会告知队列任务处理完成;

join(): 阻塞直到队列中的所有item都已经被处理过。每个item添加到队列中时,未完成任务的计数就会增加;当这些任务调用task_done()时,这个计数就会减少;当未完成任务计数为0时,join()将不再阻塞。

Asyncio.Semaphore

异步编程处理IO密集型的任务时具有很好的性能,但有时我们也会希望限制一下并发量,这时候就可以使用信号量来达到这个目的。基本用法可以参考官方的代码示例:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

相关文章

网友评论

    本文标题:python异步编程小抄

    本文链接:https://www.haomeiwen.com/subject/ypemnrtx.html