协程
协程是轻量级线程,拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。
协程的应用场景:I/O 密集型任务。这一点与多线程有些类似,但协程调用是在一个线程内进行的,是单线程,切换的开销小,因此效率上略高于多线程。当程序在执行 I/O 时操作时,CPU 是空闲的,此时可以充分利用 CPU 的时间片来处理其他任务。在单线程中,一个函数调用,一般是从函数的第一行代码开始执行,结束于 return 语句、异常或者函数执行(也可以认为是隐式地返回了 None )。 有了协程,我们在函数的执行过程中,如果遇到了耗时的 I/O 操作,函数可以临时让出控制权,让 CPU 执行其他函数,等 I/O 操作执行完毕以后再收回控制权。
定义协程
Python3.4 加入了协程的概念,以生成器对象为基础,在 Python3.5 则增加了关键字 async/await,使得协程的实现更加方便。Python 中使用协程最常用的库莫过于 asyncio ,接下来我们以 asyncio 为基础来介绍协程的使用。
先看一个简单的例子来理解
1 import asyncio
2 import time
3
4
5 async def task():
6 print(f"{time.strftime('%H:%M:%S')} task 开始 ")
7 time.sleep(2)
8 print(f"{time.strftime('%H:%M:%S')} task 结束")
9
10
11 coroutine = task()
12 print(f"{time.strftime('%H:%M:%S')} 产生协程对象 {coroutine},函数并未被调用")
13 loop = asyncio.get_event_loop()
14 print(f"{time.strftime('%H:%M:%S')} 开始调用协程任务")
15 start = time.time()
16 loop.run_until_complete(coroutine)
17 end = time.time()
18 print(f"{time.strftime('%H:%M:%S')} 结束调用协程任务, 耗时{end - start} 秒")
19
运行结果如下所示
22:34:06 产生协程对象 <coroutine object task at 0x0000025B8CE62200>,函数并未被调用
22:34:06 开始调用协程任务
22:34:06 task 开始
22:34:08 task 结束
22:34:08 结束调用协程任务, 耗时2.015564203262329 秒
说明:首先引入 asyncio ,这样才可以使用 async 和 await 关键字( async 定义一个协程, await 用来临时挂起一个函数或方法的执行),接着我们使用 async 定义一协程方法,
随后我们直接调用了这个方法,然而这个方法并没有执行,而是返回了一个 coroutine 协程对象。随后我们使用 get_event_loop() 方法创建一个事件循环 loop ,并调用了 loop 对象的 run_until_complete() 方法将协程注册到事件循环 loop 中,然后启动。最后我们才看到了 task 方法打印了输出结果。
注意: async 定义的方法无法直接执行,必须将其注册到事件循环中才可以执行。
我们还可以为任务绑定回调函数:
1 import asyncio
2 import time
3
4
5 async def _task():
6 print(f"{time.strftime('%H:%M:%S')} task 开始 ")
7 time.sleep(2)
8 print(f"{time.strftime('%H:%M:%S')} task 结束")
9 return "运行结束"
10
11
12 def callback(task):
13 print(f"{time.strftime('%H:%M:%S')} 回调函数开始运行")
14 print(f"状态:{task.result()}")
15
16
17 coroutine = _task()
18 print(f"{time.strftime('%H:%M:%S')} 产生协程对象 {coroutine},函数并未被调用")
19 task = asyncio.ensure_future(coroutine)
20 task.add_done_callback(callback)
21 loop = asyncio.get_event_loop()
22 print(f"{time.strftime('%H:%M:%S')} 开始调用协程任务")
23 start = time.time()
24 loop.run_until_complete(task)
25 end = time.time()
26 print(f"{time.strftime('%H:%M:%S')} 结束调用协程任务, 耗时{end - start} 秒")
代码执行结果如下所示:
23:01:11 产生协程对象 <coroutine object _task at 0x000002B84B2A11A8>,函数并未被调用
23:01:11 开始调用协程任务
23:01:11 task 开始
23:01:13 task 结束
23:01:13 回调函数开始运行
状态:运行结束
23:01:13 结束调用协程任务, 耗时2.0018696784973145 秒
说明:在这里我们定义了一个协程方法和一个普通方法作为回调函数,协程方法执行后返回一个字符串’运行束’。其中回调函数接收一个参数,是 task 对象,然后调用 print() 方法打印了 task 对象的结果。 asyncio . ensure_future ( coroutine )可以返回 task 对象, add_done_callback() 为 task 对象增加一个回调任务。这样我们就定义好了一个 coroutine 对象和一个回调方法,执行的结果是当 coroutine 对象执行完毕之后,就去执行声明的 callback() 方法。
并发
前面的例子我们只执行了一个协程任务,如果我们需要执行多次并尽可能的提高效率该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait() 方法即可执行,看下面的例子。
1 import asyncio
2 import time
3
4 async def task():
5 print(f"{time.strftime('%H:%M:%S')} task 开始 ")
6 # 异步调用asyncio.sleep(1):
7 await asyncio.sleep(2)
8 #time.sleep(2)
9 print(f"{time.strftime('%H:%M:%S')} task 结束" )
10
11 # 获取EventLoop:
12 loop = asyncio.get_event_loop()
13 # 执行coroutine
14 tasks = [task() for _ in range(5)]
15 start = time.time()
16 loop.run_until_complete(asyncio.wait(tasks))
17 loop.close()
18 end = time.time()
19 print(f"用时 {end-start} 秒")
运行结果如下:
23:25:25 task 开始
23:25:25 task 开始
23:25:25 task 开始
23:25:25 task 开始
23:25:25 task 开始
23:25:27 task 结束
23:25:27 task 结束
23:25:27 task 结束
23:25:27 task 结束
23:25:27 task 结束
用时 2.0225257873535156 秒
说明:
首先定义一个协程任务函数,模拟耗时 2 秒的任务,这里我们使用了 await 关键字,根据官方文档说明, await 后面的对象必须是如下类型之一:
- A native coroutine object returned from a native coroutine function,一个原生 coroutine 对象。
- A generator-based coroutine object returned from a function decorated with types.coroutine(),一个由 types.coroutine() 修饰的生成器,这个生成器可以返回 coroutine 对象。
- An object with an await__ method returning an iterator,一个包含 __await 方法的对象返回的一个迭代器。
代码中的 asyncio.sleep(2) 是一个由 coroutine 修饰的生成器函数,表示等待 2 秒。接下来我们定义了一个列表 tasks,由5个task() 组成,最后使用 loop.run_until_complete(asyncio.wait(tasks)) 提交执行,5 个任务并发执行,耗时接近于单个任务的耗时,这里并没有使用多进程或多线程,从而实现了并发操作。这里的 task 可以替换为任意耗时较高的I/O操作函数。
异步请求
前述的定义协程及并发编程似乎比多线程编程相比更加复杂:需要定义协程函数,使用关键字 async , await 等关键字,还要掌握 await 后面必须是哪些对象等等。这些复杂的操作都是为具体的高效应用做铺垫,接下来我们看下协程在 I/O 密集型任务中具有怎样的优势。
我们以常用的网络请求场景为例,网络请求较多的应用就是 I/O 密集型任务。首先我们需要建立一个服务器来响应 web 请求,为方便演示,我们使用轻量级的 web 框架 Flask ,来建立一个服务器。
1 from flask import Flask
2 import time
3
4 app = Flask(__name__)
5
6 @app.route('/')
7 def index():
8 time.sleep(3)
9 return ' Hello World!''
10
11 if __name__ == '__main__':
12 app.run(threaded=True)
在上述代码中我们定义了一个 Flask 服务,主入口是 index() 方法,方法里面先调用了 sleep() 方法休眠 3 秒,然后接着再返回结果,也就是说,每次请求这个接口至少要耗时 3 秒,这样我们就模拟了一个慢速的服务接口。注意这里服务启动的时候, run() 方法加了一个参数 threaded ,这表明 Flask 启动了多线程模式,不然默认是只有一个线程的。如果不开启多线程模式,同一时刻遇到多个请求的时候,只能顺次处理,这样即使我们使用协程异步请求了这个服务,也只能一个一个排队等待,瓶颈就会出现在服务端。所以,多线程模式是有必要打开的。
运行结果如下:
* Serving Flask app "coroutine_flask_demo" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: off
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
我们打开浏览器,在地址栏中输入http://127.0.0.1:5000/ ,回车后,3秒后会看到如下图所示的页面:
a.png
接下来我们编写请求程序:
1 import asyncio
2 import requests
3 import time
4
5 start = time.time()
6
7 async def request():
8 url = 'http://127.0.0.1:5000'
9 print(f'{time.strftime("%H:%M:%S")} 请求 {url}')
10 response = requests.get(url)
11 print(f'{time.strftime("%H:%M:%S")} 得到响应 {response.text}')
12
13 tasks = [asyncio.ensure_future(request()) for _ in range(5)]
14 loop = asyncio.get_event_loop()
15 loop.run_until_complete(asyncio.wait(tasks))
16
17 end = time.time()
18 print(f'耗时 {end - start} 秒')
在这里我们创建了五个 task ,然后将 task 列表传给 wait() 方法并注册到时间循环中执行。
运行结果如下:
21:32:32 请求 http://127.0.0.1:5000
21:32:35 得到响应 Hello World!
21:32:35 请求 http://127.0.0.1:5000
21:32:38 得到响应 Hello World!
21:32:38 请求 http://127.0.0.1:5000
21:32:41 得到响应 Hello World!
21:32:41 请求 http://127.0.0.1:5000
21:32:44 得到响应 Hello World!
21:32:44 请求 http://127.0.0.1:5000
21:32:47 得到响应 Hello World!
耗时 15.100058555603027 秒
通过运行结果我们发现和正常的顺次执行没有区别,耗时 15 秒,平均一个请求耗时 3 秒,并未达到我们预期的要求。其实,要实现异步处理,我们得先要有挂起的操作,当一个任务需要等待 I/O 结果的时候,可以挂起当前任务,让出 CPU 的控制权,转而去执行其他任务,这样我们才能充分利用好资源,上面方法都是串行走下来,没有实现挂起,因此无法满足异步并发请求。
要实现异步,我们可以使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await ,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。参照 3 . 3 . 2 节中 await 后必须跟的对象类型,我们改写代码如下:
1 import asyncio
2 import requests
3 import time
4
5
6 async def get(url):
7 return requests.get(url)
8
9
10 async def request():
11 url = "http://127.0.0.1:5000"
12 print(f'{time.strftime("%H:%M:%S")} 请求 {url}')
13 response = await get(url)
14 print(f'{time.strftime("%H:%M:%S")} 得到响应 {response.text}')
15
16
17 start = time.time()
18 tasks = [asyncio.ensure_future(request()) for _ in range(5)]
19 loop = asyncio.get_event_loop()
20 loop.run_until_complete(asyncio.wait(tasks))
21 end = time.time()
22 print(f"耗时 {end - start} 秒")
上述代码将请求页面的方法封装成一个 coroutine 对象,在 request 方法中使用 await 来尝试挂起当前执行的 I / 0 ,运行结果如下:
22:33:35 请求 http://127.0.0.1:5000
22:33:38 得到响应 Hello World!
22:33:38 请求 http://127.0.0.1:5000
22:33:41 得到响应 Hello World!
22:33:41 请求 http://127.0.0.1:5000
22:33:44 得到响应 Hello World!
22:33:44 请求 http://127.0.0.1:5000
22:33:47 得到响应 Hello World!
22:33:47 请求 http://127.0.0.1:5000
22:33:50 得到响应 Hello World!
耗时 15.123773097991943 秒
可见上述的改动并未达到预期的并发效果,究其原因, request 不是异步请求,无论如何改封装都无济于事,因此我们需要寻找真正的异步 I/O 请求。 aiohttp 是一个支持异步请求的库,利用它和 anyncio 配合,即可以实现我们的异步请求操作。修改以上代码,使用 aiohttp 库来实现异步请求。
1 import asyncio
2 import aiohttp
3 import time
4
5 now = lambda: time.strftime("%H:%M:%S")
6
7
8 async def get(url):
9 session = aiohttp.ClientSession()
10 response = await session.get(url)
11 result = await response.text()
12 session.close()
13 return result
14
15
16 async def request():
17 url = "http://127.0.0.1:5000"
18 print(f"{now()} 请求 {url}")
19 result = await get(url)
20 print(f"{now()} 得到响应 {result}")
21
22
23 start = time.time()
24 tasks = [asyncio.ensure_future(request()) for _ in range(5)]
25 loop = asyncio.get_event_loop()
26 loop.run_until_complete(asyncio.wait(tasks))
27
28 end = time.time()
29 print(f"耗时 { end - start } 秒")
通过aiohttp的ClientSession类的get()方法进行请求,运行结果如下:
22:49:36 请求 http://127.0.0.1:5000
22:49:36 请求 http://127.0.0.1:5000
22:49:36 请求 http://127.0.0.1:5000
22:49:36 请求 http://127.0.0.1:5000
22:49:36 请求 http://127.0.0.1:5000
22:49:39 得到响应 Hello World!
22:49:39 得到响应 Hello World!
22:49:39 得到响应 Hello World!
22:49:39 得到响应 Hello World!
22:49:39 得到响应 Hello World!
耗时 3.0485894680023193 秒
运行结果符合异常请求,耗时由 15 秒变成了 3 秒,耗时直接变成了原来的 1/5 ,实现了并发访问。代码里面我们使用了 await ,后面跟了 get() 方法,在执行这五个协程的时候,如果遇到了 await ,那么就会将当前协程挂起,转而去执行其他的协程,直到其他的协程也挂起或执行完毕,再进行下一个协程的执行,异步操作的便捷之处,当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等着,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 I/O 上。
在发出网络请求后的 3 秒内, CPU 都是空闲的,那么增加协程任务的数量,最终的耗时还会是 3 秒吗?理论来说确实是这样的,不过有个前提,那就是服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压,另外还要忽略 I/O 传输时延。我们可以将上述的任务数扩大 20 倍,如下所示
tasks = [asyncio.ensure_future(request()) for _ in range(100)]
最终的耗时如下:
耗时 3.7431812286376953 秒
运行时间也是在 3 秒左右,当然多出来的时间就是 I/O 的延迟。 使用异步协程我们几乎可以在相同的时间内实现成百上千倍次的网络请求。
综上:在 I/O 密集型任务中,使用协程 可以大大加快运行速度。
欢迎关注我的公众号somenzz.
扫码关注
qrcode_for_gh_4a29fa8ff18a_344.jpg
网友评论