Python Processes + Coroutines: Building an Async Crawler from Scratch (2)

Author: Yeureka | Published 2021-02-22 14:46

    Python Processes + Coroutines: Building an Async Crawler from Scratch (1)
    Python Processes + Coroutines: Building an Async Crawler from Scratch (2)

    In the previous installment, we successfully used coroutines inside multiple processes to run tasks asynchronously while keeping multi-step flows in order. In this installment we build on that code, adding web-request support to produce a simple asynchronous crawler: for each new page you only write the request, parsing, and data-handling logic, while the crawler itself takes care of multiprocessing and async requests.

    Detailed flow diagram

    [Figure: architecture of the async crawler]

    Required libraries

    Beautifulsoup: a Python library for extracting data from HTML and XML files.

    # Installation
    cd AiospiderWorkshop
    pipenv shell
    pipenv install beautifulsoup4
    

    Creating the downloader class Downloader

    We will practice on the crawler exercise site https://scrape.center/ built by Cui Qingcai (崔庆才). We use its simplest page set, https://ssr1.scrape.center/page/1. Reading this section assumes basic familiarity with the Beautifulsoup and aiohttp libraries.

    Create a new .py file to verify that the Downloader class works.

    1. Write a helper function that extracts the movie names from a page and prints them to the screen.
    from bs4 import BeautifulSoup
    
    def extract_movie_name(html):
        soup = BeautifulSoup(html, "html.parser")
        name_tags = soup.find_all(class_="m-b-sm")
        for name_tag in name_tags:
            print(name_tag.string)
    
    2. Create the downloader class Downloader

    The Downloader class has two main methods, get_async and download:

    download: opens a session and asynchronously requests every url in the url list.

    get_async: requests a page and returns its html.

    import asyncio
    from aiohttp import ClientSession
    
    class Downloader:
        async def get_async(self, session, url):
            async with session.get(url=url) as resp:
                return await resp.text()
    
        async def download(self):
            async with ClientSession() as session:
                url_lst = [
                    "https://ssr1.scrape.center/page/1",
                    "https://ssr1.scrape.center/page/2"
                    ]
                download_tasks = list()
                for url in url_lst:
                    download_task = asyncio.create_task(self.get_async(session, url))
                    download_tasks.append(download_task)
                for task in download_tasks:
                    await task
                    result = task.result()
                    extract_movie_name(result)
    
        def async_run(self):
            asyncio.run(self.download())
    
    3. Write the main entry point
    if __name__ == "__main__":
        downloader = Downloader()
        downloader.async_run()
    

    At this point, the downloader class runs correctly.

    # Output
    霸王别姬 - Farewell My Concubine
    这个杀手不太冷 - Léon
    肖申克的救赎 - The Shawshank Redemption
    ...
    

    Integrating the downloader

    So far the downloader is still a standalone component. We now need to integrate its download method into the existing code, invoking it from its own process and exchanging data with the worker process through a download queue.
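
    Before touching the classes, the sketch below shows the mechanism the download queue relies on in isolation. It is a minimal, self-contained example (the producer/consumer names are purely illustrative, not part of the crawler code): two processes exchange a picklable object through a Manager queue, just as Works and Downloader will exchange Workshop objects.

    from multiprocessing import Manager, Process

    def producer(queue):
        # Runs in process 1: enqueue a plain dict standing in for a Workshop.
        queue.put_nowait({"url": "https://ssr1.scrape.center/page/1"})

    def consumer(queue):
        # Runs in process 2: block until the item arrives, then print it.
        print(queue.get())

    if __name__ == "__main__":
        manager = Manager()
        queue = manager.Queue()
        p1 = Process(target=producer, args=(queue,))
        p2 = Process(target=consumer, args=(queue,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()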

    1. Modifying Bridge

    Add the download-queue functionality; the existing code stays unchanged.

    download_queue: the download queue.

    put_download_queue, get_download_queue, and download_queue_empty do what their names say.

    class Bridge:
        def __init__(self):
            manager = Manager()
            self.download_queue = manager.Queue()
    
        def put_download_queue(self, workshop):
            self.download_queue.put_nowait(workshop)
    
        def get_download_queue(self):
            return self.download_queue.get_nowait()
    
        def download_queue_empty(self):
            return self.download_queue.empty()
    
    2. Modifying Workshop

    Add three attributes: url, need_download, and html.

    class Workshop:
        def __init__(self, url, need_download):
            self.url = url
            self.need_download = need_download
            self.html = None
            self._next_process = None
    
    3. Modifying MyWorkshop

    Update the initializer to match the changed Workshop class, and replace the two mock steps from the previous installment with a lightly adapted version of this installment's extract_movie_name method.

    class MyWorkshop(Workshop):
        def __init__(self, url, need_download):
            super().__init__(url, need_download)
            self.set_start_process(self.extract_movie_name)
    
        async def extract_movie_name(self):
            soup = BeautifulSoup(self.html, "html.parser")
            name_tags = soup.find_all(class_="m-b-sm")
            for name_tag in name_tags:
                print(name_tag.string)
            self.set_end()
    
    4. Modifying Downloader

    Change __init__ and async_run so the class can receive and store the message-passing object Bridge.

    Add a get_page method: it takes a workshop, hands its url to get_async for downloading, stores the downloaded html in the workshop's html attribute, sets the workshop's need_download attribute to False, and returns the workshop.

    Change the download method: like Works, it uses bridge.work_end() to decide when the program is finished; it takes workshops from the download queue download_queue, hands them to get_page, and puts each returned workshop into the work queue work_queue for the next processing step.

    class Downloader:
        def __init__(self):
            self.bridge = None
    
        async def get_async(self, session, url):
            async with session.get(url=url) as resp:
                return await resp.text()
    
        async def get_page(self, session, workshop):
            workshop.html = await self.get_async(session, workshop.url)
            workshop.need_download = False
            return workshop
    
        async def download(self):
            while not self.bridge.work_end():
                async with ClientSession() as session:
                    download_tasks = list()
                    while not self.bridge.download_queue_empty():
                        workshop = self.bridge.get_download_queue()
                        task = asyncio.create_task(self.get_page(session, workshop))
                        download_tasks.append(task)
                    for task in download_tasks:
                        await task
                        workshop = task.result()
                        self.bridge.put_work_queue(workshop)
    
        def async_run(self, bridge):
            self.bridge = bridge
            asyncio.run(self.download())
    
    5. Modifying Works

    Change the run_works method: after taking a workshop from work_queue, check whether it still needs downloading; if it does, push it onto the download queue download_queue so the download process can fetch it.

    Everything else stays the same.

    class Works:
        async def run_works(self):
            self.bridge.flag_start()
            while not self.bridge.work_end():
                task_lst = list()
                while not self.bridge.work_queue_empty():
                    workshop = self.bridge.get_work_queue()
                    if workshop.need_download:
                        self.bridge.put_download_queue(workshop)
                        continue
                    task = asyncio.create_task(workshop.run_next_process())
                    task_lst.append(task)
                for task in task_lst:
                    await task
                    self.distribute_works(task)
    
    6. Modifying App

    The download routine is launched as a separate process.

    class App:
        def __init__(self):
            self.works = Works()
            self.bridge = Bridge()
            self.download = Downloader()
    
        def async_run(self, workshop_lst):
            self.bridge.init_works(workshop_lst)
            p_run_works = Process(target=self.works.async_run,
                                  args=(self.bridge,))
            p_download = Process(target=self.download.async_run,
                                 args=(self.bridge,))
            p_run_works.start()
            p_download.start()
            p_run_works.join()
            p_download.join()
    
    7. Modifying the main entry point

    In main, build the list of Workshop objects and hand it to App to run.

    if __name__ == "__main__":
        work_lst = list()
        url_template = "https://ssr1.scrape.center/page/{}"
        for i in range(1, 11):
            url = url_template.format(str(i))
            work_lst.append(
                MyWorkshop(url=url, need_download=True)
            )
        app = App()
        app.async_run(work_lst)
    

    At this point, the program runs end to end.

    # Output
    霸王别姬 - Farewell My Concubine
    ...
    魂断蓝桥 - Waterloo Bridge
    Run time: 2.26s
    

    Complete code for this installment

    import asyncio
    import time
    from functools import wraps
    from multiprocessing import Process
    from multiprocessing import Manager
    from aiohttp import ClientSession
    from bs4 import BeautifulSoup
    
    
    def print_run_time(func):
        # Decorator that reports how long the wrapped function takes to run
        @wraps(func)  # keep the wrapped function's name, otherwise the multiprocessing call fails
        def wrap(*args, **kwargs):
            start = time.time()
            f = func(*args, **kwargs)
            end = time.time()
            print("运行时间:{:.2f}s".format(end - start))
            return f
        return wrap
    
    
    class Bridge:
        def __init__(self):
            manager = Manager()
            self.work_queue = manager.Queue()
            self.download_queue = manager.Queue()
            self.config_dict = manager.dict()
            self.init_config()
    
        def init_config(self):
            self.config_dict["running_work_cnt"] = 0
            self.config_dict["work_start_flag"] = False
    
        def init_works(self, workshop_lst):
            for workshop in workshop_lst:
                self.put_work_queue(workshop)
                self.work_cnt_increase()
    
        def flag_start(self):
            self.config_dict["work_start_flag"] = True
    
        def work_end(self):
            return self.config_dict["work_start_flag"]\
                   and not self.config_dict["running_work_cnt"]
    
        def work_cnt_increase(self):
            self.config_dict["running_work_cnt"] += 1
    
        def work_cnt_decrease(self):
            self.config_dict["running_work_cnt"] -= 1
    
        def put_work_queue(self, workshop):
            self.work_queue.put_nowait(workshop)
    
        def get_work_queue(self):
            return self.work_queue.get_nowait()
    
        def work_queue_empty(self):
            return self.work_queue.empty()
    
        def put_download_queue(self, workshop):
            self.download_queue.put_nowait(workshop)
    
        def get_download_queue(self):
            return self.download_queue.get_nowait()
    
        def download_queue_empty(self):
            return self.download_queue.empty()
    
    
    class Workshop:
        def __init__(self, url, need_download):
            self.url = url
            self.need_download = need_download
            self.html = None
            self._next_process = None
    
        def set_start_process(self, func):
            self._next_process = func
    
        def set_next_process(self, func):
            self._next_process = func
    
        def set_end(self):
            self._next_process = "/EOF"
    
        def is_end(self):
            return self._next_process == "/EOF"
    
        async def run_next_process(self):
            workshop = await self._next_process()
            if workshop:
                return workshop
            else:
                return self
    
    
    class Works:
        def __init__(self):
            self.bridge = None
    
        def distribute_works(self, task):
            workshop = task.result()
            if not workshop.is_end():
                self.bridge.put_work_queue(workshop)
            else:
                self.bridge.work_cnt_decrease()
    
        async def run_works(self):
            self.bridge.flag_start()
            while not self.bridge.work_end():
                task_lst = list()
                while not self.bridge.work_queue_empty():
                    workshop = self.bridge.get_work_queue()
                    if workshop.need_download:
                        self.bridge.put_download_queue(workshop)
                        continue
                    task = asyncio.create_task(workshop.run_next_process())
                    task_lst.append(task)
                for task in task_lst:
                    await task
                    self.distribute_works(task)
    
        @print_run_time
        def async_run(self, bridge):
            self.bridge = bridge
            asyncio.run(self.run_works())
    
    
    class Downloader:
        def __init__(self):
            self.bridge = None
    
        async def get_async(self, session, url):
            async with session.get(url=url) as resp:
                return await resp.text()
    
        async def get_page(self, session, workshop):
            workshop.html = await self.get_async(session, workshop.url)
            workshop.need_download = False
            return workshop
    
        async def download(self):
            while not self.bridge.work_end():
                async with ClientSession() as session:
                    download_tasks = list()
                    while not self.bridge.download_queue_empty():
                        workshop = self.bridge.get_download_queue()
                        task = asyncio.create_task(self.get_page(session, workshop))
                        download_tasks.append(task)
                    for task in download_tasks:
                        await task
                        workshop = task.result()
                        self.bridge.put_work_queue(workshop)
    
        def async_run(self, bridge):
            self.bridge = bridge
            asyncio.run(self.download())
    
    
    class App:
        def __init__(self):
            self.works = Works()
            self.bridge = Bridge()
            self.download = Downloader()
    
        def async_run(self, workshop_lst):
            self.bridge.init_works(workshop_lst)
            p_run_works = Process(target=self.works.async_run,
                                  args=(self.bridge,))
            p_download = Process(target=self.download.async_run,
                                 args=(self.bridge,))
            p_run_works.start()
            p_download.start()
            p_run_works.join()
            p_download.join()
    
    
    class MyWorkshop(Workshop):
        def __init__(self, url, need_download):
            super().__init__(url, need_download)
            self.set_start_process(self.extract_movie_name)
    
        async def extract_movie_name(self):
            soup = BeautifulSoup(self.html, "html.parser")
            name_tags = soup.find_all(class_="m-b-sm")
            for name_tag in name_tags:
                print(name_tag.string)
            self.set_end()
    
    
    if __name__ == "__main__":
        work_lst = list()
        url_template = "https://ssr1.scrape.center/page/{}"
        for i in range(1, 11):
            url = url_template.format(str(i))
            work_lst.append(
                MyWorkshop(url=url, need_download=True)
            )
        app = App()
        app.async_run(work_lst)
    

    Summary

    With this installment's changes, we now have a simple asynchronous crawler. To crawl a new set of pages, you only need to subclass Workshop and implement your own crawl flow. Of course, it can only handle the simplest jobs for now: error handling, custom request parameters, proxies, logging and so on are all left unaddressed, to be filled in gradually through real use.
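
    As an illustration, here is a hedged sketch of such a subclass (the class and method names are hypothetical, not from this series): a two-step flow that parses the page first and processes the data afterwards, chaining the steps with set_next_process from the Workshop base class.

    class TwoStepWorkshop(Workshop):
        def __init__(self, url, need_download):
            super().__init__(url, need_download)
            self.names = []  # filled by step 1, consumed by step 2
            self.set_start_process(self.parse_page)

        async def parse_page(self):
            # Step 1: parse the downloaded html and collect the movie names.
            soup = BeautifulSoup(self.html, "html.parser")
            self.names = [tag.string for tag in soup.find_all(class_="m-b-sm")]
            self.set_next_process(self.handle_data)

        async def handle_data(self):
            # Step 2: process the collected data, then mark the flow finished.
            for name in self.names:
                print(name)
            self.set_end()

    Because parse_page does not call set_end, distribute_works puts the workshop back on the work queue after step one, and handle_data runs on the next pass; only set_end retires the workshop and decreases the running-work count.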
