美文网首页
Python基于进/线程池实现大数据量爬虫项目

Python基于进/线程池实现大数据量爬虫项目

作者: SlashBoyMr_wang | 来源:发表于2018-11-10 14:17 被阅读0次

    如今计算机已经进入多核CPU的时代了,使用多线程或多进程能够充分利用CPU多核性能来提高程序的执行效率。

    Python多任务的解决方案主要有以下三种:
    1.启动多进程,每个进程只有一个线程,通过多进程执行多任务;
    2.启动单进程(即多线程),在进程内启动多线程,通过多线程执行多任务;
    3.启动多进程,在每个进程内再启动多个线程,同时执行更多的任务;

    我们都知道,由于Cpython解释器存在全局GIL锁原因,无论是单核还是多核CPU,任意特定时刻只有一个线程会被Python解释器执行。但是创建一个线程操作系统花费的开销要比创建一个进程少很多。
    从这两个方面我们可以得出python多线程和多进程的选择的原则:

    多进程:高CPU利用型(计算密集型)
    多线程:低CPU利用型(I/O密集型)

    计算密集型特点
    计算密集型任务的特点是需要进行大量的计算,在整个时间片内始终消耗CPU的资源。由于GIL机制的原因多线程中无法利用多核参与计算,但多线程之间切换的开销时间仍然存在,因此多线程比单一线程需要更多的执行时间。而多进程中有各自独立的GIL锁互不影响,可以充分利用多核参与计算,加快了执行速度。

    I/O密集型特点
    I/O密集型任务的特点是CPU消耗很少,任务大部分时间都在等待I/O操作的完成(I/O速度远低于CPU和内存速度)

    python全局GIL锁
    Python代码的执行由Python解释器进行控制。目前Python的解释器有多种,如CPython、PyPy、Jython等,其中CPython为最广泛使用的Python解释器。理论上CPU是多核时支持多个线程同时执行,但在Python设计之初考虑到在Python解释器的主循环中执行Python代码,于是CPython中设计了全局解释器锁GIL(Global Interpreter Lock)机制用于管理解释器的访问,Python线程的执行必须先竞争到GIL权限才能执行。

    全局解释器锁GIL机制流程
      a、设置 GIL;
      b、切换到一个线程去运行;
      c、运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));
      d、把线程设置为睡眠状态;
      e、解锁 GIL;
      d、再次重复以上所有步骤。

    池的概念:
      创建进程或线程都需要消耗时间,销毁进程/线程也需要消耗时间。即便开启了成千上万的进程/线程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程/线程。
    所以就有了池的概念。
      定义一个池子,在里面放上固定数量的进程/线程,有需求来了,就拿一个池中的进程/线程来处理任务,等到处理完毕,并不关闭,而是将进程/线程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程/线程数量不够,任务就要等待之前的进程/线程执行任务完毕归来,拿到空闲进程/线程才能继续执行。
      减少了进程/线程创建/销毁带来的消耗,同时又可以最大化的利用CPU。

    进程池/线程池数量的确定
    知道了池的概念,那进程/线程开启的数量该怎么确定呢?多少个进程/线程才能最大化利用CPU,并且不会给操作系统带来额外的消耗呢?

    进程数:CPU数量 < 进程数 < CPU数量*2
    线程数: CPU数量 * 5

    实际案例实现:
    http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/135999
    爬取药监局13万条医疗器材名录

    方式一:python进程池实现13万+数据的爬取
      进程池的创建很简单,直接用multiprocessing.Pool类
      爬取的数据写入redis数据库中。

    ###进程池
    
    import requests
    from lxml import etree
    import redis
    from multiprocessing import Pool
    
    redis = redis.Redis()
    
    def get_msg(url,i):
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
        }
        ret_html = requests.get(url=url, headers=headers)
        ret_text = ret_html.text
    
        tree = etree.HTML(ret_text)
        tr_list = tree.xpath("/html/body/div/table/tbody/tr")
        tr_list.remove(tr_list[7])
        dic = {}
        for tr in tr_list:
            name = tr.xpath("./td[1]/text()")[0] if tr.xpath("./td[1]/text()") else "kong"
            value = tr.xpath("./td[2]/text()")[0] if tr.xpath("./td[2]/text()") else "kong"
            dic[name] = value
    
        redis.hset("equipment","%s"%i,dic)
        print("已完成%s条信息"%i)
    
    if __name__ == '__main__':
        p = Pool(8)
    
        for i in range(1,136000):
            url = "http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/%s"%i
            # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
            # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
            # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
            # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 
            res = p.apply_async(get_msg,args=(url,i))
    
        # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,
        # 等待进程池内任务都处理完,然后可以用get收集结果
        # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
        #调用join之前,先调用close函数,否则会出错。执行完close后不会有
        # 新的进程加入到pool,join函数等待所有子进程结束
        p.close()
        p.join()
    

    方式二:python线程池爬取13w+数据
      线程池的创建有两种方式:
        第一种:multiprocessing.dummy.Pool
        第二种:基于queue队列来创建
      下面这个示例采用第一种方式创建。

    import requests
    from lxml import etree
    import redis
    from multiprocessing.dummy import Pool as Threadpool
    
    # 创建redis链接
    redis = redis.Redis()
    
    def get_msg(i):
        url = "http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/%s" % i
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
        }
        ret_html = requests.get(url=url, headers=headers)
        ret_text = ret_html.text
    
        tree = etree.HTML(ret_text)
        tr_list = tree.xpath("/html/body/div/table/tbody/tr")
        tr_list.remove(tr_list[7])
        dic = {}
        for tr in tr_list:
            name = tr.xpath("./td[1]/text()")[0] if tr.xpath("./td[1]/text()") else "kong"
            value = tr.xpath("./td[2]/text()")[0] if tr.xpath("./td[2]/text()") else "kong"
            dic[name] = value
    
        redis.hset("equipment", "%s" % i, dic)
        print("已完成%s条信息" % i)
    
    def main():
        th = Threadpool(20)
        th.map(get_msg, [i for i in range(1, 136000)])
    
    if __name__ == '__main__':
        main()
    

    方式三:python进程中开线程实现爬取数据:

    import requests
    from lxml import etree
    import redis
    from multiprocessing.dummy import Pool as Threadpool
    from multiprocessing import Pool
    import time
    
    # 创建redis链接
    redis = redis.Redis()
    
    def get_msg(i):
        url = "http://db.pharmcube.com/database/cfda/detail/cfda_cn_instrument/%s" % i
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
        }
        ret_html = requests.get(url=url, headers=headers)
        ret_text = ret_html.text
    
        tree = etree.HTML(ret_text)
        tr_list = tree.xpath("/html/body/div/table/tbody/tr")
        tr_list.remove(tr_list[7])
        dic = {}
        for tr in tr_list:
            name = tr.xpath("./td[1]/text()")[0] if tr.xpath("./td[1]/text()") else "kong"
            value = tr.xpath("./td[2]/text()")[0] if tr.xpath("./td[2]/text()") else "kong"
            dic[name] = value
    
        redis.hset("Hequipment", "%s" % i, dic)
        print("已完成第%s条信息" % i)
    
    def main():
        th = Threadpool(10)
        th.map(get_msg, [i for i in range(1, 136000)])
    
    if __name__ == '__main__':
        start_time = time.time()
        p = Pool(5)
        for i in range(5):
            p.apply_async(main)
        p.close()
        p.join()
        stop_time= time.time()
        print("136000条数据总共用时%s s"%(start_time- stop_time))
    

    附:基于queue创建线程池的另一个例子:(6000+数据)
      http://125.35.6.84:81/xk/
      国家药品监督管理总局

    import requests
    import threading
    import redis
    import queue
    
    redis = redis.Redis()
    
    def get_msg(th, i):
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
        }
        url = "http://125.35.6.84:81/xk/itownet/portalAction.do?method=getXkzsList"
    
        data = {
            'on': 'true',
            'page': i,
            'pageSize': 15,
            'productName': '',
            'conditionType': 1
        }
        proxies = {
            "http": "122.226.73.12:80",
        }
        req = requests.post(url=url, headers=headers, data=data, proxies=proxies)
    
        date_dict = req.json()
        print("正在下载第%s页" % i)
    
        for d in date_dict['list']:
            ID = d["ID"]
    
            n_data = {
                "id": ID
            }
            ret_url = "http://125.35.6.84:81/xk/itownet/portalAction.do?method=getXkzsById"
            text = requests.post(url=ret_url, data=n_data, headers=headers)
            redis.hset("business", ID, text.text)
    
        th.add_thread()  # 向队列中添加线程,保证线程的数量
    
    # 定义一个线程类
    class ThreadPool(object):
        def __init__(self, max_num):
            # 基于队列来设定线程的总个数
            self.queue = queue.Queue(max_num)
            for i in range(max_num):
                self.queue.put(threading.Thread)
    
        # 定义方法从队列中得到线程
        @property
        def get_thread(self):
            return self.queue.get()
    
        # 定义方法线程结束后向队列中添加线程,保证线程总数量
        def add_thread(self):
            self.queue.put(threading.Thread)
    
    if __name__ == '__main__':
    
        # 创建线程池类,执行类的init方法
        th = ThreadPool(20)
    
        for i in range(1, 314):
            # 从队列中获得线程,执行操作
            thread = th.get_thread
            cur_th = thread(target=get_msg, args=(th, i))
            cur_th.start()  # 开启线程
    

    以上两个网站的数据爬去都是合法的,放心!!!

    相关文章

      网友评论

          本文标题:Python基于进/线程池实现大数据量爬虫项目

          本文链接:https://www.haomeiwen.com/subject/sdrwxqtx.html