1. Code example: a crawler implemented with the aiohttp client
import asyncio
import re
from scrapy import Selector
import aiohttp
import aiomysql
# Seed URL to start crawling from
start_url = 'https://news.sina.com.cn/'
# URLs waiting to be crawled
waitting_urls = set()
# URLs that have already been crawled
already_urls = set()
# Limit concurrency to 3 simultaneous requests
sem = asyncio.Semaphore(3)
# Request a URL and return the page source
async def fetch(url, session):
    async with sem:
        try:
            # session.get(url) is a network-I/O-bound request, so it is used with async with
            async with session.get(url) as response:
                if response.status == 200:
                    # the response body must be awaited to get the source
                    return await response.text()
        except Exception as e:
            print(e)
# Parse an article page, extract its title and insert it into MySQL
async def analysis_title(url, session, pool):
    try:
        html = await fetch(url, session)
        selector = Selector(text=html)
    except Exception as e:
        print(e)
        print(url)
        return ''
    title = selector.xpath('//h1[@class="main-title"]/text()').get()
    if title:
        # Acquire a connection from the MySQL connection pool
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                try:
                    # Parameterized query, so quotes in the title cannot break the SQL
                    insert_sql = 'INSERT INTO `title` VALUES (%s)'
                    await cur.execute(insert_sql, (title,))
                    already_urls.add(url)
                except Exception as e:
                    print(e)
                    print(url)
                    return ''
# Extract all URLs from a non-article page and add them to waitting_urls
async def extract_url(url, session):
    html = await fetch(url, session)
    try:
        selector = Selector(text=html)
    except Exception as e:
        print(e)
        print(url)
        print(html)
        return ''
    urls = selector.xpath('//@href').extract()
    for link in urls:
        waitting_urls.add(link)
    # Mark the page itself as crawled so the consumer does not process it again
    already_urls.add(url)
# Pop URLs from waitting_urls and dispatch article URLs to the parser
async def consumer(pool):
    # The session must be closed eventually, so it is created with async with.
    # To avoid the cost of building a new session per request, a single session
    # is created here and passed into the other coroutines.
    async with aiohttp.ClientSession() as session:
        while True:
            if len(waitting_urls) == 0:
                await asyncio.sleep(2)
                continue
            url = waitting_urls.pop()
            if url and url not in already_urls and re.findall(r'/\d{4}-\d{2}-\d{2}/doc.*?\d+\.shtml', url):
                asyncio.ensure_future(analysis_title(url, session, pool))
            else:
                asyncio.ensure_future(extract_url(url, session))
async def main(loop):
    # Wait for the MySQL connection pool to be created; loop is the running event
    # loop and autocommit=True commits every statement automatically
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='zkr@123...',
                                      db='test', loop=loop, charset='utf8',
                                      autocommit=True)
    async with aiohttp.ClientSession() as session:
        await extract_url(start_url, session)
    # The pool is passed into the consumer so a new pool is not created for every insert
    asyncio.ensure_future(consumer(pool))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()
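
The INSERT statement above assumes a table named `title` already exists in the `test` database; the script never creates it. Below is a minimal one-off setup sketch, assuming a single VARCHAR column (the column name `title` and its length are assumptions, not something the crawler code specifies):

import asyncio
import aiomysql

async def create_table():
    # One-off setup: create the `title` table the crawler inserts into.
    # Column name and length are assumptions; adjust them to your own schema.
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                  user='root', password='zkr@123...',
                                  db='test', charset='utf8', autocommit=True)
    async with conn.cursor() as cur:
        await cur.execute('CREATE TABLE IF NOT EXISTS `title` '
                          '(`title` VARCHAR(255)) DEFAULT CHARSET=utf8')
    conn.close()

asyncio.get_event_loop().run_until_complete(create_table())

Because the crawler's INSERT supplies values without naming columns, any single-text-column table will work; if you add more columns, name them explicitly in the INSERT as well.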