常见的批量处理任务
- 有一批数据,可能有50w以上的条目,现在需要在其中遴选出有问题的数据,输出到一个文件。遴选的逻辑比较复杂,不过有一个server能做到。你要做的就是把数据交给该服务处理,用它返回的结果就可以。
- 从某服务器拉一批数据下来,它只有一个 get RestAPI 供你使用
- 给一个服务接口制造一点压力。我们要模拟一堆请求批量发出去
解决方案
对于第一个任务,我们如果用一个loop来处理,假设每次检查调用一次 POST的接口,往返一次 200ms,1s钟我们能处理 5 条数据,写一个单进程并使用 request 库处理,算下来我们需要
熟悉操作系统知识的人知道,这里面其实消耗了大量的时间在 IO等待上,所以,如果一开始就想到用多进程的方式去“提速”,效果未必理想,多起几个进程确实会提升很大的效率。但对 IO 密集的问题,我们肯定是先用线程或者其它手段先节约一些 IO 等待的时间,然后再考虑用多进程模型进一步缩小消耗的时间。
本文分享一个用异步 io 而非线程的模型来处理此类问题,然后比较一下多线程和 异步io 的方案和原始最低效的办法的差异
原始的方法
写一个任务,去请求 'https://www.qq.com' 200 次
import requests
import time
def elapse(func):
def wrapper(*args, **kwargs):
t1 = time.time()
ret = func(*args, **kwargs)
t2 = time.time()
print("func : {} cost {}ms".format(func.__name__, round(t2 - t1, 3) * 1000))
return ret
return wrapper
@elapse
def get_page():
url = "https://www.qq.com"
for i in range(200):
resp = requests.request("GET", url)
if resp.status_code != 200:
print("error, return code is ", resp.status_code)
if __name__ == '__main__':
g = get_page()
代码中先写了一个 eplase 装饰器用来统计函数的运行时常,然后简单地在 get_page 中暴力地请求了200次企鹅的官网,想象一下,一次请求200ms的话,我们一共要花 40000ms左右才能完成这个任务,实际的
输出是
get_page cost 21223.0ms
然后再使用一个多线程的模型来处理,对比一下
多线程方案
我们使用 2 个线程
import time
import requests
from threading import Lock, Thread
def elapse(func):
def wrapper(*args, **kwargs):
t1 = time.time()
ret = func(*args, **kwargs)
t2 = time.time()
print("func : {} cost {}ms".format(func.__name__, round(t2 - t1, 3) * 1000))
return ret
return wrapper
count = 0
def task():
url = "https://www.qq.com"
global count
with Lock():
while count < 200:
requests.request("GET", url)
count += 1
@elapse
def get_page(num=4):
threads = []
for i in range(num):
local_thread = Thread(target=task)
threads.append(local_thread)
for th in threads:
th.start()
for th in threads:
th.join()
if __name__ == '__main__':
get_page(2)
task 函数是一个 200圈的循环,用一个公共的变量 count 来统计请求数量,为了让这个数字是原子增加,必须为 count += 1 外层的代码加上锁,
测试 2 个线程的结果 大约是 12s左右
输出
get_page cost 12723.0ms
我们增加线程数量,分别得到运行时间如下
线程数 | 耗时 |
---|---|
2 | 12723 ms |
4 | 8151ms |
8 | 5556ms |
12 | 3809ms |
16 | 3337 ms |
20 | 2089ms |
24 | 2178ms |
28 | 2191ms |
32 | 2284ms |
可以看到随着线程数不断增加,得到的收益逐渐递减,20个线程是比较合适的,200个请求最终下降到 2000ms 左右,比单线程 22wms 左右的耗时提升了数百倍之多 。
因为开启线程有一定的时间损耗,如果任务量更大,线程启动的时间摊销到总耗时里的比例就越小,如果是请求 1000个任务,我们可以开启 50个线程,最后我们的总用时是 10s左右。
估算一下 20个线程处理任务1 的时间 ,忽略线程启动和关闭的代价,那么 总消耗时间大约可以缩小到 1到1.5小时
再考虑异步请求的方案
aiohttp + asynio 库
写一个 asyn+await 的协程
async def task(num):
url = "https://www.qq.com"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
status_code = response.status
await response.text("gbk")
print(f'task: {threading.current_thread()} :{num}:{status_code}')
@elapse
def get_page():
loop = asyncio.get_event_loop()
tasks = [task(i) for i in range(200)]
loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__':
get_page()
运行结果 , 约 3500ms
...
task: <_MainThread(MainThread, started 8672437760)> :183:200
task: <_MainThread(MainThread, started 8672437760)> :113:200
task: <_MainThread(MainThread, started 8672437760)> :97:200
task: <_MainThread(MainThread, started 8672437760)> :184:200
task: <_MainThread(MainThread, started 8672437760)> :157:200
task: <_MainThread(MainThread, started 8672437760)> :199:200
func : get_page cost 3588.0ms
上面是一部分输出,其中只有一个主线程,没有切换线程,是不是很妙。我们也看到这种异步io的方式比用线程的方案,耗时差不多,但是不需要上锁,管理线程这些繁琐的步骤,也无需掂量开几个线程合适这种问题
其它支持 异步io的库: httpx
httpx
httpx 用法与aiohttp 相同
# # -*- coding: gb2312-*
# Author: donghua.chen@shopee.com
# CrateTime: 2022/4/26 15:13
# Filename: async.py
import time
import asyncio
import aiohttp
import threading
import httpx
client = httpx.AsyncClient()
async def httpx_task(num):
url = "https://www.qq.com"
response = await client.get(url)
status_code = response.status_code
print(f'httpx_task: {threading.current_thread()}: {num}:{status_code}')
@elapse
def get_page():
loop = asyncio.get_event_loop()
tasks = [httpx_task(i) for i in range(200)]
loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__':
get_page()
client = httpx.AsyncClient() 这行代码放在 httpx_task 的外面可以节约连接,以及对象创建的开销,这段代码的最终结果和多线程最快的结果很接近
拥抱异步IO吧,太香
网友评论