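While reading code over the past few days I kept running into asynchronous programming. Until now I only cared about getting features to work and never thought about how fast the code runs, so I decided to study the topic.
References:
1. https://zhuanlan.zhihu.com/p/25228075 (the code in the first part is taken from this article)
From 0 to 1: Understanding the Evolution of Asynchronous Programming in Python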
-
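1. Crawlers with urllib and requests
requests optimizes how requests are made, so it is somewhat faster than urllib.
Requests is an HTTP client library for Python that makes network requests more intuitive and convenient. Its biggest difference from urllib is how connections are handled while fetching data: urllib closes the connection as soon as the data has been fetched, whereas requests can keep the connection open and reuse the socket (this reuse happens within a requests.Session).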
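To make the idea of socket reuse concrete, here is a minimal offline sketch that uses only the standard library rather than requests itself. The names EchoPortHandler and client_ports_on_one_connection are hypothetical and exist only for this illustration. A local test server echoes the client's source port, so seeing the same port for every request suggests that one TCP connection was reused.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class EchoPortHandler(BaseHTTPRequestHandler):
    """Hypothetical handler: replies with the client's source port."""

    protocol_version = "HTTP/1.1"  # HTTP/1.1 keeps connections alive by default

    def do_GET(self):
        body = str(self.client_address[1]).encode()
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo quiet


def client_ports_on_one_connection(n=3):
    """Send n requests over a single HTTPConnection and collect the echoed ports."""
    server = HTTPServer(("127.0.0.1", 0), EchoPortHandler)  # port 0: pick a free port
    threading.Thread(target=server.serve_forever, daemon=True).start()
    conn = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
    try:
        ports = []
        for _ in range(n):
            conn.request("GET", "/")
            ports.append(conn.getresponse().read().decode())
        return ports
    finally:
        conn.close()  # lets the server-side handler finish
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(client_ports_on_one_connection())
```

With requests, the comparable pattern is to create one requests.Session and call its get method repeatedly instead of calling requests.get for every page.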
In Python 2.7 this functionality was split across two modules, urllib and urllib2. Python 3 merged the Python 2.7 urllib and urllib2 into a single new urllib package.
urllib:
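The merged Python 3 package is organized into submodules such as urllib.request and urllib.parse. As a small offline sketch, urllib.parse can build the URLs of the follow-up pages instead of plain string concatenation. The helper names and the sample href "?start=25&filter=" are assumptions made for illustration.

```python
from urllib.parse import urljoin, urlsplit

BASE_URL = "https://movie.douban.com/top250"


def build_page_url(base, href):
    """Resolve a relative pagination link against the base URL."""
    return urljoin(base, href)


def query_of(url):
    """Return only the query-string part of a URL."""
    return urlsplit(url).query


if __name__ == "__main__":
    page_url = build_page_url(BASE_URL, "?start=25&filter=")  # assumed href
    print(page_url)
    print(query_of(page_url))
```

urljoin also copes with absolute and path-relative links, which plain concatenation would not.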
# -*- coding: utf-8 -*-
import ssl
import urllib.request

from lxml import etree

url = 'https://movie.douban.com/top250'
# The default context verifies certificates and negotiates a current TLS
# version (the original pinned the deprecated TLS 1.1 protocol).
context = ssl.create_default_context()


def fetch_page(url):
    response = urllib.request.urlopen(url, context=context)
    return response


def parse(url):
    response = fetch_page(url)
    page = response.read()
    html = etree.HTML(page)

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
    xpath_title = './/span[@class="title"]'
    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

    pages = html.xpath(xpath_pages)
    fetch_list = []
    result = []

    # Movies on the first page.
    for element_movie in html.xpath(xpath_movie):
        result.append(element_movie)

    # Links to the remaining pages.
    for p in pages:
        fetch_list.append(url + p.get('href'))

    # Fetch the remaining pages one after another.
    for page_url in fetch_list:
        response = fetch_page(page_url)
        page = response.read()
        html = etree.HTML(page)
        for element_movie in html.xpath(xpath_movie):
            result.append(element_movie)

    for i, movie in enumerate(result, 1):
        title = movie.find(xpath_title).text
        print(i, title)


def main():
    parse(url)


if __name__ == '__main__':
    main()
Replacing the standard-library urllib with requests:
import requests
from lxml import etree
from time import time

url = 'https://movie.douban.com/top250'


def fetch_page(url):
    response = requests.get(url)
    return response


def parse(url):
    response = fetch_page(url)
    page = response.content
    html = etree.HTML(page)

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
    xpath_title = './/span[@class="title"]'
    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

    pages = html.xpath(xpath_pages)
    fetch_list = []
    result = []

    for element_movie in html.xpath(xpath_movie):
        result.append(element_movie)

    for p in pages:
        fetch_list.append(url + p.get('href'))

    for page_url in fetch_list:
        response = fetch_page(page_url)
        page = response.content
        html = etree.HTML(page)
        for element_movie in html.xpath(xpath_movie):
            result.append(element_movie)

    for i, movie in enumerate(result, 1):
        title = movie.find(xpath_title).text
        # print(i, title)


if __name__ == '__main__':
    # Entry point added so the script runs and reports its elapsed time.
    start = time()
    parse(url)
    print('Cost {} seconds'.format(time() - start))
-
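2. Parsing with lxml versus regular expressions
Parsing with lxml takes a certain amount of time, but a program that relies on regular expressions is harder to maintain and less extensible.
A common combination is Requests + BeautifulSoup (a library for parsing web documents); other common parsing tools are regular expressions and XPath.
Replacing lxml with the standard re library: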
# -*- coding: utf-8 -*-
import re
from time import time

import requests

url = 'https://movie.douban.com/top250'

# Compiled once and reused for every page.
title_re = re.compile(rb'<a href=.*\s.*<span class="title">(.*)</span>')
page_re = re.compile(rb'<a href="(\?start=.*?)"')


def fetch_page(url):
    response = requests.get(url)
    return response


def parse(url):
    response = fetch_page(url)
    page = response.content

    fetch_list = set()  # a set removes duplicate pagination links
    result = []

    for title in title_re.findall(page):
        result.append(title)

    for postfix in page_re.findall(page):
        fetch_list.add(url + postfix.decode())

    for page_url in fetch_list:
        response = fetch_page(page_url)
        page = response.content
        for title in title_re.findall(page):
            result.append(title)

    for i, title in enumerate(result, 1):
        title = title.decode()
        # print(i, title)


if __name__ == '__main__':
    # Entry point added so the script runs and reports its elapsed time.
    start = time()
    parse(url)
    print('Cost {} seconds'.format(time() - start))
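The maintainability concern with regular expressions can be seen in a small offline sketch. SAMPLE_PAGE is made-up markup and the helper names are hypothetical; the patterns are simplified versions of the ones used above. The third title carries an extra attribute, which is enough for the pattern to miss it.

```python
import html
import re

# Made-up markup for illustration; the real page layout may differ.
SAMPLE_PAGE = b"""
<ol>
  <li><a href="/subject/1/"><span class="title">First Movie</span></a></li>
  <li><a href="/subject/2/"><span class="title">Second Movie</span></a></li>
  <li><a href="/subject/3/"><span lang="en" class="title">Third Movie</span></a></li>
</ol>
<div class="paginator">
  <a href="?start=25&amp;filter=">2</a>
  <a href="?start=50&amp;filter=">3</a>
  <a href="?start=25&amp;filter=">next</a>
</div>
"""

TITLE_RE = re.compile(rb'<span class="title">(.*?)</span>')
PAGE_RE = re.compile(rb'<a href="(\?start=.*?)"')


def extract_titles(page):
    """Return titles whose markup matches the pattern exactly."""
    return [match.decode() for match in TITLE_RE.findall(page)]


def extract_page_links(page):
    """Return unique pagination links with HTML entities decoded."""
    links = {html.unescape(match.decode()) for match in PAGE_RE.findall(page)}
    return sorted(links)


if __name__ == "__main__":
    print(extract_titles(SAMPLE_PAGE))      # the third title is missed
    print(extract_page_links(SAMPLE_PAGE))
```

An XPath query such as .//span[@class="title"] would still match the third entry, which is the robustness that the regular expression trades for speed.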
-
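3. Going further: multiprocessing and multithreading
In network programming (such as the crawler above), the bottleneck is usually I/O, so reducing the time spent waiting on reads and writes pays off more than speeding up text parsing.
Program switching, that is, the allocation of CPU time: the operating system automatically gives each program a slice of time on resources such as the CPU, memory, disk, keyboard, and display, and switches to the next program when the slice expires. If the program being switched out has not finished, its state is saved so that it can continue the next time it is scheduled.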
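1) Process: a process is the first form of "program switching". A process is a computer program in execution; in other words, any code that runs does so first of all as a process. A process has states such as ready, running, interrupted, zombie, and terminated (the exact set differs between operating systems).
2) Thread: a thread is also a form of "program switching". A thread is code executing inside a process. One process can run multiple threads, and these threads share the operating-system resources acquired by the main process. When several threads are started in one process, they take turns executing. Modern operating systems also support thread preemption: a thread waiting to run can, through priorities, signals, and similar mechanisms, have the running thread suspended so that it runs first. A thread must be started inside an existing process. It uses the system resources the process already holds and does not need to request resources of its own the way a process does.
3) Threads versus processes: threads generally execute concurrently, and it is this concurrency combined with shared data that makes cooperation between tasks possible. Processes generally execute in parallel, which lets a program run on multiple CPUs at the same time. In standard CPython builds, the global interpreter lock means that threads running Python bytecode are concurrent rather than parallel.
4) Coroutine: a coroutine is also a form of "program switching". Loosely speaking, a coroutine is like a thread, except that it is scheduled not by the operating system but cooperatively by the program itself; put differently, "a coroutine is a thread that is not scheduled by the operating system". Coroutines are also called micro-threads. Because they are scheduled cooperatively, coroutines can perform far better than threads once the number of concurrent tasks reaches the tens of thousands. Note that this is again concurrency, not parallelism.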
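The cooperative scheduling described in item 4 can be sketched with plain generators. This is an illustration of the idea under simplifying assumptions, not how asyncio or gevent are implemented; worker and run_round_robin are hypothetical names. Each task hands control back voluntarily with yield, and a tiny scheduler decides who runs next.

```python
from collections import deque


def worker(name, steps, log):
    """A task that does one step of work, then yields control voluntarily."""
    for i in range(steps):
        log.append("{}{}".format(name, i))
        yield  # cooperative switch point: nothing interrupts us elsewhere


def run_round_robin(tasks):
    """Run generator-based tasks in turn until all of them are finished."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)          # resume the task until its next yield
        except StopIteration:
            continue            # task finished, do not reschedule it
        queue.append(task)      # otherwise put it at the back of the line


def demo():
    log = []
    run_round_robin([worker("a", 2, log), worker("b", 3, log)])
    return log


if __name__ == "__main__":
    print(demo())  # ['a0', 'b0', 'a1', 'b1', 'b2']
```

Because a switch can only happen at a yield, the interleaving is fully determined by the program, and everything runs in a single operating-system thread.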
Multithreading effectively solves the problem of blocking waits.
# -*- coding: utf-8 -*-
from threading import Thread
from time import time

import requests
from lxml import etree

url = 'https://movie.douban.com/top250'


def fetch_page(url):
    response = requests.get(url)
    return response


def parse(url):
    response = fetch_page(url)
    page = response.content
    html = etree.HTML(page)

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
    xpath_title = './/span[@class="title"]'
    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

    pages = html.xpath(xpath_pages)
    fetch_list = []
    result = []

    for element_movie in html.xpath(xpath_movie):
        result.append(element_movie)

    for p in pages:
        fetch_list.append(url + p.get('href'))

    def fetch_content(page_url):
        # Runs in a worker thread; pages may finish in any order.
        response = fetch_page(page_url)
        page = response.content
        html = etree.HTML(page)
        for element_movie in html.xpath(xpath_movie):
            result.append(element_movie)

    # One thread per remaining page.
    threads = []
    for page_url in fetch_list:
        t = Thread(target=fetch_content, args=[page_url])
        t.start()
        threads.append(t)

    # Wait for every download to finish.
    for t in threads:
        t.join()

    for i, movie in enumerate(result, 1):
        title = movie.find(xpath_title).text
        # print(i, title)


if __name__ == '__main__':
    # Entry point added so the script runs and reports its elapsed time.
    start = time()
    parse(url)
    print('Cost {} seconds'.format(time() - start))
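The effect of threads on blocking waits can be reproduced without any network access. In the following sketch, fake_fetch is a hypothetical stand-in that sleeps instead of downloading, and the 0.2 second delay is an arbitrary assumption. It uses ThreadPoolExecutor from the standard library rather than raw Thread objects, which also keeps the results in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(page_number, delay=0.2):
    """Pretend to download a page: the sleep stands in for network latency."""
    time.sleep(delay)
    return "page-{}".format(page_number)


def fetch_sequential(numbers):
    """Fetch pages one after another; the waits add up."""
    return [fake_fetch(n) for n in numbers]


def fetch_threaded(numbers, max_workers=5):
    """Fetch pages in a thread pool; the waits overlap."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fake_fetch, numbers))  # keeps input order


def timed(func, *args):
    """Return (result, elapsed_seconds) for one call."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    _, sequential_time = timed(fetch_sequential, range(5))
    _, threaded_time = timed(fetch_threaded, range(5))
    print("sequential: {:.2f}s, threaded: {:.2f}s".format(sequential_time, threaded_time))
```

The threaded run should take roughly one delay instead of five, because a sleeping (or I/O-blocked) thread lets the other threads proceed.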
Multiprocessing: a pool of 4 processes handles the network data in parallel.
# -*- coding: utf-8 -*-
from concurrent.futures import ProcessPoolExecutor
from time import time

import requests
from lxml import etree

url = 'https://movie.douban.com/top250'


def fetch_page(url):
    response = requests.get(url)
    return response


def fetch_content(url):
    # Must stay at module level so worker processes can import it.
    response = fetch_page(url)
    page = response.content
    return page


def parse(url):
    page = fetch_content(url)
    html = etree.HTML(page)

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
    xpath_title = './/span[@class="title"]'
    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

    pages = html.xpath(xpath_pages)
    fetch_list = []
    result = []

    for element_movie in html.xpath(xpath_movie):
        result.append(element_movie)

    for p in pages:
        fetch_list.append(url + p.get('href'))

    # Download in 4 worker processes; parsing happens in the parent process.
    with ProcessPoolExecutor(max_workers=4) as executor:
        for page in executor.map(fetch_content, fetch_list):
            html = etree.HTML(page)
            for element_movie in html.xpath(xpath_movie):
                result.append(element_movie)

    for i, movie in enumerate(result, 1):
        title = movie.find(xpath_title).text
        # print(i, title)


if __name__ == '__main__':
    # The guard is needed wherever worker processes are started by
    # re-importing this module (for example on Windows).
    start = time()
    parse(url)
    print('Cost {} seconds'.format(time() - start))
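Here the advantage of multiple processes (CPU-bound work) does not show; instead, the overhead of creating and scheduling processes far outweighs that benefit and holds the program back. Even so, the multi-process version is still much better than the earlier single-process, single-thread model.
Besides the high cost of creation, multiprocessing and multithreading share a flaw that is hard to eliminate: coordination between processes or between threads. Programs that depend on multiple processes or threads are generally unpredictable unless locks are used. Coroutines handle this coordination problem much better, because the user decides how the coroutines are scheduled.
An asynchronous program based on gevent: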
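As a minimal sketch of what locking looks like in practice, the following example has several threads increment one shared counter. The function name count_with_lock and the numbers used are assumptions for illustration. The read-modify-write on the counter is wrapped in a threading.Lock so that updates from different threads cannot interleave.

```python
import threading


def count_with_lock(num_threads=4, increments=10000):
    """Increment a shared counter from several threads, guarded by a lock."""
    counter = 0
    lock = threading.Lock()

    def work():
        nonlocal counter
        for _ in range(increments):
            with lock:          # only one thread may update at a time
                counter += 1

    threads = [threading.Thread(target=work) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == "__main__":
    print(count_with_lock())  # 40000
```

Without the lock, the final value is not guaranteed, since the operating system may switch threads between reading and writing the counter.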
# -*- coding: utf-8 -*-
# Patch the standard library first, before requests (and through it
# socket and ssl) is imported.
from gevent import monkey
monkey.patch_all()

from time import time

import gevent
import requests
from lxml import etree

url = 'https://movie.douban.com/top250'


def fetch_page(url):
    response = requests.get(url)
    return response


def fetch_content(url):
    response = fetch_page(url)
    page = response.content
    return page


def parse(url):
    page = fetch_content(url)
    html = etree.HTML(page)

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
    xpath_title = './/span[@class="title"]'
    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

    pages = html.xpath(xpath_pages)
    fetch_list = []
    result = []

    for element_movie in html.xpath(xpath_movie):
        result.append(element_movie)

    for p in pages:
        fetch_list.append(url + p.get('href'))

    # One greenlet per remaining page; joinall waits for all of them.
    jobs = [gevent.spawn(fetch_content, page_url) for page_url in fetch_list]
    gevent.joinall(jobs)

    for page in [job.value for job in jobs]:
        html = etree.HTML(page)
        for element_movie in html.xpath(xpath_movie):
            result.append(element_movie)

    for i, movie in enumerate(result, 1):
        title = movie.find(xpath_title).text
        # print(i, title)


if __name__ == '__main__':
    # Entry point added so the script runs and reports its elapsed time.
    start = time()
    parse(url)
    print('Cost {} seconds'.format(time() - start))
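gevent lets us write asynchronous programs using synchronous-looking logic. The key is the monkey.patch_all() call, the piece of black magic that makes the whole program asynchronous: once the program has been monkey-patched, at run time Python dynamically replaces some libraries (for example socket and thread) with asynchronous versions. Every network operation then works asynchronously, and efficiency improves accordingly.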
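The general idea behind a monkey patch, replacing a module attribute at run time so that existing code picks up the new behavior, can be shown with the standard library alone. This sketch only illustrates the mechanism and is not what gevent does internally; slow_task and run_with_patched_sleep are hypothetical names.

```python
import time


def slow_task():
    """Ordinary blocking code that knows nothing about being patched."""
    time.sleep(5)
    return "done"


def run_with_patched_sleep(task):
    """Temporarily swap time.sleep for a fake and run the task."""
    recorded = []
    original_sleep = time.sleep

    def fake_sleep(seconds):
        recorded.append(seconds)  # record the request instead of blocking

    time.sleep = fake_sleep       # the monkey patch: rebind the module attribute
    try:
        result = task()
    finally:
        time.sleep = original_sleep  # always restore the real function
    return result, recorded


if __name__ == "__main__":
    print(run_with_patched_sleep(slow_task))  # ('done', [5])
```

slow_task is unchanged, yet it returns immediately, because it looks up time.sleep at call time and finds the replacement. gevent applies the same kind of substitution to blocking I/O primitives, swapping in cooperative versions.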
-
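4. Python async/await
Python needed a dedicated standard library to support coroutines, which is how asyncio came about.
The synchronous requests library is replaced with aiohttp, which supports asyncio, and the coroutine version of the example is written with the async/await syntax introduced in Python 3.5.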
# -*- coding: utf-8 -*-
import asyncio
from time import time

import aiohttp
from lxml import etree

url = 'https://movie.douban.com/top250'


async def fetch_content(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


async def parse(url):
    page = await fetch_content(url)
    html = etree.HTML(page)

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
    xpath_title = './/span[@class="title"]'
    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

    pages = html.xpath(xpath_pages)
    fetch_list = []
    result = []

    for element_movie in html.xpath(xpath_movie):
        result.append(element_movie)

    for p in pages:
        fetch_list.append(url + p.get('href'))

    # Start all downloads concurrently; gather returns results in input order.
    tasks = [fetch_content(page_url) for page_url in fetch_list]
    pages = await asyncio.gather(*tasks)

    for page in pages:
        html = etree.HTML(page)
        for element_movie in html.xpath(xpath_movie):
            result.append(element_movie)

    for i, movie in enumerate(result, 1):
        title = movie.find(xpath_title).text
        # print(i, title)


def main():
    start = time()
    for i in range(5):
        # asyncio.run (Python 3.7+) creates and closes an event loop per call.
        asyncio.run(parse(url))
    end = time()
    print('Cost {} seconds'.format((end - start) / 5))


if __name__ == '__main__':
    main()
It is fast, and it also makes the program more readable.
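The same pattern can be tried offline. In this minimal sketch, fake_fetch is a hypothetical coroutine that uses asyncio.sleep in place of a real aiohttp request, and the 0.2 second delay is an arbitrary assumption. asyncio.gather runs the coroutines concurrently and returns their results in the order they were passed in.

```python
import asyncio
import time


async def fake_fetch(page_number, delay=0.2):
    """Pretend to download a page; the await is where other coroutines run."""
    await asyncio.sleep(delay)
    return "page-{}".format(page_number)


async def fetch_all(numbers):
    """Run all fetches concurrently and collect results in input order."""
    return await asyncio.gather(*(fake_fetch(n) for n in numbers))


def main():
    start = time.perf_counter()
    pages = asyncio.run(fetch_all(range(6)))
    elapsed = time.perf_counter() - start
    return pages, elapsed


if __name__ == "__main__":
    pages, elapsed = main()
    print(pages)
    print("Cost {:.2f} seconds".format(elapsed))
```

Six simulated downloads should finish in roughly the time of one, all inside a single thread, while the code still reads top to bottom like the synchronous version.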
Python Async/Await: An Introductory Guide
To be continued...