Downloading a novel with Redis and MongoDB, tested with pytest

Author: ThomasYoungK | Published 2019-01-06 12:06

    Over the weekend, to get familiar with MongoDB and Redis, I wrote a scraper for the novel 《白夜行》 that stores its data in both, added unit tests with the pytest framework, and used a thread pool to speed up the downloads:

    # white_novel.py
    """ 使用redis存储网址,使用mongodb存储内容"""
    
    import lxml.html  # type: ignore
    import requests  # type: ignore
    import redis  # type: ignore
    from pymongo import MongoClient, database
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
    from multiprocessing.dummy import Pool
    from functools import partial
    
    class DownloadWhite:
        KEY = 'urls'
    
        def __init__(self, workers=15, home_url='http://dongyeguiwu.zuopinj.com/5525'):
            self.workers = workers
            self.home_url = home_url
            self.redis_client = redis.StrictRedis(decode_responses=True)
            mongo_client = MongoClient()
            db: database.Database = mongo_client['Chapter6']
            self.collection = db['white_novel']
    
        def _clear(self):
            self.redis_client.delete(self.KEY)
            self.collection.delete_many({})
    
        def save_urls(self):
            home_page = requests.get(self.home_url).content.decode()
            selector = lxml.html.fromstring(home_page)
            useful = selector.xpath('//div[@class="book_list"]/ul/li')
            urls = []
            for li in useful:
                href = li.xpath('a/@href')
                if href:
                    urls.append(href[0])
            if urls:
                self.redis_client.rpush(self.KEY, *urls)
    
        def download_novel(self):
            client = self.redis_client
            contents = []
            urls = client.lrange(self.KEY, 0, -1)
            if not urls:
                return
            # method 1: ThreadPoolExecutor -- the with-block waits for every future on exit
            # with ThreadPoolExecutor(max_workers=self.workers) as executor:
            #     futures = [executor.submit(self._download_chapter, url, contents) for url in urls]
            #     for _ in as_completed(futures):
            #         pass
            # method 2: multiprocessing.dummy.Pool, a thread pool behind the multiprocessing API
            pool = Pool(self.workers)
            pool.map(partial(self._download_chapter, contents=contents), urls)
            print(f'at last insert {len(contents)} chapters')
            self.collection.insert_many(contents)
    
        @staticmethod
        def _download_chapter(url, contents: list) -> None:
            page = requests.get(url).content.decode()
            selector = lxml.html.fromstring(page)
            title = selector.xpath('//div[@class="h1title"]/h1/text()')[0]
            content = '\n'.join(selector.xpath('//div[@id="htmlContent"]/p/text()'))
            contents.append({'title': title, 'content': content})
    
    
    if __name__ == '__main__':
        dlw = DownloadWhite()
        dlw._clear()
        dlw.save_urls()
        start = time.perf_counter()
        dlw.download_novel()
        print(f'time elapse {time.perf_counter() - start} seconds')
    
    

    I tried two thread-pool implementations: one based on ThreadPoolExecutor, the other on multiprocessing.dummy.Pool, and used functools.partial as a small trick to bind the shared contents list to the worker function.
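
    As a side note, instead of having every worker append to a shared contents list, the ThreadPoolExecutor variant can also collect return values directly. This is only a minimal sketch of that alternative, not the code used above; fetch_chapter is a hypothetical helper doing the same parsing as _download_chapter:

    # sketch: let each worker return its chapter dict instead of mutating shared state
    from concurrent.futures import ThreadPoolExecutor

    import lxml.html  # type: ignore
    import redis  # type: ignore
    import requests  # type: ignore

    def fetch_chapter(url):
        # same parsing as _download_chapter, but the result is returned, not appended
        page = requests.get(url).content.decode()
        selector = lxml.html.fromstring(page)
        title = selector.xpath('//div[@class="h1title"]/h1/text()')[0]
        content = '\n'.join(selector.xpath('//div[@id="htmlContent"]/p/text()'))
        return {'title': title, 'content': content}

    urls = redis.StrictRedis(decode_responses=True).lrange('urls', 0, -1)
    with ThreadPoolExecutor(max_workers=15) as executor:
        chapters = list(executor.map(fetch_chapter, urls))  # results come back in url order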

    One thing puzzled me, though: multiple threads append to the same list contents; is that thread-safe?
    The entry "What kinds of global value mutation are thread-safe?" in the Python FAQ answered my question: thanks to the GIL, many operations that are not thread-safe in Java simply are not a problem in Python; only a few read-then-write statements such as L[i] += 4, which are not a single atomic operation, can actually be thread-unsafe.
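
    To convince myself, here is a small sketch (not part of the scraper) that appends to one shared list from many threads and checks that nothing is lost, then disassembles a read-then-write statement to show why that case is different:

    # sketch: list.append from many threads vs. a read-then-write statement
    import dis
    from concurrent.futures import ThreadPoolExecutor

    shared = []

    def worker(i):
        shared.append(i)  # a single C-level list.append call, effectively atomic under the GIL

    with ThreadPoolExecutor(max_workers=15) as executor:
        list(executor.map(worker, range(1000)))
    assert len(shared) == 1000  # no appends are lost

    def unsafe(L, i):
        L[i] += 4  # load, add, store back: several bytecodes, so another thread can run in between

    dis.dis(unsafe)  # prints the separate load/add/store steps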

    Because the thread pool (15 workers) downloads the chapters concurrently, fetching all 13 chapters takes roughly as long as fetching a single one:

    at last insert 13 chapters
    time elapse 0.9961462760111317 seconds
    

    Unit tests:

    # test_white_novel.py
    import pytest  # type: ignore
    import redis  # type: ignore
    from pymongo import MongoClient, collection  # type: ignore
    
    from white_novel import DownloadWhite
    
    
    @pytest.fixture(scope='function')
    def wld_instance():
        print('start')
        dlw = DownloadWhite()
        dlw._clear()
        yield dlw
        dlw._clear()
        print('end')
    
    
    @pytest.fixture(scope='module')
    def redis_client():
        print('init redis')
        return redis.StrictRedis(decode_responses=True)
    
    
    @pytest.fixture(scope='module')
    def white_novel_collection() -> collection.Collection:
        print('init mongo')
        mongo_client = MongoClient()
        database = mongo_client['Chapter6']
        collection = database['white_novel']
        return collection
    
    
    def test_download(wld_instance, redis_client, white_novel_collection):
        wld_instance.save_urls()
        wld_instance.download_novel()
        assert redis_client.llen(wld_instance.KEY) == 13
        assert white_novel_collection.count_documents(filter={}) == 13
    
    
    def test_not_save_url_download(wld_instance, redis_client, white_novel_collection):
        wld_instance.download_novel()
        assert redis_client.llen(wld_instance.KEY) == 0
        assert white_novel_collection.count_documents(filter={}) == 0
    
    def test_only_save_url(wld_instance, redis_client, white_novel_collection):
        wld_instance.save_urls()
        assert redis_client.llen(wld_instance.KEY) == 13
        assert white_novel_collection.count_documents(filter={}) == 0
    
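    These tests talk to the live site as well as a local Redis and MongoDB. If I wanted a check that stays offline, pytest's monkeypatch fixture can stub out requests.get. A minimal sketch (the fake HTML, FakeResponse class and URL are made up for illustration; it reuses the DownloadWhite import at the top of test_white_novel.py):

    # sketch: exercising _download_chapter without the network via pytest's monkeypatch
    class FakeResponse:
        # fakes the only attribute _download_chapter uses: .content (bytes)
        content = ('<div class="h1title"><h1>Chapter 1</h1></div>'
                   '<div id="htmlContent"><p>line one</p><p>line two</p></div>').encode()


    def test_download_chapter_parsing(monkeypatch):
        # patch requests.get as seen by white_novel; monkeypatch restores it after the test
        monkeypatch.setattr('white_novel.requests.get', lambda url: FakeResponse())
        contents = []
        DownloadWhite._download_chapter('http://fake-url', contents)
        assert contents == [{'title': 'Chapter 1', 'content': 'line one\nline two'}]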

    The final scraped result:

    Redis stores the list of links for each chapter; MongoDB stores the novel's content.
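
    To spot-check both stores afterwards, a quick sketch of how I would peek at them (assuming the same local default Redis and MongoDB, the 'urls' key, and the Chapter6 / white_novel collection from the code above):

    # sketch: peek at the stored chapter links and chapter documents
    import redis  # type: ignore
    from pymongo import MongoClient

    r = redis.StrictRedis(decode_responses=True)
    print(r.llen('urls'), 'chapter urls stored')   # expect 13
    print(r.lrange('urls', 0, 2))                  # first few links

    coll = MongoClient()['Chapter6']['white_novel']
    for doc in coll.find({}, {'_id': 0, 'title': 1}).limit(3):
        print(doc['title'])                        # first few chapter titles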
