美文网首页
python入门难?十之八九是因为python 协程吧!

python入门难?十之八九是因为python 协程吧!

作者: 梦想橡皮擦 | 来源:发表于2022-01-31 22:31 被阅读0次

    本篇博客补充一下协程并发数控制相关知识点。

    在正式编码前,先介绍一下本篇博客要采集的站点:【看历史,通天下-历史剧网】

    目标数据是该站点下的热门历史事件,列表页分页规则如下所示:

    http://www.lishiju.net/hotevents/p0
    http://www.lishiju.net/hotevents/p1
    http://www.lishiju.net/hotevents/p2
    

    首先我们通过普通的多线程,对该数据进行采集,由于本文主要目的是学习如何控制并发数,所以每页仅输出历史事件的标题内容。

    普通的多线程代码

    import threading
    import time
    
    import requests
    from bs4 import BeautifulSoup
    
    
    class MyThread(threading.Thread):
        def __init__(self, url):
            threading.Thread.__init__(self)
            self.__url = url
    
        def run(self):
            res = requests.get(url=self.__url)
            soup = BeautifulSoup(res.text, 'html.parser')
            title_tags = soup.find_all(attrs={'class': 'item-title'})
            event_names = [item.a.text for item in title_tags]
            print(event_names)
            print("")
    
    
    if __name__ == "__main__":
        start_time = time.perf_counter()
        threads = []
        for i in range(111):  # 创建了110个线程。
            threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
        for t in threads:
            t.start()  # 启动了110个线程。
    
        for t in threads:
            t.join()  # 等待线程结束
    
        print("累计耗时:", time.perf_counter() - start_time)
        # 累计耗时: 1.537718624
    

    上述代码同时开启所有线程,累计耗时 1.5 秒,程序采集结束。

    多线程之信号量

    python 信号量(Semaphore)用来控制线程并发数,信号量管理一个内置的计数器。
    信号量对象每次调用其 acquire() 方法时,信号量计数器执行 -1 操作,调用 release() 方法,计数器执行 +1 操作,当计数器等于 0 时,acquire() 方法会阻塞线程,一直等到其它线程调用 release() 后,计数器重新 +1,线程的阻塞才会解除。

    使用 threading.Semaphore() 创建一个信号量对象。

    修改上述并发代码:

    import threading
    import time
    
    import requests
    from bs4 import BeautifulSoup
    
    
    class MyThread(threading.Thread):
        def __init__(self, url):
            threading.Thread.__init__(self)
            self.__url = url
    
        def run(self):
            if semaphore.acquire():  # 计数器 -1
                print("正在采集:", self.__url)
                res = requests.get(url=self.__url)
                soup = BeautifulSoup(res.text, 'html.parser')
                title_tags = soup.find_all(attrs={'class': 'item-title'})
                event_names = [item.a.text for item in title_tags]
                print(event_names)
                print("")
                semaphore.release()  # 计数器 +1
    
    
    if __name__ == "__main__":
        semaphore = threading.Semaphore(5)  # 控制每次最多执行 5 个线程
        start_time = time.perf_counter()
        threads = []
        for i in range(111):  # 创建了110个线程。
            threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
        for t in threads:
            t.start()  # 启动了110个线程。
    
        for t in threads:
            t.join()  # 等待线程结束
    
        print("累计耗时:", time.perf_counter() - start_time)
        # 累计耗时: 2.8005530640000003
    

    当控制并发线程数量之后,累计耗时变多。

    补充知识点之 GIL

    GIL 是 python 里面的全局解释器锁(互斥锁),在同一进程,同一时间下,只能运行一个线程,这就导致了同一个进程下多个线程,只能实现并发而不能实现并行

    需要注意 python 语言并没有全局解释锁,只是因为历史的原因,在 CPython 解析器中,无法移除 GIL,所以使用 CPython 解析器,是会受到互斥锁影响的。

    还有一点是在编写爬虫程序时,多线程比单线程性能是有所提升的,因为遇到 I/O 阻塞会自动释放 GIL 锁。

    协程中使用信号量控制并发

    下面将信号量管理并发数,应用到协程代码中,在正式编写前,使用协程写法重构上述代码。

    import time
    
    import asyncio
    import aiohttp
    from bs4 import BeautifulSoup
    
    
    async def get_title(url):
        print("正在采集:", url)
        async with aiohttp.request('GET', url) as res:
            html = await res.text()
            soup = BeautifulSoup(html, 'html.parser')
            title_tags = soup.find_all(attrs={'class': 'item-title'})
            event_names = [item.a.text for item in title_tags]
            print(event_names)
    
    
    
    async def main():
        tasks = [asyncio.ensure_future(get_title("http://www.lishiju.net/hotevents/p{}".format(i))) for i in range(111)]
        dones, pendings = await asyncio.wait(tasks)
        # for task in dones:
        #     print(len(task.result()))
    
    
    if __name__ == '__main__':
        start_time = time.perf_counter()
        asyncio.run(main())
        print("代码运行时间为:", time.perf_counter() - start_time)
        # 代码运行时间为: 1.6422313430000002
    

    代码一次性并发 110 个协程,耗时 1.6 秒执行完毕,接下来就对上述代码,增加信号量管理代码。

    核心代码是 semaphore = asyncio.Semaphore(10),控制事件循环中并发的协程数量。

    import time
    
    import asyncio
    import aiohttp
    from bs4 import BeautifulSoup
    
    
    async def get_title(semaphore, url):
        async with semaphore:
            print("正在采集:", url)
            async with aiohttp.request('GET', url) as res:
                html = await res.text()
                soup = BeautifulSoup(html, 'html.parser')
                title_tags = soup.find_all(attrs={'class': 'item-title'})
                event_names = [item.a.text for item in title_tags]
                print(event_names)
    
    
    async def main():
        semaphore = asyncio.Semaphore(10)  # 控制每次最多执行 10 个线程
        tasks = [asyncio.ensure_future(get_title(semaphore, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
                 range(111)]
        dones, pendings = await asyncio.wait(tasks)
        # for task in dones:
        #     print(len(task.result()))
    
    
    if __name__ == '__main__':
        start_time = time.perf_counter()
        asyncio.run(main())
        print("代码运行时间为:", time.perf_counter() - start_time)
        # 代码运行时间为: 2.227831242
    

    aiohttp 中 TCPConnector 连接池

    既然上述代码已经用到了 aiohttp 模块,该模块下通过限制同时连接数,也可以控制线程并发数量,不过这个不是很好验证,所以从数据上进行验证,先设置控制并发数为 2,测试代码运行时间为 5.56 秒,然后修改并发数为 10,得到的时间为 1.4 秒,与协程信号量控制并发数得到的时间一致。所以使用 TCPConnector 连接池控制并发数也是有效的。

    import time
    
    import asyncio
    import aiohttp
    from bs4 import BeautifulSoup
    
    
    async def get_title(session, url):
        async with session.get(url) as res:
            print("正在采集:", url)
            html = await res.text()
            soup = BeautifulSoup(html, 'html.parser')
            title_tags = soup.find_all(attrs={'class': 'item-title'})
            event_names = [item.a.text for item in title_tags]
            print(event_names)
    
    
    
    async def main():
        connector = aiohttp.TCPConnector(limit=1)  # 限制同时连接数
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [asyncio.ensure_future(get_title(session, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
                     range(111)]
            await asyncio.wait(tasks)
    
    
    
    if __name__ == '__main__':
        start_time = time.perf_counter()
        asyncio.run(main())
        print("代码运行时间为:", time.perf_counter() - start_time)
    

    写在后面

    今天是持续写作的第 <font color=red>245</font> / 365 天。
    期待 <font color=#04a9f4>关注</font>,<font color=#04a9f4>点赞</font>、<font color=#04a9f4>评论</font>、<font color=#04a9f4>收藏</font>。

    更多精彩

    《爬虫 100 例,专栏销售中,买完就能学会系列专栏》
    [图片上传失败...(image-37dccf-1643639594092)]

    相关文章

      网友评论

          本文标题:python入门难?十之八九是因为python 协程吧!

          本文链接:https://www.haomeiwen.com/subject/sfsckrtx.html