asyncio
是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。
asyncio的
编程模型就是一个消息循环。我们从asyncio
模块中直接获取一个EventLoop
的引用,然后把需要执行的协程扔到EventLoop
中执行,就实现了异步IO。
asyncio
实现了TCP、UDP、SSL等协议,aiohttp
则是基于asyncio
实现的HTTP框架。
我们先安装aiohttp
:
pip install aiohttp
aiomysql
异步操作使用mysql需要用到aiomysql
pip install aiomysql
pyquery
是用来读取html的工具
pip install pyquery
全部具体代码如下:
import re
import asyncio
import aiohttp
import aiomysql
from pyquery import PyQuery
sem = asyncio.Semaphore(3) # 3个允许的同时请求的数量
start_url = 'http://jobbole.com/'
stopping = False
waitting_urls = [] # 等待中的文章
seen_urls = set() # 爬取过的文章
async def fetch(url,session):
"""爬取对应url里的内容"""
async with sem:
await asyncio.sleep(1)
try:
async with session.get(url) as resp:
print("url status: {}".format(resp.status))
if resp.status in [200, 201]:
data = await resp.text()
return data
except Exception as e:
print(e)
async def init_urls(url, session):
"""没匹配到正则则, 接着爬"""
html = await fetch(url, session)
seen_urls.add(url)
extract_urls(html)
async def article_handler(url, session, pool):
"""获取文章并解析入库"""
html = await fetch(url, session)
seen_urls.add(url)
extract_urls(html)
pq = PyQuery(html)
title = pq("title").text()
async with pool.get() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
insert_sql = "insert into article_test(title) values('{}')".format(title)
await cur.execute(insert_sql)
async def consumer(pool):
"""不断的从等待列表爬取文章"""
async with aiohttp.ClientSession() as session:
while not stopping:
if len(waitting_urls) ==0:
await asyncio.sleep(0.5)
continue
url = waitting_urls.pop()
print("start get url: {}".format(url))
if re.match('http://.*?jobbole.com/\d+/', url):
if url not in seen_urls:
asyncio.ensure_future(article_handler(url, session, pool))
else:
if url not in seen_urls:
asyncio.ensure_future(init_urls(url, session))
def extract_urls(html):
"""解析html, 从href提取url放入到等待中的url"""
urls = []
pq = PyQuery(html)
for link in pq.items("a"):
url = link.attr("href")
if url and url.startswith("http") and url not in seen_urls:
urls.append(url)
waitting_urls.append(url)
async def main(loop):
# mysql准备
pool = await aiomysql.create_pool(host='localhost', port=3306,
user='root', password='123456',
db='aiomysql', loop = loop, charset="utf8", autocommit=True)
async with aiohttp.ClientSession() as session:
html = await fetch(start_url, session)
seen_urls.add(start_url)
extract_urls(html)
asyncio.ensure_future(consumer(pool))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(main(loop))
loop.run_forever()
解析
if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(main(loop))
loop.run_forever()
asyncio.get_event_loop()
将创建一个事件循环, asyncio.ensure_future
将创建一个任务, 参数是一个协程, loop.run_forever()
会一直运行, 直到被stop停止
async def main(loop):
# mysql准备
pool = await aiomysql.create_pool(host='localhost', port=3306,
user='root', password='123456',
db='aiomysql', loop = loop, charset="utf8", autocommit=True)
# fetch() 用于解析html内容, extract_urls用于解析html中的a标签里的url,并放入等待列表
async with aiohttp.ClientSession() as session:
html = await fetch(start_url, session)
seen_urls.add(start_url)
extract_urls(html)
asyncio.ensure_future(consumer(pool))
async
标记一个方法标记为协程, 然后在协程内部用await
调用另一个协程实现异步操作。
网友评论