Python Crawler in Practice: the Efficiency of Multiprocessing, Multithreading, and Coroutines


Author: 纳米君 | Published 2018-07-29 23:51
    Goal: scrape https://www.edge.org/library for every book title, author, and each author's Wikipedia bio.

    IDE: PyCharm
    Third-party libraries: requests, BeautifulSoup

    import requests
    from bs4 import BeautifulSoup
    

    There are two ways to write a crawler: one simply simulates the request and works with the returned data directly; the other has to parse the HTML. This crawler uses the latter approach.
    BeautifulSoup is the library used here to parse the HTML.
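
    As a minimal sketch of the parse-and-extract approach (the CSS selector below simply mirrors the markup this article targets and should be treated as an assumption, not a verified description of the edge.org page):

    import requests
    from bs4 import BeautifulSoup

    # Fetch the library page and pull the book titles out of the parsed HTML.
    # The selector 'div.views-field-title a' is assumed from the markup used later in this article.
    res = requests.get('https://www.edge.org/library')
    soup = BeautifulSoup(res.text, 'html.parser')
    for link in soup.select('div.views-field-title a'):
        print(link.get_text())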

    This was my first crawler. At first I naively used a synchronous approach, fetching all the pages one after another, which took more than two hours in total. I then reworked it using multiprocessing, multithreading, and coroutines, and the difference in speed was dramatic.

    As for process pools, Python provides two ways to create them. Straight to the code:

    import re
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from multiprocessing.pool import Pool
    
    import requests
    import sys
    from bs4 import BeautifulSoup
    
    from book.ExcelUtils import ExcelUtils
    
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'}
    
    
    # Fetch book titles, authors, and each author's Wikipedia bio
    def get_book_info(url):
        book_dict = {'name': [], 'author': [], 'info': []}
        res = requests.get(url, headers=headers)
        if res.status_code == 200:
            find_book_name_list = re.findall('<div class="views-field views-field-title">.*?<a href=.*?>(.*?)</a>',
                                             res.text)
            book_dict['name'] = find_book_name_list
    
            find_author_list = re.findall(
                '<span class="views-field views-field-field-edge-author">.*?<a href=".*?">(.*?)</a>', res.text)
            book_dict['author'] = find_author_list
    
            # Fetch the authors' Wikipedia bios
            get_wiki_author_info(find_author_list, book_dict)
    
        else:
            print('get_book_info is failed, url:[%s]' % url)
    
        return book_dict
    
    
    # Fetch an author's Wikipedia bio by name
    def get_wiki_author_info(find_author_list, book_dict):
        for name in find_author_list:
            url = 'https://en.wikipedia.org/wiki/%s' % name.replace(' ', '_')
            try:
                res = requests.get(url, headers=headers)
                if res.status_code == 200:
                    soup = BeautifulSoup(res.text, "html.parser")
                    soup_select = soup.select('#mw-content-text p')
                    if str.strip(soup_select[0].get_text()) != '':
                        if soup_select[0].get_text().find('may refer to') != -1:
                            # Disambiguation page: multiple candidate entries
                            name_list = soup.select('#mw-content-text li a')
                            for index, item in enumerate(name_list):
                                if item.get_text().find(name) != -1:
                                    # Recurse; take only the first match by default
                                    get_wiki_author_info([item.get_text()], book_dict)
                                    break
                        else:
                            book_dict['info'].append(soup_select[0].get_text())
                    else:
                        book_dict['info'].append(soup_select[1].get_text())
                else:
                    book_dict['info'].append('get failed, url:[%s]' % url)
                    print('get_wiki_author_info is failed, url:[%s]' % url)
    
            except:
                book_dict['info'].append('get exception, url:[%s]' % url)
    
    # One way to create a process pool
    def pool_test(url_list):
        book_list = []
        # Create the process pool
        pool = Pool(20)
        start = time.time()
        for url in url_list:
            time.sleep(0.5)
            result = pool.apply_async(get_book_info, args=(url,))
            book_list.append(result)
    
        # Close the pool: it accepts no new tasks but keeps processing the ones already submitted
        pool.close()
        # The main process waits for all child processes to finish; must be called after close or terminate
        pool.join()
        print('time: ', time.time() - start)
    
        book_name_list = []
        author_list = []
        author_info_list = []
        print('book_list: ', len(book_list))
        for v in book_list:
            book_name_list.extend(v.get()['name'])
            author_list.extend(v.get()['author'])
            author_info_list.extend(v.get()['info'])
    
        ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)
    
    # How to create a thread pool
    def thread_pool_test(url_list):
        book_list = []
        # Create the thread pool
        pool = ThreadPoolExecutor(max_workers=20)
        start = time.time()
        for url in url_list:
            time.sleep(0.5)
            result = pool.submit(get_book_info, url)
            book_list.append(result)
    
        pool.shutdown()
        print('time: ', time.time() - start)
    
        book_name_list = []
        author_list = []
        author_info_list = []
        print('book_list: ', len(book_list))
        for future in book_list:
            book_name_list.extend(future.result()['name'])
            author_list.extend(future.result()['author'])
            author_info_list.extend(future.result()['info'])
    
        ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)
    
    # The other way to create a process pool; it is created and used exactly like the thread pool
    def process_pool_test(url_list):
        book_list = []
        # Create the process pool
        pool = ProcessPoolExecutor(max_workers=20)
        start = time.time()
        for url in url_list:
            time.sleep(0.5)
            result = pool.submit(get_book_info, url)
            book_list.append(result)
    
        pool.shutdown()
        print('time: ', time.time() - start)
    
        book_name_list = []
        author_list = []
        author_info_list = []
        print('book_list: ', len(book_list))
        for future in book_list:
            book_name_list.extend(future.result()['name'])
            author_list.extend(future.result()['author'])
            author_info_list.extend(future.result()['info'])
    
        ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)
    
    
    if __name__ == '__main__':
        sys.setrecursionlimit(10000)
        url_list = ['https://www.edge.org/library']
        for i in range(1, 52):
            url_list.append('https://www.edge.org/library?page=%s' % i)
    
        thread_pool_test(url_list)
    
    

    The coroutine version of the crawler is a bit more involved than the process/thread versions:

    import asyncio
    import aiohttp
    import re
    import time
    
    import sys
    from bs4 import BeautifulSoup
    
    from spider.ExcelUtils import ExcelUtils
    
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'}
    
    
    # Fetch book titles and authors
    async def get_book_info(url, semaphore):
        book_dict = {'name': [], 'author': []}
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as res:
                    if res.status == 200:
                        text = await res.text()
                        find_book_name_list = re.findall(
                            '<div class="views-field views-field-title">.*?<a href=.*?>(.*?)</a>', text)
                        book_dict['name'] = find_book_name_list
    
                        find_author_list = re.findall(
                            '<span class="views-field views-field-field-edge-author">.*?<a href=".*?">(.*?)</a>', text)
                        book_dict['author'] = find_author_list
    
                    else:
                        print('get_book_info is failed, url:[%s]' % url)
    
        return book_dict
    
    
    # Fetch an author's Wikipedia bio by name
    async def get_wiki_author_info(name, semaphore):
        url = 'https://en.wikipedia.org/wiki/%s' % name.replace(' ', '_')
        try:
            async with semaphore:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, headers=headers) as res:
                        if res.status == 200:
                            text = await res.text()
                            soup = BeautifulSoup(text, "html.parser")
                            soup_select = soup.select('#mw-content-text p')
                            if str.strip(soup_select[0].get_text()) != '':
                                if soup_select[0].get_text().find('may refer to') != -1:
                                    # Disambiguation page: multiple candidate entries
                                    name_list = soup.select('#mw-content-text li a')
                                    for index, item in enumerate(name_list):
                                        if item.get_text().find(name) != -1:
                                            # Recurse; take only the first match by default
                                            return await get_wiki_author_info(item.get_text(), semaphore)
                                else:
                                    return soup_select[0].get_text()
                            else:
                                return soup_select[1].get_text()
                        else:
                            print('get_wiki_author_info is failed, url:[%s]' % url)
                            return 'get failed, url:[%s]' % url
    
        except:
            return 'get exception, url:[%s]' % url
    
    
    if __name__ == '__main__':
        sys.setrecursionlimit(10000)
        url_list = ['https://www.edge.org/library']
        for i in range(1, 52):
            url_list.append('https://www.edge.org/library?page=%s' % i)
    
        # Semaphore to cap concurrency at 20, analogous to a pool of 20 threads
        semaphore = asyncio.Semaphore(20)
        loop = asyncio.get_event_loop()
        tasks = [asyncio.ensure_future(get_book_info(url, semaphore)) for url in url_list]
    
        start = time.time()
        loop.run_until_complete(asyncio.wait(tasks))
    
        book_name_list = []
        author_list = []
        print('book_list: ', len(tasks))
        for future in tasks:
            book_name_list.extend(future.result()['name'])
            author_list.extend(future.result()['author'])
        # Fetch the authors' Wikipedia bios
        wiki_tasks = [asyncio.ensure_future(get_wiki_author_info(name, semaphore)) for name in author_list]
        loop.run_until_complete(asyncio.wait(wiki_tasks))
        author_info_list = [future.result() for future in wiki_tasks]
        print('time: ', time.time() - start)
    
        ExcelUtils.write_data_to_excel('bookInfo', book_name_list, author_list, author_info_list)
    
    

    The ExcelUtils helper class looks like this:

    import xlwt
    
    
    class ExcelUtils:
    
        @staticmethod
        def def_style():
            style = xlwt.XFStyle()
            alignment = xlwt.Alignment()
            alignment.horz = xlwt.Alignment.HORZ_CENTER  # center horizontally
            alignment.vert = xlwt.Alignment.VERT_CENTER  # center vertically
            style.alignment = alignment
    
            return style
    
        @staticmethod
        def write_data_to_excel(excel_name, book_name_list, author_list, author_info_list):
            # Instantiate a Workbook() object (i.e. the Excel file)
            wbk = xlwt.Workbook()
            # Create a sheet named Sheet1; cell_overwrite_ok=True allows writing to the same cell more than once
            sheet = wbk.add_sheet('Sheet1', cell_overwrite_ok=True)
            col_1 = sheet.col(0)
            col_2 = sheet.col(1)
            col_3 = sheet.col(2)
            col_1.width = 256 * 45
            col_2.width = 256 * 30
            col_3.width = 256 * 200
    
            sheet.write(0, 0, 'Title', ExcelUtils.def_style())
            sheet.write(0, 1, 'Author', ExcelUtils.def_style())
            sheet.write(0, 2, 'Author Bio', ExcelUtils.def_style())
    
            for i in range(len(author_info_list)):
                sheet.write(i + 1, 0, book_name_list[i])
                sheet.write(i + 1, 1, author_list[i], ExcelUtils.def_style())
                sheet.write(i + 1, 2, author_info_list[i])
    
            wbk.save(excel_name + '.xls')
    

    Run times vary between runs for all four approaches; the figures below are from a single run:

    Multiprocessing: Pool(20)
    time: 215.41232109069824 s
    Records collected: 1421
    
    Multiprocessing: ProcessPoolExecutor(max_workers=20)
    time: 202.88900017738342 s
    Records collected: 1481
    
    Multithreading: ThreadPoolExecutor(max_workers=20)
    time: 198.67899990081787 s
    Records collected: 1481
    
    Coroutines: asyncio
    time: 166.4319999217987 s
    Records collected: 1481
    

    As you can see, the four approaches do not differ much in efficiency here, and since the timing varies from run to run, it is hard to say definitively which one is best.
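
    One simple way to get a fairer number (not part of the original experiment, just a suggestion) is to run each variant several times and average the wall-clock time, along these lines:

    import time


    def benchmark(crawl_func, url_list, runs=3):
        # Run one crawler variant several times and return its average wall-clock time
        durations = []
        for _ in range(runs):
            start = time.time()
            crawl_func(url_list)
            durations.append(time.time() - start)
        return sum(durations) / len(durations)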

    That said, based on the earlier discussion of processes, threads, and coroutines: in general, coroutines are more efficient than threads, and Pool tends to be more efficient than ProcessPoolExecutor. Coroutines, however, run in a single thread and cannot make use of multiple CPU cores, so when combining approaches, Pool + coroutines is the recommended pairing (a rough sketch follows).
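
    A rough sketch of that combination (not the article's code; it assumes Python 3.7+ for asyncio.run, picks an illustrative 4-process split, and only downloads the pages without parsing them):

    import asyncio
    from multiprocessing import Pool

    import aiohttp


    async def fetch_all(urls):
        # Fetch every URL in one chunk concurrently with coroutines
        async with aiohttp.ClientSession() as session:
            async def fetch(url):
                async with session.get(url) as res:
                    return await res.text()

            return await asyncio.gather(*(fetch(url) for url in urls))


    def crawl_chunk(urls):
        # Each worker process runs its own event loop over its chunk of URLs
        return asyncio.run(fetch_all(urls))


    if __name__ == '__main__':
        url_list = ['https://www.edge.org/library?page=%s' % i for i in range(1, 52)]
        # Split the URL list into 4 chunks, one per process
        chunks = [url_list[i::4] for i in range(4)]
        with Pool(4) as pool:
            pages = pool.map(crawl_chunk, chunks)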
