协程应该是属于python里面一个独有的概念。源自它设计的一个特性:同一时刻,Python 主程序只允许有一个线程执行。
但是对于一些IO操作频繁的操作,如网络请求,如果单线程同步执行的话,那么很多时间都会浪费在等待请求返回中。假设下载一个网页的数据需要2秒,现在需要下载10个网页,按同步一个一个执行的话,则需要花费2*10=20秒。但如果我开始下载1个之后,然后又立即去下载第2个,如果第1个的结果返回了之后,我们再去处理第一个的下载结果。这样我们就不需要每个都等待2秒了。
协程就是用来在单线程中实现并发编程的一种操作。协程由用户决定,在哪些地方交出控制权,切换到下一个任务。
协程就是异步编程。可以把协程理解成一个异步函数,而直接调用这个异步函数返回一个协程对象。协程使用async和await语法糖,通过async def声明。
#定义一个协程
async def main():
print("hello")
main()
#输出:返回一个协程对象
<coroutine object main at 0x7fb64949ef40>
上面看到,直接调用协程并不会真正调用执行,要真正运行一个协程,需要用到asyncio库(已经废弃的生成器调用方法就不说了)
asyncio
使用协程,我们需要用到asyncio库。
asyncio 是用来编写 并发 代码的库,使用 async/await 语法。
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
asyncio 提供一组 高层级API 用于并发地 运行 Python 协程并对其执行过程实现完全控制
asyncio 提供了三种主要机制运行协程:
asyncio.run()
函数用来运行最高层级的入口点 "main()"。
#此代码在paycharm中运行
async def main():
print("hello")
asyncio.run(main())
#输出:
hello
asyncio.run
(coro, *, *debug=False*)会运行传入的协程coro,并返回结果。它会创建一个事件循环(event loop),负责管理asyncio事件的循环调度。而且在一个线程中,只能运行一个事件循环。比如你想在jupyter notebook中执行asyncio.run(),就会失败,你会收到一个错误提示:RuntimeError: asyncio.run() cannot be called from a running event loop。因为jupter notebook本身已经运行了一个event loop了。
一般用asyncio.run(main()) 作为主程序的入口函数,在程序运行周期内,只调用一次 asyncio.run。
- 使用await等待一个协程
await后面接一个可等待对象。可等待对象有三种主要类型: 协程, 任务 和 Future。
await,字面意思,就是等待程序执行。我们用await等待一个协程,那么程序就会阻塞在这里,进入被调用的协程函数,执行完毕后再返回继续,这个和正常python流程是一样的。使用await,在jupter notebook中,我们就可以用它来运行协程了。如下:
#此代码在jupyter notebook中执行。
import requests
import asyncio
async def crawl_page(url):
print("crawl url:{}".format(url))
await asyncio.sleep(2) #用休眠代替网络请求操作,排除网络的原因
print("ok url:{}".format(url))
async def main():
urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
for url in urls:
await crawl_page(url)
start_time = time.perf_counter()
await main()
end_time = time.perf_counter()
print("总共耗时:{}".format(end_time-start_time))
#### 输出
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
总共耗时:6.007925541001896
上面模拟网络3个网络请求,每个耗时2秒,总共耗时约6秒。整个过程与同步过程是一样。相当于下面的同步流程:
import requests
import time
def crawl_page(url):
print("crawl url:{}".format(url))
time.sleep(2) #简单粗暴用休眠代替网络请求操作,也不受其他因素的影响
print("ok url:{}".format(url))
def main():
urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
for url in urls:
crawl_page(url)
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print("总共耗时:{}".format(end_time-start_time))
#### 输出
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
总共耗时:6.009312429000602
看到这,是不是觉得协程没什么特别嘛,并没有实现并发呀。并没有实现我想要的耗时约2秒。那时因为真正的主角还没上场。上面只是用异步方法实现了同步功能,如果要实现并发,那就需要任务。也即协程的第三种调用方式:
-
用任务“并行地”调度协程
用asyncio.create_task()
将一个协程封装成任务,该协程就可以被自动调度执行了。创建一个任务:task = asyncio.create_task(coro),然后用await调度任务。任务可以很快地被执行而不被阻塞。
asyncio.create_task(coro,name=None):
将 coro 协程 封装为一个Task
并调度其执行。返回 Task 对象。
该任务会在get_running_loop()
返回的循环中执行,如果当前线程没有在运行的循环则会引发RuntimeError
。
上面的例子用task实现如下:
import asyncio
async def crawl_page(url):
print("crawl url:{}".format(url))
await asyncio.sleep(2) #asyncio的sleep()总是会挂起当前任务,以允许其他任务运行
print("ok url:{}".format(url))
async def main():
urls = ['https://www.amazon.co.jp/-/en/ranking?type=new-releases',
'https://www.amazon.co.jp/-/en/ranking?type=top-sellers',
'https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited']
tasks = [asyncio.create_task(crawl_page(url)) for url in urls] #创建任务列表
for task in tasks:
await task #用await调度任务
start_time = time.perf_counter()
await main()
end_time = time.perf_counter()
print("总共耗时:{}".format(end_time-start_time))
#####输出#######
crawl url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
crawl url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
crawl url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
ok url:https://www.amazon.co.jp/-/en/ranking?type=new-releases
ok url:https://www.amazon.co.jp/-/en/ranking?type=top-sellers
ok url:https://www.amazon.co.jp/gp/dmusic/promotions/AmazonMusicUnlimited
总共耗时:2.004724346999865
看,使用任务Task,终于实现了想要的约2秒的效果了。那么task是如何实现并发的呢?看下面两个例子:
import asyncio
import time
async def work1():
print("work1 start")
await asyncio.sleep(2) #特意设置成2秒
print("work1 done")
async def work2():
print("work2 start")
await asyncio.sleep(1)
print("work2 done")
async def work3():
print("work3 start")
await asyncio.sleep(3)
print("work3 done")
async def main():
task1 = asyncio.create_task(work1())
task2 = asyncio.create_task(work2())
task3 = asyncio.create_task(work3())
print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))
#### 输出
start at 20:26:39
ended at 20:26:39
work1 start
work2 start
work3 start
import time
async def work1():
print("work1 start")
await asyncio.sleep(2) #特意设置成2秒
print("work1 done")
async def work2():
print("work2 start")
await asyncio.sleep(1)
print("work2 done")
async def work3():
print("work3 start")
await asyncio.sleep(3)
print("work3 done")
async def main():
task1 = asyncio.create_task(work1())
task2 = asyncio.create_task(work2())
task3 = asyncio.create_task(work3())
print("before await")
await task1
print("awaited task1")
await task2
print("awaited task2")
await task3
print("awaited task3")
print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))
###输出
start at 20:28:31
before await
work1 start
work2 start
work3 start
work2 done
work1 done
awaited task1
awaited task2
work3 done
awaited task3
ended at 20:28:34
从两个例子中我们可以看出:
1、任务被创建后,就会被加入事件循环,等待事件调度器调度。事件调度器会自动地去调度。
2、遇到await后,当前任务就会挂起,将控制权交出,事件调度器去调度其它任务,其它任务获得控制权。当任务执行完成之后,它又会重新获得控制权,继续执行后续的代码。
3、上面我特意写了work1的休眠时长比work2的休眠时长长。work2会优先于work1完成,work2执行完成之后,主程序获得控制权后,不会打印“awaited work2”,因为主程序还在await work1呢。
这样,通过事件循环调度,异步调用,我们就实现了python的“并发”。充分利用了等待时间。
上面,我们看到的都是没有返回值的,协程也是可以返回值的。我们可以在结束之后,取到返回值。另外,协程也是有可能产生异常的,任务有可能被取消。
import asyncio
import time
async def work1():
print("work1 start")
await asyncio.sleep(1)
print("work1 done")
return 1
async def work2():
print("work2 start")
await asyncio.sleep(2)
print("work2 done")
return 2/0
async def work3():
print("work3 start")
await asyncio.sleep(3)
print("work3 done")
return 3
async def work4():
print("work4 start")
await asyncio.sleep(4)
print("work4 done")
return 4
async def main():
task1 = asyncio.create_task(work1())
task2 = asyncio.create_task(work2())
task3 = asyncio.create_task(work3())
task4 = asyncio.create_task(work4())
#2秒之后,取消任务4
await asyncio.sleep(2)
task4.cancel()
res = await asyncio.gather(task1, task2, task3, task4,return_exceptions=True)
print(res)
print("start at {}".format(time.strftime('%X')))
await main()
print("ended at {}".format(time.strftime('%X')))
###输出
start at 21:10:18
work1 start
work2 start
work3 start
work4 start
work1 done
work2 done
work3 done
[1, ZeroDivisionError('division by zero'), 3, CancelledError()]
ended at 21:10:21
taskasyncio.gatherasyncio.gather(aws,return_exceptions=True)
1、并发运行 aws序列中的可等待对象
2、如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。
3、如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
4、如果 return_exceptions 为False
(默认),所引发的首个异常会5、立即传播给等待gather()
的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。
如果 return_exceptions 为True
,异常会和成功的结果一样处理,并聚合至结果列表。
如果gather()
被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。
最后,看一个真正的网络请求吧
爬取豆瓣上即将上映的电影,获取电影名称,上映时间,时长,简介等信息。(仅供学习交流,勿频繁调用)
import aiohttp
import asyncio
from bs4 import BeautifulSoup
global sleep = 0
#解析拿到的所有即将上映的电影
def get_movies_info(text):
print("开始解析拿到所有即将上映的电影数据")
soup = BeautifulSoup(text,"html.parser")
all_movies = soup.find('div',id = 'showing-soon')
movies_info = []
for each_movie in all_movies.find_all('div',class_="item"):
#print("=========")
all_a_tag = each_movie.find_all('a')
all_li_tag = each_movie.find_all("li")
movie_name = all_a_tag[1].text
movie_url= all_a_tag[1]["href"]
movie_date = all_li_tag[0].text
movie_type = all_li_tag[1].text
movies_info.append([movie_url,(movie_name,movie_type,movie_date)])
return movies_info
def get_each_movie_details(text):
soup = BeautifulSoup(text,"html.parser")
spans = soup.find("div",id="info").find_all("span")
#时长
duration = spans[-2].text
#简介
description = soup.find("div",{"class":"indent","id":"link-report"}).find("span").text.strip()
#海报url
img_tag = soup.find("img")
url = img_tag['src']
return (duration,description,url)
async def get_url(url)->str:
global sleep
asyncio.sleep(sleep) #每个请求都会被等待,等待的sleep是一个递增变量,防止频繁数据请求,我们的目的是协程。
print("start get url:{}".format(url))
header = {"user-agent":"Chrome/10.0"}
async with aiohttp.ClientSession(headers=header) as session:
async with session.get(url,headers=header) as resp:
result = await resp.text()
print("getted result")
return result
sleep += 0.5
async def main():
url = "https://movie.douban.com/cinema/later/shenzhen/"
try:
task = asyncio.create_task(get_url(url))
res = await asyncio.gather(task)
movies_info = get_movies_info(res[0])
tasks = [asyncio.create_task(get_url(info[0])) for info in movies_info]
print("start fetch each movie info")
htmls = await asyncio.gather(*tasks,return_exceptions=True)
print("get html :{}".format(len(htmls)))
all_info = []
i = 0
for html in htmls:
details = get_each_movie_details(html)
all_info.append(movies_info[i][1]+details)
i+=1
print(" ".join(all_info))
except Exception as e:
print("Exception:{}".format(e))
await main()
参考文档:https://docs.python.org/zh-cn/3/library/asyncio.html
https://time.geekbang.org/column/article/103358
网友评论