These days most Python crawlers are written with Scrapy, but for small jobs such a heavyweight framework is unnecessary, and sometimes you simply want more freedom to structure the code however you like. That is where the aiohttp library comes in: it is built on asyncio and is well suited to exactly this kind of task.
What about requests? It is indeed an excellent HTTP library, but its I/O is blocking, which makes it slow when you have many pages to fetch. That said, its API design is superb and its documentation is thorough, so it remains a great choice for beginners.
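To make the blocking-versus-async contrast concrete, here is a minimal sketch (not part of the spider below, and assuming Python 3.7+ with aiohttp installed) of fetching several pages concurrently; with requests, each page would have to finish downloading before the next one starts. The URLs are placeholders.

import asyncio
import aiohttp

async def fetch_one(session, url):
    # One GET request; returns the response body as text.
    async with session.get(url) as resp:
        return await resp.text()

async def fetch_all(urls):
    # Reuse a single session; asyncio.gather runs the requests concurrently.
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch_one(session, u) for u in urls))

# Placeholder URLs, for illustration only.
pages = asyncio.run(fetch_all(["https://example.com"] * 3))
print(len(pages))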
The code below is a spider for grabbing Taobao main product images that a friend in my chat group needed. I dashed it off as a single file rather than decoupling it into a multi-file project. I hope it gives anyone who is learning coroutines a small reference point.
import os
import logging
import aiohttp
import asyncio
import aiofiles
import pandas as pd
from pyquery import PyQuery
from multiprocessing import cpu_count
# [Usage] Put the item IDs whose main images you want to fetch into the generated items.csv, one ID per line.
# Logging level and format
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s 【%(levelname)s】=> %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
class AsyncSpider(object):
    def __init__(self):
        self.BASE_DIR = os.path.dirname(os.path.abspath(__file__))
        self.BASE_URL = "https://detail.tmall.com/item.htm?id="
        self.checker()
        self.loader()  # called here only to validate items.csv up front
        self.total = 0
        self.failed = 0
    def checker(self):
        """
        Check that the required files and folders exist; create them if they do not.
        """
        folders = ["files", "images"]
        files = ["items.csv", "failed.csv"]
        for folder in folders:
            path = os.path.join(self.BASE_DIR, folder)
            if not os.path.exists(path):
                os.makedirs(path)
        for file in files:
            path = os.path.join(self.BASE_DIR, "files", file)
            if not os.path.exists(path):
                open(path, "w").close()
                if file == "items.csv":
                    exit("Fill items.csv with the item IDs to download, then run the program again.")
        # Truncate failed.csv so every run starts with a clean failure log.
        open(os.path.join(self.BASE_DIR, "files", "failed.csv"), "w").close()
    def loader(self):
        """
        Load the item IDs from items.csv and return them as a list.
        """
        try:
            path = os.path.join(self.BASE_DIR, "files", "items.csv")
            df = pd.read_csv(path, header=None)
            items = df[0].to_list()
            return items
        except Exception as e:
            if str(e) == "No columns to parse from file":
                logging.error("items.csv is empty, there is nothing to do; exiting.")
            else:
                logging.error(e)
            exit()
    async def error_handler(self, url):
        """
        Record a failed item ID in failed.csv.
        """
        self.failed += 1
        item = url.split("id=")[-1] + "\n"
        path = os.path.join(self.BASE_DIR, "files", "failed.csv")
        async with aiofiles.open(path, "ab") as f:
            await f.write(item.encode())
    @staticmethod
    async def fetch(session, url):
        logging.info(f"Start: {url}")
        async with session.get(url) as response:
            if response.status in [200, 201]:
                # If the URL ends with ".jpg", treat it as an image and return bytes;
                # otherwise treat it as a page and return text for parsing.
                if url.endswith(".jpg"):
                    return await response.read()
                else:
                    return await response.text()
            # Any other status falls through and returns None.
    # Parse the HTML and extract the main-image URL.
    @staticmethod
    async def parser(html):
        pq = PyQuery(html)
        # Walk every <img> tag; a src containing "_430x430q90" or "400x400.jpg"
        # is taken to be the main image, and its URL is returned.
        for link in pq.items("img"):
            src = link.attr("src")
            if src:
                if "_430x430q90" in src or "400x400.jpg" in src:
                    return "https:" + src
        return
    # Save the image to disk.
    async def save(self, session, url, path):
        # Download the image bytes.
        img = await self.fetch(session, url)
        if img:
            # Write them asynchronously; "async with" closes the file for us.
            async with aiofiles.open(path, "wb") as f:
                await f.write(img)
            logging.info(f"Saved image {path}")
        else:
            await self.error_handler(url)
    async def main(self, url, semaphore):
        self.total += 1
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                html = await self.fetch(session, url)
                image_src = await self.parser(html) if html else None
                if image_src:
                    image_name = url.split("id=")[-1] + ".jpg"
                    image_path = os.path.join(self.BASE_DIR, "images", image_name)
                    await self.save(session, image_src, image_path)
                else:
                    await self.error_handler(url)
    def run(self):
        items = self.loader()
        urls = map(lambda x: self.BASE_URL + str(x), items)
        # Cap concurrency; the work is network-bound, so this is just a tuning knob.
        semaphore = asyncio.Semaphore(cpu_count() * 2)
        loop = asyncio.get_event_loop()
        futures = [asyncio.ensure_future(self.main(url, semaphore)) for url in urls]
        tasks = asyncio.gather(*futures)
        loop.run_until_complete(tasks)
        loop.close()
        logging.info(f"Finished. {self.total} tasks in total, {self.failed} failed.")
if __name__ == "__main__":
    async_spider = AsyncSpider()
    async_spider.run()
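One note on run(): asyncio.Semaphore(cpu_count() * 2) is just one way to pick a concurrency limit. The work here is network-bound rather than CPU-bound, so the cap does not have to track core count and can simply be tuned. On Python 3.7+ the explicit event-loop management can also be replaced with asyncio.run, roughly as in the hedged sketch below (same class and method names as above; the fixed limit of 10 is an arbitrary assumption, not a recommendation).

    def run(self):
        items = self.loader()
        urls = [self.BASE_URL + str(x) for x in items]

        async def runner():
            # Create the semaphore inside the running loop and schedule every task.
            semaphore = asyncio.Semaphore(10)  # arbitrary cap, for illustration only
            await asyncio.gather(*(self.main(url, semaphore) for url in urls))

        asyncio.run(runner())
        logging.info(f"Finished. {self.total} tasks in total, {self.failed} failed.")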