美文网首页
Python使用异步IO请求来提升效率

Python使用异步IO请求来提升效率

作者: 东方胖 | 来源:发表于2022-04-26 17:32 被阅读0次

    常见的批量处理任务

    • 有一批数据,可能有50w以上的条目,现在需要在其中遴选出有问题的数据,输出到一个文件。遴选的逻辑比较复杂,不过有一个server能做到。你要做的就是把数据交给该服务处理,用它返回的结果就可以。
    • 从某服务器拉一批数据下来,它只有一个 get RestAPI 供你使用
    • 给一个服务接口制造一点压力。我们要模拟一堆请求批量发出去

    解决方案

    对于第一个任务,我们如果用一个loop来处理,假设每次检查调用一次 POST的接口,往返一次 200ms,1s钟我们能处理 5 条数据,写一个单进程并使用 request 库处理,算下来我们需要
    500000 \div 5 \div 3600 \approx 27 小时

    熟悉操作系统知识的人知道,这里面其实消耗了大量的时间在 IO等待上,所以,如果一开始就想到用多进程的方式去“提速”,效果未必理想,多起几个进程确实会提升很大的效率。但对 IO 密集的问题,我们肯定是先用线程或者其它手段先节约一些 IO 等待的时间,然后再考虑用多进程模型进一步缩小消耗的时间。


    本文分享一个用异步 io 而非线程的模型来处理此类问题,然后比较一下多线程和 异步io 的方案和原始最低效的办法的差异

    原始的方法

    写一个任务,去请求 'https://www.qq.com' 200 次

    import requests
    import time
    
    
    def elapse(func):
        def wrapper(*args, **kwargs):
            t1 = time.time()
            ret = func(*args, **kwargs)
            t2 = time.time()
            print("func : {} cost {}ms".format(func.__name__,  round(t2 - t1, 3) * 1000))
            return ret
        return wrapper
    
    @elapse
    def get_page():
        url = "https://www.qq.com"
        for i in range(200):
            resp = requests.request("GET", url)
            if resp.status_code != 200:
                print("error, return code is ", resp.status_code)
    
    
    if __name__ == '__main__':
        g = get_page()
    

    代码中先写了一个 eplase 装饰器用来统计函数的运行时常,然后简单地在 get_page 中暴力地请求了200次企鹅的官网,想象一下,一次请求200ms的话,我们一共要花 40000ms左右才能完成这个任务,实际的
    输出是

    get_page cost 21223.0ms

    然后再使用一个多线程的模型来处理,对比一下

    多线程方案

    我们使用 2 个线程

    import time
    import requests
    from threading import Lock, Thread
    
    
    
    def elapse(func):
        def wrapper(*args, **kwargs):
            t1 = time.time()
            ret = func(*args, **kwargs)
            t2 = time.time()
            print("func : {} cost {}ms".format(func.__name__,  round(t2 - t1, 3) * 1000))
            return ret
        return wrapper
    
    count = 0
    def task():
        url = "https://www.qq.com"
        global count
        with Lock():
            while count < 200:
                requests.request("GET", url)
                count += 1
    
    
    @elapse
    def get_page(num=4):
        threads = []
        for i in range(num):
            local_thread = Thread(target=task)
            threads.append(local_thread)
    
        for th in threads:
            th.start()
    
        for th in threads:
            th.join()
    
    
    if __name__ == '__main__':
        get_page(2)
    

    task 函数是一个 200圈的循环,用一个公共的变量 count 来统计请求数量,为了让这个数字是原子增加,必须为 count += 1 外层的代码加上锁,
    测试 2 个线程的结果 大约是 12s左右
    输出

    get_page cost 12723.0ms

    我们增加线程数量,分别得到运行时间如下

    线程数 耗时
    2 12723 ms
    4 8151ms
    8 5556ms
    12 3809ms
    16 3337 ms
    20 2089ms
    24 2178ms
    28 2191ms
    32 2284ms

    可以看到随着线程数不断增加,得到的收益逐渐递减,20个线程是比较合适的,200个请求最终下降到 2000ms 左右,比单线程 22wms 左右的耗时提升了数百倍之多 。
    因为开启线程有一定的时间损耗,如果任务量更大,线程启动的时间摊销到总耗时里的比例就越小,如果是请求 1000个任务,我们可以开启 50个线程,最后我们的总用时是 10s左右。

    估算一下 20个线程处理任务1 的时间 ,忽略线程启动和关闭的代价,那么 总消耗时间大约可以缩小到 1到1.5小时


    再考虑异步请求的方案

    aiohttp + asynio 库

    写一个 asyn+await 的协程

    async def task(num):
        url = "https://www.qq.com"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                status_code = response.status
                await response.text("gbk")
                print(f'task: {threading.current_thread()} :{num}:{status_code}')
    
    @elapse
    def get_page():
        loop = asyncio.get_event_loop()
        tasks = [task(i) for i in range(200)]
        loop.run_until_complete(asyncio.wait(tasks))
    
    
    if __name__ == '__main__':
        get_page()
    

    运行结果 , 约 3500ms

    ...
    task: <_MainThread(MainThread, started 8672437760)> :183:200
    task: <_MainThread(MainThread, started 8672437760)> :113:200
    task: <_MainThread(MainThread, started 8672437760)> :97:200
    task: <_MainThread(MainThread, started 8672437760)> :184:200
    task: <_MainThread(MainThread, started 8672437760)> :157:200
    task: <_MainThread(MainThread, started 8672437760)> :199:200
    func : get_page cost 3588.0ms

    上面是一部分输出,其中只有一个主线程,没有切换线程,是不是很妙。我们也看到这种异步io的方式比用线程的方案,耗时差不多,但是不需要上锁,管理线程这些繁琐的步骤,也无需掂量开几个线程合适这种问题

    其它支持 异步io的库: httpx

    httpx

    httpx 用法与aiohttp 相同

    # # -*- coding: gb2312-*
    # Author: donghua.chen@shopee.com
    # CrateTime: 2022/4/26 15:13
    # Filename: async.py
    
    import time
    import asyncio
    import aiohttp
    import threading
    import httpx
    
    client = httpx.AsyncClient()
    async def httpx_task(num):
        url = "https://www.qq.com"
        response = await client.get(url)
        status_code = response.status_code
        print(f'httpx_task: {threading.current_thread()}: {num}:{status_code}')
    
    @elapse
    def get_page():
        loop = asyncio.get_event_loop()
        tasks = [httpx_task(i) for i in range(200)]
        loop.run_until_complete(asyncio.wait(tasks))
    
    
    if __name__ == '__main__':
        get_page()
    

    client = httpx.AsyncClient() 这行代码放在 httpx_task 的外面可以节约连接,以及对象创建的开销,这段代码的最终结果和多线程最快的结果很接近

    拥抱异步IO吧,太香

    相关文章

      网友评论

          本文标题:Python使用异步IO请求来提升效率

          本文链接:https://www.haomeiwen.com/subject/zxxhyrtx.html