美文网首页
Python使用异步IO请求来提升效率

Python使用异步IO请求来提升效率

作者: 东方胖 | 来源:发表于2022-04-26 17:32 被阅读0次

常见的批量处理任务

  • 有一批数据,可能有50w以上的条目,现在需要在其中遴选出有问题的数据,输出到一个文件。遴选的逻辑比较复杂,不过有一个server能做到。你要做的就是把数据交给该服务处理,用它返回的结果就可以。
  • 从某服务器拉一批数据下来,它只有一个 get RestAPI 供你使用
  • 给一个服务接口制造一点压力。我们要模拟一堆请求批量发出去

解决方案

对于第一个任务,我们如果用一个loop来处理,假设每次检查调用一次 POST的接口,往返一次 200ms,1s钟我们能处理 5 条数据,写一个单进程并使用 request 库处理,算下来我们需要
500000 \div 5 \div 3600 \approx 27 小时

熟悉操作系统知识的人知道,这里面其实消耗了大量的时间在 IO等待上,所以,如果一开始就想到用多进程的方式去“提速”,效果未必理想,多起几个进程确实会提升很大的效率。但对 IO 密集的问题,我们肯定是先用线程或者其它手段先节约一些 IO 等待的时间,然后再考虑用多进程模型进一步缩小消耗的时间。


本文分享一个用异步 io 而非线程的模型来处理此类问题,然后比较一下多线程和 异步io 的方案和原始最低效的办法的差异

原始的方法

写一个任务,去请求 'https://www.qq.com' 200 次

import requests
import time


def elapse(func):
    def wrapper(*args, **kwargs):
        t1 = time.time()
        ret = func(*args, **kwargs)
        t2 = time.time()
        print("func : {} cost {}ms".format(func.__name__,  round(t2 - t1, 3) * 1000))
        return ret
    return wrapper

@elapse
def get_page():
    url = "https://www.qq.com"
    for i in range(200):
        resp = requests.request("GET", url)
        if resp.status_code != 200:
            print("error, return code is ", resp.status_code)


if __name__ == '__main__':
    g = get_page()

代码中先写了一个 eplase 装饰器用来统计函数的运行时常,然后简单地在 get_page 中暴力地请求了200次企鹅的官网,想象一下,一次请求200ms的话,我们一共要花 40000ms左右才能完成这个任务,实际的
输出是

get_page cost 21223.0ms

然后再使用一个多线程的模型来处理,对比一下

多线程方案

我们使用 2 个线程

import time
import requests
from threading import Lock, Thread



def elapse(func):
    def wrapper(*args, **kwargs):
        t1 = time.time()
        ret = func(*args, **kwargs)
        t2 = time.time()
        print("func : {} cost {}ms".format(func.__name__,  round(t2 - t1, 3) * 1000))
        return ret
    return wrapper

count = 0
def task():
    url = "https://www.qq.com"
    global count
    with Lock():
        while count < 200:
            requests.request("GET", url)
            count += 1


@elapse
def get_page(num=4):
    threads = []
    for i in range(num):
        local_thread = Thread(target=task)
        threads.append(local_thread)

    for th in threads:
        th.start()

    for th in threads:
        th.join()


if __name__ == '__main__':
    get_page(2)

task 函数是一个 200圈的循环,用一个公共的变量 count 来统计请求数量,为了让这个数字是原子增加,必须为 count += 1 外层的代码加上锁,
测试 2 个线程的结果 大约是 12s左右
输出

get_page cost 12723.0ms

我们增加线程数量,分别得到运行时间如下

线程数 耗时
2 12723 ms
4 8151ms
8 5556ms
12 3809ms
16 3337 ms
20 2089ms
24 2178ms
28 2191ms
32 2284ms

可以看到随着线程数不断增加,得到的收益逐渐递减,20个线程是比较合适的,200个请求最终下降到 2000ms 左右,比单线程 22wms 左右的耗时提升了数百倍之多 。
因为开启线程有一定的时间损耗,如果任务量更大,线程启动的时间摊销到总耗时里的比例就越小,如果是请求 1000个任务,我们可以开启 50个线程,最后我们的总用时是 10s左右。

估算一下 20个线程处理任务1 的时间 ,忽略线程启动和关闭的代价,那么 总消耗时间大约可以缩小到 1到1.5小时


再考虑异步请求的方案

aiohttp + asynio 库

写一个 asyn+await 的协程

async def task(num):
    url = "https://www.qq.com"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            status_code = response.status
            await response.text("gbk")
            print(f'task: {threading.current_thread()} :{num}:{status_code}')

@elapse
def get_page():
    loop = asyncio.get_event_loop()
    tasks = [task(i) for i in range(200)]
    loop.run_until_complete(asyncio.wait(tasks))


if __name__ == '__main__':
    get_page()

运行结果 , 约 3500ms

...
task: <_MainThread(MainThread, started 8672437760)> :183:200
task: <_MainThread(MainThread, started 8672437760)> :113:200
task: <_MainThread(MainThread, started 8672437760)> :97:200
task: <_MainThread(MainThread, started 8672437760)> :184:200
task: <_MainThread(MainThread, started 8672437760)> :157:200
task: <_MainThread(MainThread, started 8672437760)> :199:200
func : get_page cost 3588.0ms

上面是一部分输出,其中只有一个主线程,没有切换线程,是不是很妙。我们也看到这种异步io的方式比用线程的方案,耗时差不多,但是不需要上锁,管理线程这些繁琐的步骤,也无需掂量开几个线程合适这种问题

其它支持 异步io的库: httpx

httpx

httpx 用法与aiohttp 相同

# # -*- coding: gb2312-*
# Author: donghua.chen@shopee.com
# CrateTime: 2022/4/26 15:13
# Filename: async.py

import time
import asyncio
import aiohttp
import threading
import httpx

client = httpx.AsyncClient()
async def httpx_task(num):
    url = "https://www.qq.com"
    response = await client.get(url)
    status_code = response.status_code
    print(f'httpx_task: {threading.current_thread()}: {num}:{status_code}')

@elapse
def get_page():
    loop = asyncio.get_event_loop()
    tasks = [httpx_task(i) for i in range(200)]
    loop.run_until_complete(asyncio.wait(tasks))


if __name__ == '__main__':
    get_page()

client = httpx.AsyncClient() 这行代码放在 httpx_task 的外面可以节约连接,以及对象创建的开销,这段代码的最终结果和多线程最快的结果很接近

拥抱异步IO吧,太香

相关文章

网友评论

      本文标题:Python使用异步IO请求来提升效率

      本文链接:https://www.haomeiwen.com/subject/zxxhyrtx.html