尽管asyncio应用通常作为单线程运行,不过仍被构建为并发应用。由于I/O以及其他外部事件的延迟和中断,每个协程或任务可能按一种不可预知的顺序执行。为了支持安全的并发执行,asyncio包含了threading和multiprocessing模块中的一些底层原语的实现。
1.锁(LOCK)
锁可以用来保护对一个共享资源的访问。只有锁的持有者可以使用这个资源。如果有多个请求要的到这个锁,那么其将会阻塞,以保证一次只有一个持有者。
import asyncio
from functoolsimport partial
def unlock(lock):
print("callback释放锁")
lock.release()
async def coro1(lock):
print("并行中,coro1等待锁")
# with语句有自动释放锁的作用
async with lock:
print("coro1被锁了")
print("coro1的锁释放了",lock.locked())
async def coro2(lock):
print("并行中,coro2等待锁")
# 等待加锁
try:
await lock.acquire()
print(f"当前coro2是否被锁", lock.locked())
finally:
lock.release()
print("coro2的锁释放了")
async def coro3(lock):
print("并行中,coro3等待锁")
try:
print("coro3没有加锁加试图释放")
lock.release()
except RuntimeError as e:
print("触发RuntimeError的错误")
import time
async def main(loop):
lock= asyncio.Lock()
loop.call_later(0.1, partial(unlock, lock))
print("等待协程")
await asyncio.wait([coro1(lock), coro2(lock), coro3(lock)])
if __name__== '__main__':
loop= asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
结论:
lock.acquire需要使用await。
lock.release不需要加await。
在加锁之前coro1和coro2 ,coro3是并发执行的。
锁有两种使用方式和像coro1一样通过async with 异步上下文关键字进行锁定,还可以通过coro2那种通过await方式使用acquire加锁,结束的时候使用release释放锁。
如果没有使用acquire进行加锁,就试图使用release去释放,将触发RuntimeError的异常,像coro3协程一样。
2.事件(Event)
asyncio.Event基于threading.Event。允许多个消费者等待某个事件发生,而不必寻找一个特定值与通知关联。
发现好像和lock也没啥区别。其实区别的话就是一旦触发了事件,coro1和coro2协程就会立即启动,不需要得到事件对象上的唯一的锁了。
import asyncio
import functools
def callback(event):
print('callback中设置event')
event.set()
async def coro1(name, event):
print(f'{name}等待事件')
await event.wait()
print(f'{name}触发')
async def coro2(name, event):
print(f'{name}等待事件')
await event.wait()
print(f'{name}触发')
async def main(loop):
event= asyncio.Event()
print(f'当前事件状态: {event.is_set()}')
loop.call_later(0.1, functools.partial(callback, event) )
await asyncio.wait([coro1('coro1', event), coro2('coro2', event)])
print(f'当前事件状态: {event.is_set()}')
if __name__== '__main__':
loop= asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
3)条件(condition)
Condition的做法与Event类似,只不过不是通知所有的协程等待的协程,被唤醒的等待协程的数目由notify的一个参数控制。
import asyncio
async def consumer(cond, name, second):
await asyncio.sleep(second)
async with cond:
await cond.wait()
print(f'{name}:资源可供消费者使用')
async def producer(cond):
await asyncio.sleep(2)
for nin range(1, 3):
async with cond:
print(f'唤醒消费者 {n}')
cond.notify(n=n)
await asyncio.sleep(0.1)
async def producer2(cond):
await asyncio.sleep(2)
with await cond:
print('让资源变的可用')
cond.notify_all()
async def main(loop):
condition= asyncio.Condition()
task= loop.create_task(producer(condition))
consumers= [consumer(condition, name, index) for index, namein enumerate(('c1', 'c2'))]
print(consumers)
await asyncio.wait(consumers)
task.cancel()
task= loop.create_task(producer2(condition))
consumers= [consumer(condition, name, index) for index, namein enumerate(('c1', 'c2'))]
await asyncio.wait(consumers)
task.cancel()
if __name__== '__main__':
loop= asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
对上面的代码做简单的分析
使用notify方法挨个通知单个消费者
使用notifyall方法一次性的通知全部消费者
由于producer和producer2是异步的函数,所以不能使用之前calllater方法,需要用createtask把它创建成一个任务,或者asyncio.ensurefuture.
4) 队列(Queue)
asyncio.Queue为协程提供了一个先进先出的数据结构,这与线程queue.Queue或进程的multiprocess,Queue很类似。这里直接上一个aiohtpp爬虫使用的例子
网友评论