本文介绍asyncio的基本用法,asyncio通过event loop机制疯狂调度和执行各个coroutine
关键字-async/await
-
async def
用于定义一个coroutine. 一个coroutine需要被压入一个事件循环才能被执行,方式有两种:the code inside the coroutine function won't run until you await on the function, or run it as a task to a loop -
await
用于等待一个awaitable对象,例如future实例和coroutine实例。更多future vs coroutine的介绍见于下文协程中使用await可以让出cpu,让event loop可以去执行其他协程
当一个线程中的 asyncio 事件循环遇到
await
表达式时,线程的事件循环机制会暂停当前协程的执行,并尝试运行其他就绪的协程。在这个过程中,线程仍然处于运行状态。然而,由于事件循环可以在协程之间切换,这使得线程能够在等待 I/O 或其他异步操作期间执行其他任务
基本概念-future vs coroutine
- 相同点:两者都是awaitable对象
- 不同点:
实现方式上存在根本差异-
Future 是一个表示未来结果的对象,它通常由一个
asyncio
任务(通过asyncio.create_task
或者loop.create_task
创建)或其他低级别的异步操作创建,且通常用于封装底层的异步操作如网络请求、文件 I/O 等下面给出一个关于future创建和使用的基本示例
import asyncio async def main(): # return the loop instance specific to the current thread loop = asyncio.get_event_loop() # create an default future future = loop.create_future() async def set_future_result(): await asyncio.sleep(1) future.set_result("Hello, world!") print(f"future done: {future.done()}") # use another coroutine to set future loop.create_task(set_future_result()) try: print("coro main waiting") result = await future print("coro main wait done") print(f"result is {result}") except Exception as e: print(f"An error occurred: {e}") loop = asyncio.new_event_loop() loop.run_until_complete(main()) loop.close()
程序输出:
coro main waiting future done: True coro main wait done result is Hello, world!
asyncio.sleep
这个coroutine的实现逻辑正是运用了类似上面的future创建和使用逻辑async def sleep(delay, result=None, *, loop=None): """Coroutine that completes after a given time (in seconds).""" if loop is not None: warnings.warn("The loop argument is deprecated since Python 3.8, " "and scheduled for removal in Python 3.10.", DeprecationWarning, stacklevel=2) if delay <= 0: await __sleep0() return result if loop is None: loop = events.get_running_loop() future = loop.create_future() h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result) try: return await future finally: h.cancel()
-
coroutine 是一个使用
async def
定义的特殊函数- 它的内部可以使用(但不强制使用)
await
关键字等待其他 coroutine 或Future
对象的返回值 - 一个coroutine必须 1. 通过
loop.create_task
提交到loop实例的任务队列,2. 或者被其他coroutine await以加入loop实例的任务队列。该coroutine才能够被loop调度运行
注意,coroutine内部逻辑不可以【同步阻塞】当前线程,否则整个事件循环将被阻塞
下面给出两个sleep功能的coroutine实现,包括阻塞式实现和非阻塞式实现
import asyncio import time async def blocking_sleep(): """ blocking_sleep 调用 time.sleep(1) 阻塞当前线程 当前线程将被操作系统挂起1s,该线程上运行的事件循环自然随之被阻塞 """ time.sleep(1) async def non_blocking_sleep(): """ non_blocking_sleep 不会阻塞当前线程 因为asyncio.sleep(1)本身是一个coroutine,其底层逻辑是让 thread-specific loop 执行一个延时协程。整个过程涉及多次await主动让出cpu,从而避免阻塞该线程上运行的事件循环 """ await asyncio.sleep(1)
- 它的内部可以使用(但不强制使用)
-
loop.run_until_complete vs loop.run_forever
它们都是启动一个event loop的入口方法
-
run_until_complete(future)
:这个方法用于运行事件循环,直到给定的协程或 asyncio.Future 对象完成。当协程完成时,run_until_complete 将返回协程的结果(如果有),并停止事件循环。这是在事件循环中执行单个协程的主要方法
-
run_forever()
这个方法用于运行事件循环,直到显式停止。它不接受协程或 Future 对象作为参数。当你调用 run_forever() 时,事件循环将开始一直运行,直到你显式调用 loop.stop() 方法。这个方法通常用于长时间运行的应用程序,例如网络服务器或后台任务处理器。
为了在 run_forever() 模式下执行协程,你需要使用 asyncio.create_task() 或 loop.create_task() 在事件循环中创建任务。这些任务将在事件循环运行时被调度执行。
其他注意事项
create_task不会启动事件循环
loop.create_task()
asyncio.create_task
都是用于将一个 coroutine(协程)包装为一个 Task
对象并将其加入到事件循环中。Task
对象是 asyncio.Future
的子类
方法本身不会启动事件循环,它只是将协程加入到事件循环的待执行队列中。要启动事件循环并运行协程,你需要使用如 loop.run_until_complete()
或 loop.run_forever()
等方法
loop.run_in_executor vs async.to_thread
两者都会启动另一个独立线程去完成一些阻塞逻辑,区别在于:
-
执行时间
-
loop.run_in_executor
即时起了一个线程,直接开始执行,返回一个future,这个future按需await -
async.to_thread
将新线程包装成coroutine并返回,必须await或者loop run才能被调度执行
-
-
是否依赖loop
- 独立线程与事件循环所在线程相互独立互不影响,因此
loop.run_in_executor()
会马上在executor中执行线程逻辑,而不依赖loop的启动 -
async.to_thread
将新线程包装成coroutine,必须在loop启动后才能被调度执行
- 独立线程与事件循环所在线程相互独立互不影响,因此
loop的线程安全问题
在 Python 的 asyncio 库中,事件循环不是线程安全的,这意味着在多线程环境中使用事件循环可能会导致未定义的行为或错误。以下是一些与事件循环线程安全相关的问题和注意事项:
-
不要多线程共享同一个事件循环实例。每个线程应该有自己的事件循环实例
-
不要在一个线程中操作另一个线程的事件循环实例。例如,不要在一个线程中调用另一个线程的事件循环的
create_task()
、run_until_complete()
等方法。这可能会导致竞态条件或其他线程安全问题如果你确实需要在一个线程中与另一个线程的事件循环交互,可以使用
asyncio.run_coroutine_threadsafe()
函数。这个函数可以安全地【将一个协程加入到另一线程的事件循环中】,并返回一个线程安全的concurrent.futures.Future
对象。你可以在当前线程中等待这个 Future 对象,以获取协程的执行结果 -
对于 I/O-bound 任务或其他长时间运行的操作,可以使用
loop.run_in_executor()
asyncio.to_thread()
方法将I/O-bound任务委托给别的线程执行
以下代码演示了如何在线程B中正确操作线程A中的future对象
import asyncio
import time
async def main():
# return the loop instance specific to the current thread
loop = asyncio.get_event_loop()
# create an default future
future = loop.create_future()
def set_future_result():
time.sleep(1)
future.set_result("Hello, world!")
print(f"future done: {future.done()}")
# # use common thread to set future -> not allowed since thread unsafe
# import threading
# t = threading.Thread(target=set_future_result)
# t.start()
# use loop.run_in_executor instead
loop.run_in_executor(None, set_future_result)
try:
print("coro main waiting")
result = await future
print("coro main wait done")
print(f"result is {result}")
except Exception as e:
print(f"An error occurred: {e}")
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
asyncio基本用法示例
asyncio就是使用一个loop
对象,对协程进行event loop式执行
下面给出一个代码示例,代码逻辑为
- 创建一个
loop
对象 -
submit_coroutines_blocking
不断向loop
提交任务协程 - 注意,
submit_coroutines_blocking
是阻塞的,因此通过asyncio.to_thread
委托给另一独立线程执行。asyncio.to_thread
返回的是一个coroutine,需要loop启动后才能被调度执行 - 通过
loop.run_until_complete
或者loop.run_forever
启动loop
。(注意,run_forever
表示让loop
对象一直进行事件循环。调用loop.run_forever
之前不一定需要先往loop
进行create_task塞入任务,可以先空跑起来,然后通过其他手段往loop
里塞任务)
import asyncio
import time
# 任务协程
async def my_coroutine(i):
print(f"eventloop thread: Started coroutine {i}")
await asyncio.sleep(5)
print(f"eventloop thread: Finished coroutine {i}")
def submit_coroutines_blocking(loop: asyncio.AbstractEventLoop):
"""
submit_coroutines_blocking 是一个无限循环
每次循环将向loop提交一个任务协程,并且通过time.sleep来模拟阻塞状态
"""
i = 0
try:
while loop.is_running():
# `submit_coroutines_blocking`将在一个单独的线程中运行,因此不能直接通过`loop.create_task(my_coroutine(i))`访问原线程的loop对象
# loop.create_task(my_coroutine(i))
# -> use asyncio.run_coroutine_threadsafe instead
asyncio.run_coroutine_threadsafe(my_coroutine(i), loop)
print(f"separated thread: submit coroutine {i} to loop, and going to sleep 2")
time.sleep(2) # 阻塞 2 秒提交一个新的协程
i += 1
except Exception as e:
print(f"Caught exception: {e}")
loop.stop()
async def submit_coroutines_coro(loop: asyncio.AbstractEventLoop):
# 将阻塞式的submit_coroutines_blocking委托给单独的线程处理
await asyncio.to_thread(submit_coroutines_blocking, loop)
# await loop.run_in_executor(None, submit_coroutines_blocking, loop) # is ok too
loop = asyncio.new_event_loop()
loop.run_until_complete(submit_coroutines_coro(loop))
#####
# or
# loop.create_task(submit_coroutines_coro(loop))
# loop.run_forever()
#####
输出如下:
separated thread: submit coroutine 0 to loop, and going to sleep 2
eventloop thread: Started coroutine 0
separated thread: submit coroutine 1 to loop, and going to sleep 2
eventloop thread: Started coroutine 1
separated thread: submit coroutine 2 to loop, and going to sleep 2
eventloop thread: Started coroutine 2
eventloop thread: Finished coroutine 0
separated thread: submit coroutine 3 to loop, and going to sleep 2
eventloop thread: Started coroutine 3
eventloop thread: Finished coroutine 1
separated thread: submit coroutine 4 to loop, and going to sleep 2
eventloop thread: Started coroutine 4
eventloop thread: Finished coroutine 2
separated thread: submit coroutine 5 to loop, and going to sleep 2
eventloop thread: Started coroutine 5
eventloop thread: Finished coroutine 3
...
当然,我们也可以借助loop.run_in_executor
把submit_coroutines_blocking
放到一个单独线程中执行
import asyncio
import time
# 任务协程
async def my_coroutine(i):
print(f"eventloop thread: Started coroutine {i}")
await asyncio.sleep(5)
print(f"eventloop thread: Finished coroutine {i}")
def submit_coroutines_blocking(loop: asyncio.AbstractEventLoop):
"""
submit_coroutines_blocking 是一个无限循环
每次循环将向loop提交一个任务协程,并且通过time.sleep来模拟阻塞状态
"""
i = 0
try:
while True:
if not loop.is_running():
print("loop is not running, continue")
time.sleep(2) # 阻塞 2 秒提交一个新的协程
continue
# `submit_coroutines_blocking`将在一个单独的线程中运行,因此不能直接通过`loop.create_task(my_coroutine(i))`访问原线程的loop对象
# loop.create_task(my_coroutine(i)) # is wrong -> use asyncio.run_coroutine_threadsafe instead
asyncio.run_coroutine_threadsafe(my_coroutine(i), loop)
print(f"separated thread: submit coroutine {i} to loop, and going to sleep 2")
time.sleep(2) # 阻塞 2 秒提交一个新的协程
i += 1
except Exception as e:
print(f"Caught exception: {e}")
loop.stop()
loop = asyncio.new_event_loop()
# 开启一个独立线程,向loop提交task,线程立刻执行
loop.run_in_executor(None, submit_coroutines_blocking, loop)
loop.run_forever()
网友评论