To get familiar with MongoDB and Redis, I spent the weekend writing a scraper for the novel 《白夜行》. It is unit-tested with the pytest framework and uses a thread pool to speed up the downloads:
# white_novel.py
"""Store chapter URLs in Redis and chapter content in MongoDB."""
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from multiprocessing.dummy import Pool

import lxml.html  # type: ignore
import requests  # type: ignore
import redis  # type: ignore
from pymongo import MongoClient, database


class DownloadWhite:
    KEY = 'urls'  # Redis key for the list of chapter URLs

    def __init__(self, workers=15, home_url='http://dongyeguiwu.zuopinj.com/5525'):
        self.workers = workers
        self.home_url = home_url
        self.redis_client = redis.StrictRedis(decode_responses=True)
        mongo_client = MongoClient()
        db: database.Database = mongo_client['Chapter6']
        self.collection = db['white_novel']

    def _clear(self):
        """Drop the URL list and the stored chapters so a run starts clean."""
        self.redis_client.delete(self.KEY)
        self.collection.delete_many({})

    def save_urls(self):
        """Scrape the table of contents and push the chapter URLs into Redis."""
        home_page = requests.get(self.home_url).content.decode()
        selector = lxml.html.fromstring(home_page)
        useful = selector.xpath('//div[@class="book_list"]/ul/li')
        # Skip <li> items without a link instead of appending None,
        # which would make rpush raise a DataError.
        urls = [li.xpath('a/@href')[0] for li in useful if li.xpath('a/@href')]
        if urls:
            self.redis_client.rpush(self.KEY, *urls)

    def download_novel(self):
        contents = []
        urls = self.redis_client.lrange(self.KEY, 0, -1)
        if not urls:
            return
        # method 1: ThreadPoolExecutor
        # with ThreadPoolExecutor(max_workers=self.workers) as executor:
        #     futures = [executor.submit(self._download_chapter, url, contents) for url in urls]
        #     for _ in as_completed(futures):
        #         pass
        # method 2: multiprocessing.dummy.Pool, a thread pool behind the multiprocessing API;
        # partial pins the contents argument so map only has to supply each url
        pool = Pool(self.workers)
        pool.map(partial(self._download_chapter, contents=contents), urls)
        print(f'at last insert {len(contents)} chapters')
        self.collection.insert_many(contents)

    @staticmethod
    def _download_chapter(url, contents: list) -> None:
        page = requests.get(url).content.decode()
        selector = lxml.html.fromstring(page)
        title = selector.xpath('//div[@class="h1title"]/h1/text()')[0]
        content = '\n'.join(selector.xpath('//div[@id="htmlContent"]/p/text()'))
        contents.append({'title': title, 'content': content})


if __name__ == '__main__':
    dlw = DownloadWhite()
    dlw._clear()
    dlw.save_urls()
    start = time.perf_counter()
    dlw.download_novel()
    print(f'time elapse {time.perf_counter() - start} seconds')
I tried two thread-pool implementations: ThreadPoolExecutor (method 1, commented out above) and multiprocessing.dummy.Pool (method 2), plus the small partial trick to pin the contents argument for pool.map.
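For comparison, here is a minimal sketch of how method 1 could collect results through the futures themselves instead of mutating a shared list. fetch_chapter is a hypothetical stand-in for the real _download_chapter, not code from the script above:

# sketch: ThreadPoolExecutor variant that returns results instead of sharing a list
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_chapter(url: str) -> dict:
    # stand-in for the real requests + lxml work
    return {'title': f'chapter at {url}', 'content': '...'}

def download_all(urls, workers=15):
    contents = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(fetch_chapter, url) for url in urls]
        for future in as_completed(futures):
            contents.append(future.result())  # gathered in completion order
    return contents

Returning the chapter dict from each task sidesteps the shared-state question entirely, since no two threads ever touch the same list.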
Still, one thing puzzled me: multiple threads append to the same list, contents. Is that thread-safe?
The Python FAQ entry "What kinds of global value mutation are thread-safe?" answered my question: because of the GIL, many of the thread-safety problems familiar from Java simply do not arise in Python. Only read-modify-write statements such as L[i] += 4 can be unsafe, because they are not atomic: a thread switch can happen between the read and the write back.
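One way to see the difference is to disassemble both operations: L.append(x) is a single call into a C-implemented method, while L[i] += 4 compiles to separate load, add, and store bytecodes that another thread can interleave with. A small demo (the function names are illustrative only):

# demo: why list.append is atomic under the GIL but L[i] += 4 is not
import dis

def append_item(L, x):
    L.append(x)  # one call into list.append (implemented in C)

def inplace_add(L, i):
    L[i] += 4    # load L[i], add 4, store back: three separate steps

dis.dis(append_item)
print('---')
dis.dis(inplace_add)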
Because the thread pool (15 threads) downloads chapters concurrently, fetching all 13 chapters takes roughly as long as fetching a single one:
at last insert 13 chapters
time elapse 0.9961462760111317 seconds
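That timing matches expectation: with 15 workers and only 13 chapters, every download runs at once, so the wall time is roughly one chapter's network latency plus parsing. A toy illustration, where fake_fetch and the 0.5-second sleep are made-up stand-ins for a chapter download, not measurements from the scraper:

# toy model of why 13 concurrent fetches cost about as much as one
import time
from multiprocessing.dummy import Pool

def fake_fetch(i):
    time.sleep(0.5)  # pretend network latency for one chapter
    return i

start = time.perf_counter()
with Pool(15) as pool:  # more workers than tasks: all 13 run simultaneously
    pool.map(fake_fetch, range(13))
print(f'{time.perf_counter() - start:.2f} seconds')  # ~0.5, not 13 * 0.5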
The unit tests:
# test_white_novel.py
import pytest  # type: ignore
import redis  # type: ignore
from pymongo import MongoClient, collection  # type: ignore

from white_novel import DownloadWhite


@pytest.fixture(scope='function')
def wld_instance():
    """A fresh DownloadWhite per test, with Redis and MongoDB cleared before and after."""
    print('start')
    dlw = DownloadWhite()
    dlw._clear()
    yield dlw
    dlw._clear()
    print('end')


@pytest.fixture(scope='module')
def redis_client():
    # module-scoped: one connection shared by all tests in this file
    print('init redis')
    return redis.StrictRedis(decode_responses=True)


@pytest.fixture(scope='module')
def white_novel_collection() -> collection.Collection:
    print('init mongo')
    mongo_client = MongoClient()
    database = mongo_client['Chapter6']
    return database['white_novel']


def test_download(wld_instance, redis_client, white_novel_collection):
    wld_instance.save_urls()
    wld_instance.download_novel()
    assert redis_client.llen(wld_instance.KEY) == 13
    assert white_novel_collection.count_documents(filter={}) == 13


def test_not_save_url_download(wld_instance, redis_client, white_novel_collection):
    # downloading without saved URLs should be a no-op
    wld_instance.download_novel()
    assert redis_client.llen(wld_instance.KEY) == 0
    assert white_novel_collection.count_documents(filter={}) == 0


def test_only_save_url(wld_instance, redis_client, white_novel_collection):
    wld_instance.save_urls()
    assert redis_client.llen(wld_instance.KEY) == 13
    assert white_novel_collection.count_documents(filter={}) == 0
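Assuming local Redis and MongoDB instances are running, the suite can be invoked with output capture disabled so the fixtures' print statements are visible:

pytest -s -v test_white_novel.py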
The final scraping results: Redis holds the list of chapter URLs, and MongoDB holds the novel's content.