美文网首页
使用multiprocessing与requests进行爬虫制作

使用multiprocessing与requests进行爬虫制作

作者: 奕剑听雨 | 来源:发表于2017-12-28 19:19 被阅读83次

    导读疑问:1、为什么用multiprocessing? 2、Multiprocessing中有哪些方法? 3、本次爬虫中如何使用多进程提升爬虫效率?


    Ironman.jpg

    1.为什么要使用multiprocessing:简单的爬虫一般都是单线程的,因为处理的数据量比较少,例如一些公司用的爬取数据的爬虫,只是定时执行爬取某日单个数据,哪这样的话多进程没有必要。但是当我们要爬取的数据有很大数目时,多线程或者多进程的必要性就显现了。在python处理这类问题时,一般有Threading和multiprocessing两个库可以进行,而Threadings实质上市进行多线程活动,在爬取性能上提升较少,而multiprocessing根据电脑CPU核数可以进行多进程并发任务,在性能上提升非常明显。

    2.在以下例子中并没有将multiprocessing的方法全部展示,我只用到了Pool,map以及join和close;原因是在我的爬虫中需要处理的数据量有上十万的时候,单单使用apply等方法不能满足要求,而使用进程池pool则可以让处理更便捷。使用map是因为在爬虫中间过程中我也使用了多进程,需要取得进程执行结果,而使用apply_async则只能获取进程结果在内存中保存位置。

    3.在本次爬虫中我有三处使用多进程,包括通过page_number获取pic_id和catalogid、通过pic_id与catalogid获得指向下载前连接url、通过下载前连接指向下载中间html中的目标url,在处理过程中每个数据集合都是十万级的,因此多进程非常重要。

    以下只针对中间部分方法进行讲解,最后贴上全部代码。

        def get_single_result(self, page_num):#单个页面处理
            dict_middle_result = {}
            true_result = {}
            keyword = self.search_keyword
            url = "http://search.quanjing.com/search?key=%s&pageSize=200&pageNum=%s&imageType=2&sortType=1&callback=searchresult&_=1513818809900"%(keyword, page_num)
            try:
                response = requests.get(url)#接口请求
                content = response.content#请求结果获得
                result_2 = re.findall(r'\"imglist\"\:(.*)', content, re.S)[0]
                result_3 = re.findall(r'\{\"pic.*\"\}', result_2, re.S)[0]#对信息使用正则表达式进行筛选
                result_4 = result_3.split(',{')
                for j in range(0, len(result_4)):
                    if j == 0:
                        dict_ = json.loads(result_4[j])
                        dict_middle_result[dict_['pic_id']] = dict_['catalogid']
                    else:
                        strlog = '{'+result_4[j]
                        dict_ = json.loads(strlog)
                        dict_middle_result[dict_['pic_id']] = dict_['catalogid']#通过关键字获得pic_id和catalogid等信息
                true_result[page_num] = dict_middle_result
                return true_result#正常返回{5:{pic_id:catalogid,.....}}
            except:
                false_result = {}
                false_result[page_num] = 'fail'
                return false_result#异常处理,返回字典{5:'fail'}
    

    以上是一个通过上一接口请求获得总搜索获得页面数,通过访问页面数取得pic_id和catalogid的方法。
    其中进行接口请求,对取得结果使用正则表达式进行切割取得最终需要的结果存入字典,因为需要使用到map,所以我们使用map方法时要提供一个可迭代对象,必然需要创建一个单个处理方法,用于处理迭代。在处理中使用带了try....except来处理异常,因为爬虫在短时间内向服务器发起的请求很多,非常容易被服务器拒绝,所以对异常的page_num也要进行保存,以待后续处理。

        def get_middle_result_by_page_numbers(self):#多进程获取中间结果
            final_result = []
            list_for_pagenum = self.get_url_page_number_by_keyword()#此处为获得所有page_num的方法
            flag = 0
            while flag < 4:#循环执行
                if len(list_for_pagenum) == 0:
                    break
                elif flag == 3:
                    break
                else:
                    p = multiprocessing.Pool(processes=5)#建立4进程
                    res = p.map(self.get_single_result, list_for_pagenum)  #进程执行
                    p.close()#锁定进程
                    p.join()#进程空闲则加入请求
                    list_for_pagenum = []
                    for item in res:#{5:{pic_id:catalogid,.....}
                        for key in item:
                            if item[key] == 'fail':
                                list_for_pagenum.append(key)#异常page_num返回继续处理
                            else:
                                for i in item[key]:
                                    dict_ = {}
                                    dict_[i] = item[key][i]
                                    final_result.append(dict_)
                    flag += 1
            return final_result#返回列表结果,其中元素为多个字典
    
    

    以上代码中,使用while循环处理3次(因为效率关系,没有进行while True死循环),每次处理完将可迭代对象list_for_page_num清空并将请求失败的page_num追加至列表继续处理。map方法需要的主要参数为单个处理方法get_single_result和一个用来提供对象给方法处理的可迭代对象。map处理完后将结果保存至res中,这是一个列表。方法返回为一个含有多个字典的列表用于后续多进程处理。

    全部代码如下:

    import requests
    import json
    import re
    import sys
    import random
    import os
    import time
    import multiprocessing
    from multiprocessing import Pool
    from time import sleep
    import copy_reg
    import types
    
    
    def _pickle_method(m):
        if m.im_self is None:
            return getattr, (m.im_class, m.im_func.func_name)
        else:
            return getattr, (m.im_self, m.im_func.func_name)
    
    copy_reg.pickle(types.MethodType, _pickle_method)#使用copy_reg将MethodType注册为可序列化的方法
    
    reload(sys)
    sys.setdefaultencoding('utf-8')#初始化编码防止乱码
    
    
    class PictureSpider(object):
    
        def __init__(self, search_keyword):
            self. search_keyword = search_keyword
    
        def make_saving_path(self):#创建保存文件
            key = self.search_keyword.encode('gbk')
            directory_path = 'F:\\test_auto\\spider\\pictures\\%s' % key
            os.mkdir(directory_path)
            return directory_path
    
        def get_url_page_number_by_keyword(self):#通过关键字取得下载中间结果字典(包含pic_id和catalogid的字典)
            list_for_pagenum = []
            keyword = self.search_keyword
            dict_middle_result = {}
            url = "http://search.quanjing.com/search?key=%s&pageSize=200&pageNum=1&imageType=2&sortType=1&callback=searchresult&_=1513818809900"%keyword
            response = requests.get(url)
            content = response.content
            result_1 = re.findall(r'\"pagecount\".*?\d+', content, re.S)
            page_number = str(result_1[0].split(":")[-1])#总页数对应pageNum
            for i in range(1, int(page_number)+1):
                list_for_pagenum.append(i)
            return list_for_pagenum
    
        def get_single_result(self, page_num):#单个页面处理
            dict_middle_result = {}
            true_result = {}
            keyword = self.search_keyword
            url = "http://search.quanjing.com/search?key=%s&pageSize=200&pageNum=%s&imageType=2&sortType=1&callback=searchresult&_=1513818809900"%(keyword, page_num)
            try:
                response = requests.get(url)
                content = response.content
                result_2 = re.findall(r'\"imglist\"\:(.*)', content, re.S)[0]
                result_3 = re.findall(r'\{\"pic.*\"\}', result_2, re.S)[0]#对信息使用正则表达式进行筛选
                result_4 = result_3.split(',{')
                for j in range(0, len(result_4)):
                    if j == 0:
                        dict_ = json.loads(result_4[j])
                        dict_middle_result[dict_['pic_id']] = dict_['catalogid']
                    else:
                        strlog = '{'+result_4[j]
                        dict_ = json.loads(strlog)
                        dict_middle_result[dict_['pic_id']] = dict_['catalogid']#通过关键字获得pic_id和catalogid等信息
                true_result[page_num] = dict_middle_result
                return true_result#正常返回{5:{pic_id:catalogid,.....}}
            except:
                false_result = {}
                false_result[page_num] = 'fail'
                return false_result#异常处理,返回字典{5:'fail'}
    
        def get_middle_result_by_page_numbers(self):
            final_result = []
            list_for_pagenum = self.get_url_page_number_by_keyword()
            flag = 0
            while flag < 3:
                if len(list_for_pagenum) == 0:
                    break
                elif flag == 3:
                    break
                else:
                    p = multiprocessing.Pool(processes=5)#建立4进程
                    res = p.map(self.get_single_result, list_for_pagenum)  #进程执行
                    p.close()
                    p.join()
                    list_for_pagenum = []
                    for item in res:#{5:{pic_id:catalogid,.....}
                        for key in item:
                            if item[key] == 'fail':
                                list_for_pagenum.append(key)
                            else:
                                for i in item[key]:
                                    dict_ = {}
                                    dict_[i] = item[key][i]
                                    final_result.append(dict_)
                    flag += 1
            return final_result#返回列表结果,其中元素为多个字典
    
        def get_url_by_picid(self, pic_id_dict):#单个获得最终下载链接过程
            pic_id_list = []
            for i in pic_id_dict:
                pic_id_list.append(i)
            pic_id = pic_id_list[0]
            catalogid = pic_id_dict[pic_id]
            url = "http://www.quanjing.com/downcomp.aspx?pic_id=%s&Catalogid=%s" % (pic_id, catalogid)
            response = requests.get(url)
            content = response.content
            final_url_1 = re.findall(r'<script.*</script>', content, re.S)[0]
            final_url_2 = re.findall(r'document.location.href =(.*)\;<{1}', final_url_1, re.S)
            final_url_3 = final_url_2[0]#正则表达式处理获得信息
            final_url = re.findall(r'\'(.*)\'', final_url_3, re.S)[0]#取得最终下载URL
            return final_url
    
        def get_urls_by_middle_result(self):#多进行进行下载链接获取
            final_result = self.get_middle_result_by_page_numbers()
            p = multiprocessing.Pool(processes=4)#建立4进程
            res = p.map(self.get_url_by_picid, final_result)  #进程执行
            p.close()
            p.join()
            return res
    
        def get_picture_by_url(self, url):#单进程下载图片
            key = self.search_keyword.encode('gbk')
            try:
                directory = self.make_saving_path()
            except:
                fold_path = 'F:\\test_auto\\spider\\pictures\\%s' % key
            else:
                fold_path = directory
            picture_name = key+str(random.randint(10000, 99999))+'.jpg'#定义保存文件名
            picture_path = '%s\\%s' % (fold_path, picture_name)
            if os.path.exists(picture_path):
                picture_name = key+str(random.randint(1000, 9999))+'.jpg'
                picture_path = '%s\\%s' % (fold_path, picture_name)
            try:
                response = requests.get(url)
                content = response.content
                with open(picture_path, 'wb') as fi:
                    fi.write(content)
                    fi.close()
                picture_size = os.path.getsize(picture_path)
                if picture_size < 7000:#下载文件出错异常处理
                    os.remove(picture_path)
                    fail_url = url
                    dict_ = {}
                    dict_[fail_url] = 'fail'
                    return dict_
                else:
                    print "%s下载完成..." % picture_name
                    success_url = url
                    dict_ = {}
                    dict_[success_url] = 'success'
                    return dict_#下载成功返回数据
            except:#连接异常处理
                print "%s连接失败..." % url
                dict_ = {}
                dict_[url] = 'fail'
                return dict_#下载失败返回数据
    
    
    def main():
        start_time = time.time()
        keyword = raw_input('请输入需要搜索下载的图片关键字:')
        spider = PictureSpider(keyword)
        url_pool = spider.get_urls_by_middle_result()
        picture_num = len(url_pool)
        print "根据关键字一共搜索到%s,现在开始下载..." % picture_num
        while True:  #因为多进程下载存在文件破损或者服务器因为访问过于频繁而出错,所以将破损文件URL加入URL池无限循环直到全部下载完成
            if len(url_pool) == 0:
                break
            else:
                p = multiprocessing.Pool(processes=4)#建立4进程
                res = p.map(spider.get_picture_by_url, url_pool)  #进程执行
                p.close()
                p.join()
                url_pool = []
                for item in res:
                    for key in item:
                        if item[key] == 'fail':
                            url_pool.append(key)  #损坏文件重新加入URL池重新下载
        end_time = time.time()
        time_used = end_time - start_time#统计下载用时
        print "全部下载完成,用时%s秒。" % time_used
    
    
    if __name__ == '__main__':
        main()
    

    代码执行环境为python27,需要使用到的库都可以用pip install+库名安装,初学者,欢迎批评。


    远山.jpg

    相关文章

      网友评论

          本文标题:使用multiprocessing与requests进行爬虫制作

          本文链接:https://www.haomeiwen.com/subject/bhcvgxtx.html