简介
学习这个主要是因为在处理一个本地测试的时候,需要解决这个模块的一个问题。在浏览解决方案的时候,发现大家会使用这个模块来进行python爬虫工作。感觉蛮有意思的,准备花一天补补基础。
asyncio 可以实现异步网络操作、并发、协程。当然目前实现协程的不仅仅asyncio还有tornado等模块。
创建一个asyncio的步骤如下
- 创建一个event_loop 事件循环,当启动时,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。
- 创建协程: 使用async关键字定义的函数就是一个协程对象。在协程函数内部可以使用await关键字用于阻塞操作的挂起。
- 将协程注册到事件循环中。协程的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
基础知识
一、定义一个协程
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print("waiting:", x)
start = now()
# 这里是一个协程对象,这个时候do_some_work函数并没有执行
coroutine = do_some_work(2)
print(coroutine)
# 创建一个事件loop
loop = asyncio.get_event_loop()
# 将协程注册到事件循环,并启动事件循环
loop.run_until_complete(coroutine)
print("Time:",now()-start)
二、创建一个task
一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态。在上面的代码中,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。 task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果。
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print("waiting:", x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)#<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>
loop.run_until_complete(task)
print(task)#<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>
print("Time:",now()-start)
关于上面通过loop.create_task(coroutine)创建task,同样的可以通过 asyncio.ensure_future(coroutine)创建task.使用这两种方式的区别在官网上有提及。task/future以及使用async创建的都是awaitable对象,都可以在await关键字之后使用。future对象意味着在未来返回结果,可以搭配回调函数使用。
三、绑定回调
当使用ensure_feature创建任务的时候,可以使用任务的task.add_done_callback(callback)方法,获得对象的协程返回值。
async def do_some_work(x):
print("waiting:",x)
return "Done after {}s".format(x)
def callback(future):
print("callback:",future.result())
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)
#结果
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
四、阻塞和await
前面提到asynic函数内部可以使用await 来针对耗时的操作进行挂起。
async def do_some_work(x):
print("waiting:",x)
# await 后面就是调用耗时的操作
await asyncio.sleep(x)
return "Done after {}s".format(x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
五、并发和并行
并发通常是指有多个任务需要同时进行,并行则是同一个时刻有多个任务执行.
当有多个任务需要并行时,可以将任务先放置在任务队列中,然后将任务队列传给asynicio.wait方法,这个方法会同时并行运行队列中的任务。将其注册到事件循环中。
async def do_some_work(x):
print("Waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
start = now()
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
asyncio.wait(tasks) 也可以使用 asyncio.gather(tasks) ,前者接收一堆task,后者接受一个task列表。asyncio.wait(tasks)方法返回值是两组task/future的set.dones, pendings = await asyncio.wait(tasks)
其中dones是task的set,pendings是future的set。asyncio.gather(tasks) 返回一个结果的list。(见下一节的列子)
六、嵌套协程
使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。
now = lambda: time.time()
async def do_some_work(x):
print("waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print("Task ret:", task.result())
# results = await asyncio.gather(*tasks)
# for result in results:
# print("Task ret:",result)
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)
使用asyncio.wait的结果如下,可见返回的结果dones并不一定按照顺序输出
waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 2s
Task ret: Done after 4s
Task ret: Done after 1s
Time: 4.006587505340576
使用 await asyncio.gather(*tasks)得到的结果如下,是按照列表顺序进行返回的
waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004234313964844
上面的程序将main也定义为协程。我们也可以不在main协程函数里处理结果,直接返回await的内容,那么最外层的run_until_complete将会返回main协程的结果。
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print("waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
return await asyncio.gather(*tasks)
#return await asyncio.wait(tasks)也可以使用。注意gather方法需要*这个标记
start = now()
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
print("Task ret:",result)
print("Time:", now()-start)
也可以使用as_complete方法实现嵌套协程
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print("waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
for task in asyncio.as_completed(tasks):
result = await task
print("Task ret: {}".format(result))
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)
七、协程停止
创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task。
future对象有如下几个状态:Pending、Running、Done、Cacelled
import asyncio
import time
now = lambda :time.time()
async def do_some_work(x):
print("Waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
coroutine1 =do_some_work(1)
coroutine2 =do_some_work(2)
coroutine3 =do_some_work(2)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3),
]
start = now()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
for task in asyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
print("Time:",now()-start)
启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。可以看到输出如下:
Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547
True表示cannel成功,loop stop之后还需要再次开启事件循环,最后在close,不然还会抛出异常.
循环task,逐个cancel是一种方案,可是正如上面我们把task的列表封装在main函数中,main函数外进行事件循环的调用。这个时候,main相当于最外出的一个task,那么处理包装的main函数即可。
网友评论