美文网首页Python中文社区python机器学习爬虫大数据 爬虫Python AI Sql
[Python] 多进程 多线程 协程及其延伸 代码实例

[Python] 多进程 多线程 协程及其延伸 代码实例

作者: 半为花间酒 | 来源:发表于2020-04-01 12:08 被阅读0次

    上一篇文章爬取生信坑实例已经实战爬取了论坛全部帖子的信息,实现了目的后发现了很多新的思路,也有许多值得改进的地方。
    今天就利用多个库实现多进程 多线程 协程加速爬取信息

    本文对不深入介绍理论和原理,一切都在代码中


    注:为了方便说明问题,下文代码中如果是新增加的部分,代码行前会加上 > 符号便于观察,实际运行需要去掉


    一、同步

    首先对之前的源码进行简化,对各个功能细分,有意识进行函数式编程

    parse_1函数可以组成79个页面url,然后一次循环就将url传入parse_2函数解析页面。为了方便演示,解析页码url的循环迭代用for循环举例

    import requests
    from lxml import html
    
    def parse_1():
        url = 'https://www.bioinfo.info/?/sort_type-new__day-0__is_recommend-0__page-{}'
        for i in range(1, 80):
            parse_2(url.format(num))
    
    def parse_2(url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        selector = html.fromstring(response.text)
        questions = selector.xpath("//div[@class='aw-question-content']")
        for i in questions:
            title = i.xpath("h4/a/text()")[0]
            link = i.xpath("h4/a/@href")[0]
            print(title, link)
    
    if __name__ == '__main__':
        parse_page()
    

    性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待

    示例代码就是典型的串行逻辑,parse_1产生新的url传递给parse_2执行,parse_2解析结束后parse_1继续迭代一次,重复之前步骤

    二、多线程

    因为CPU在执行程序时每个时间刻度上只会存在一个线程,因此多线程实际上提高了进程的使用率从而提高了CPU的使用率

    实现多线程的库有很多,这里用concurrent.futures中的ThreadPoolExecutor来演示。介绍ThreadPoolExecutor库是因为它相比其他库,代码更简洁

    import requests
    from lxml import html
    > from concurrent.futures import ThreadPoolExecutor
    
    def parse_1():
        url = 'https://www.bioinfo.info/?/sort_type-new__day-0__is_recommend-0__page-{}'
        # 建立线程池
        > pool = ThreadPoolExecutor(6)
        for i in range(1, 80):
            > pool.submit(parse_2, url.format(num))
        > pool.shutdown(wait=True)
    
    def parse_2(url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        selector = html.fromstring(response.text)
        questions = selector.xpath("//div[@class='aw-question-content']")
        for i in questions:
            title = i.xpath("h4/a/text()")[0]
            link = i.xpath("h4/a/@href")[0]
            print(title, link)
    
    if __name__ == '__main__':
        parse_page()
    

    跟同步相对的就是异步

    异步就是彼此独立,在等待某事件的过程中继续做自己的事,不需要等待这一事件完成后再工作。线程就是实现异步的一个方式,也就是说多线程是异步处理

    异步就意味着不知道处理结果,有时候我们需要了解处理结果,就可以采用回调

    import requests
    from lxml import html
    from concurrent.futures import ThreadPoolExecutor
    
    # 增加回调函数
    > def callback(future):
        > print(future.result())
    
    def parse_1():
        url = 'https://www.bioinfo.info/?/sort_type-new__day-0__is_recommend-0__page-{}'
        pool = ThreadPoolExecutor(6)
        for i in range(1, 80):
            > results = pool.submit(parse_2, url.format(num))
            # 回调的关键步骤
            > results.add_done_callback(callback)
        pool.shutdown(wait=True)
    
    def parse_2(url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        selector = html.fromstring(response.text)
        questions = selector.xpath("//div[@class='aw-question-content']")
        for i in questions:
            title = i.xpath("h4/a/text()")[0]
            link = i.xpath("h4/a/@href")[0]
            print(title, link)
    
    if __name__ == '__main__':
        parse_page()
    

    python实现多线程有一个无数人诟病的GIL(全局解释器锁),但多线程对于爬取网页这种多数属于IO密集型的任务依旧很合适

    三、多进程

    多进程用两个方法实现:ProcessPoolExecutor和multiprocessing

    1. ProcessPoolExecutor

    和实现多线程的ThreadPoolExecutor类似

    import requests
    from lxml import html
    > from concurrent.futures import ProcessPoolExecutor
    
    def parse_1():
        url = 'https://www.bioinfo.info/?/sort_type-new__day-0__is_recommend-0__page-{}'
        # 建立线程池
        > pool = ProcessPoolExecutor(6)
        for i in range(1, 80):
            > pool.submit(parse_2, url.format(num))
        > pool.shutdown(wait=True)
    
    def parse_2(url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        selector = html.fromstring(response.text)
        questions = selector.xpath("//div[@class='aw-question-content']")
        for i in questions:
            title = i.xpath("h4/a/text()")[0]
            link = i.xpath("h4/a/@href")[0]
            print(title, link)
    
    if __name__ == '__main__':
        parse_page()
    

    可以看到改动了两次类名,代码依旧很简洁

    同理也可以添加回调函数

    import requests
    from lxml import html
    from concurrent.futures import ProcessPoolExecutor
    
    > def callback(future):
        > print(future.result())
    
    def parse_1():
        url = 'https://www.bioinfo.info/?/sort_type-new__day-0__is_recommend-0__page-{}'
        pool = ProcessPoolExecutor(6)
        for i in range(1, 80):
            > results = pool.submit(parse_2, url.format(num))
            > results.add_done_callback(callback)
        pool.shutdown(wait=True)
    
    def parse_2(url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        selector = html.fromstring(response.text)
        questions = selector.xpath("//div[@class='aw-question-content']")
        for i in questions:
            title = i.xpath("h4/a/text()")[0]
            link = i.xpath("h4/a/@href")[0]
            print(title, link)
    
    if __name__ == '__main__':
        parse_page()
    
    2. multiprocessing
    import requests
    from lxml import html
    > from multiprocessing import Pool
    
    def parse_1():
        url = 'https://www.bioinfo.info/?/sort_type-new__day-0__is_recommend-0__page-{}'
        # 建池
        > pool = Pool(processes=5) 
        # 存放结果
        > res_lst = []
        for i in range(1, 80):
            # 把任务加入池中
            > res = pool.apply_async(func=parse_2, args=(url.format(num),))
            # 获取完成的结果(需要取出)
            > res_lst.append(res)
        # 存放最终结果(也可以直接存储或者print)
        > good_res_lst = [] 
        > for res in res_lst:
            # 利用get获取处理后的结果
            > good_res = res.get()
            # 判断结果的好坏
            > if good_res:
                > good_res_lst.append(good_res)
        # 关闭和等待完成
        > pool.close()
        > pool.join()
    
    def parse_2(url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        selector = html.fromstring(response.text)
        questions = selector.xpath("//div[@class='aw-question-content']")
        for i in questions:
            title = i.xpath("h4/a/text()")[0]
            link = i.xpath("h4/a/@href")[0]
            print(title, link)
    
    if __name__ == '__main__':
        parse_page()
    

    可以看到multiprocessing库的代码稍繁琐,但支持更多的拓展

    多进程和多线程确实能够达到加速的目的,但如果遇到IO阻塞会出现线程或者进程的浪费,因此有一个更好的方法……

    四、异步非阻塞

    协程+回调配合动态协作就可以达到异步非阻塞的目的,本质只用了一个线程,所以很大程度利用了资源

    实现异步非阻塞经典是利用asyncio库+yield,为了方便利用逐渐出现了更上层的封装 aiohttp,要想更好的理解异步非阻塞最好还是深入了解asyncio库

    gevent是一个非常方便实现协程的库

    import requests
    from lxml import html
    > from gevent import monkey
    # 猴子补丁是协作运行的灵魂
    > monkey.patch_all()
    > import gevent
    
    def parse_1():
        url = 'https://www.bioinfo.info/?/sort_type-new__day-0__is_recommend-0__page-{}'
        # 建立任务列表
        > tasks_list = []
        for i in range(1, 80):
            > task = gevent.spawn(parse_2, url.format(num))
            > tasks_list.append(task)
        > gevent.joinall(tasks_list)
    
    def parse_2(url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        selector = html.fromstring(response.text)
        questions = selector.xpath("//div[@class='aw-question-content']")
        for i in questions:
            title = i.xpath("h4/a/text()")[0]
            link = i.xpath("h4/a/@href")[0]
            print(title, link)
    
    if __name__ == '__main__':
        parse_page()
    

    gevent能很大提速,也引入了新的问题:
    如果我们不想速度太快给服务器造成太大负担怎么办?

    如果是多进程多线程的建池方法,可以控制池内数量。如果用gevent想要控制速度也有一个不错的方法:建立队列

    gevent中也提供了Quene类,下面代码改动较大

    import requests
    from lxml import html
    from gevent import monkey
    monkey.patch_all()
    import gevent
    > from gevent.queue import Queue
    
    def parse_1():
        url = 'https://www.bioinfo.info/?/sort_type-new__day-0__is_recommend-0__page-{}'
        tasks_list = []
        # 实例化队列
        > quene = Queue()
        for i in range(1, 80):
            # 全部url压入队列
            > quene.put_nowait(url.format(num))
        # 两路队列
        > for _ in range(2):
            > task = gevent.spawn(parse_2)
            > tasks_list.append(task)
        gevent.joinall(tasks_list)
    
    # 不需要传入参数,都在队列中
    > def parse_2():
        # 循环判断队列是否为空
        > while not quene.empty():
            # 弹出队列
            > url = quene.get_nowait()
            response = requests.get(url)
            # 判断队列状态 
            > print(quene.qsize(), response .status_code)
            response.encoding = 'utf-8'
            selector = html.fromstring(response.text)
            questions = selector.xpath("//div[@class='aw-question-content']")
            for i in questions:
                title = i.xpath("h4/a/text()")[0]
                link = i.xpath("h4/a/@href")[0]
                print(title, link)
    
    if __name__ == '__main__':
        parse_page()
    

    写在最后:
    以上就是几种常用的加速方法,测试网站依然是生信坑。如果对代码测试感兴趣可以加上time判断运行时间

    适当控制速度也是爬虫工作者的良好习惯,不要给服务器太大压力

    相关文章

      网友评论

        本文标题:[Python] 多进程 多线程 协程及其延伸 代码实例

        本文链接:https://www.haomeiwen.com/subject/czdwuhtx.html