why 协程
一旦谈到网络相关的编程,总是绕不开同步、异步、阻塞以及非阻塞以及多线程等概念。近几年,有关协程的话题也越来越多。学习一个概念的第一步总是学习它的作用以及它所解决的问题。要研究协程解决的问题则先要弄清楚如果不用协程,存在哪些问题。这也是本文讨论的一个重点。
阻塞、非阻塞
要搞清楚为什么需要协程,首先需要弄清楚网络编程中的一些概念。用爬虫来举例子: 当我们爬一个网页时可以简单分为两个步骤:
- 向服务器发送请求
- 等待服务器返回数据
我们的电脑所做的事情大概时一层一层向下封装数据包。然后向网卡发送数据,继而等待网卡返回数据。这时便有了一个问题,等待网卡返回数据不需要CPU参与。也就是说,如果一直等待网卡返回数据而不做其他的事,就浪费了这一段等待的时间。例如爬虫函数如下:
def crawl(url):
fd = send(url) # 发送请求
wait(fd) # 等待数据返回
如果 需要wait
获得服务器数据后才能继续执行后续的方法。则将wait
法称为阻塞的。采用阻塞的方式,如果爬取1个网页需要两秒,那么爬取10个网页就需要20秒。假如我并不关心 wait 是否获取到了服务器的内容,而希望它一执行就马上返回,继续执行接下来的操作。那么这种方式的方法称之为非阻塞的。使用非阻塞方法时,程序虽然不会阻塞在方法处。因此发送上一个请求后马上可以发送下一个请求,能大量的节省等待时间。一个阻塞与非阻塞时序图如下:
这里的一个重点是:阻塞的等待是串行的,即上一个请求的等待完成后,才开始下一个请求的等待。而非阻塞的请求等待是并行的,所有的等待几乎同时进行。
使用多线程(本文所提多线程,都是指单核多线程)可以达到类似非阻塞的效果。由于多线程是在不同线程之间切换。当某个阻塞处于等待阶段时,还可以切换到其他线程去发送请求。因此可以达到非阻塞的效果。但是使用多线程又引入了线程的切换的开销,竞态条件等问题。为了避免竞态条件,又需要引入同步机制等一系列新的开销。当然,多线程的主要问题还是竞态条件的产生。就算有大量的工具和同步机制,在编写具有多线程的程序时依旧十分困难。因此,多线程仍然不是理想的解决方案。
同步与异步
上节提到阻塞与非阻塞的区别。才哟个非阻塞的方式编写代码当然更高效。但是还存在另一个问题: 如果函数不会阻塞直到获得服务器结果,那么如何知道服务器何时返回数据呢?一般来说有两种方式,一种方式是不断的查看代码的执行状况,如果完成才继续运行。例如:
def crawl(url):
fd = send(url) # 发送请求
r = wait(fd) # 假设 服务器返回数据 wait 会返回一个非空值
while not r: # 等待数据返回
r = wait(fd)
这种方式称之为同步,即不断的主动询问是否完成。当然这种方式与阻塞没什么两样,等待的时间也没办法做其他的事情,因此这种方式显然是行不通的。还有另一种方式,称之为异步。即函数完成之后会进行通知,一般形式为一个回调函数:
def on_success(value):
pass
def crawl(url):
fd = send(url) # 发送请求
r = wait(fd, on_success) # wait 等待完成会调用 on_success
这样看来,采用异步非阻塞编写代码的方式似乎解决了所有的问题。即不存在串行的等待,并且也无需多线程之间的开销以及竞态条件。但采用异步的方式编写程序实际也有许多的缺点。例如,如果多个步骤之间存在依赖关系,采用异步编写则表现为多个回调函数的嵌套,非常的不直观。如:
def action1(callback):
callback()
def action2(callback):
callback()
def action3(callback):
callback
def done():
pass
action1(lambda : action2(lambda: action3(lambda: done)))
协程
这时候就轮到协程出场了。我们先回想一下多线程的解决方案。多线程依赖于多个线程轮番执行,来达到避免串行等待的目的。但是由于多线程切换的机制,导致程序切换的位置并不确定。所以我们才需要各种机制来保证多线程中的线程安全。但是如果能有一种方法,使得不同线程之间切换时机是固定的。即当代码阻塞时切换到其他线程进行执行,阻塞完成后又自动切换回到当前线程,其他时间不进行切换。岂不是就完美的解决了多线程和异步的缺点。没错,这样的方式就是协程。利用协程编写的代码既是同步的,也可以避免串性等待的问题。协程和多线程的区别是: 多线程的切换时机是不固定的,每一行代码都可能切换。而协程的切换则是开发者自己决定的。这样开发者不需要考虑竞态条件的问题。
how协程
不同的语言对于协程的实现是不同的。在 python 里主要依赖于生成器
,即包含yield
关键字的函数。有关yield
关键字的的基本概念可以参考生成器。先编写一个模拟异步非阻塞的类,如下:
op = DelayOP()
op.start_op("spider", 2000, callback=callback) # 定义名为2000ms的
# 延时操作,即2000ms后会调用 callback 回调
op.loop()
然后编写一个 Future 类保存方法运行的结果。
class Future:
def __init__(self):
self.result = None
self.callback = None
def set_result(self, result):
self.result = result
self.callback and self.callback(self)
def set_callback(self, callback):
self.callback = callback
因此,此时可以利用 yield 可以这样改造:
def crawl(op):
f = Future()
op.start_op("download", 2000, lambda result : f.set_result(result))
yield f
可以看出,方法运行到 yield f 时会主动退出。而由于回调函数为 lambda result : f.set_result(result)。因此在延时完成后,会执行回调,将执行结果设置到 Feture 的 result 中。但是如何驱动方法继续运行呢,此时我们还需要另一个类:
class Task:
def __init__(self, co):
self.co = co
self.step()
def step(self, future=None):
try:
next_future = self.co.send(future.result if future else None)
except StopIteration:
return
next_future.set_callback(self.step)
使用时:
def show_res():
res = yield from crawl(op)
print(res)
Task(get_res())
这个类为什么可以驱动方法继续运行呢。首先看 Task 中的 co 代表 Task 管理的生成器,也就是 show_res() 方法。当一个 Task 实例生成时就会执行 step 方法。第一次执行 step 方法相当于 co.send(None)
。而这里的 show_res 由于其中存在 yield from 语句,相当于这个 co.send(None)
实际上是:
c = crawl(op)
c.send(None)
c.send(None) 会返回 crawl 中的 f 。因此 co.send(None) 返回值即为 f。获得 f 后,f 将 step 函数添加为其完成时的回调。将在 set_result 时调用。而当延时完成后,首先 set_result 将会被调用。然后 step 作为 f 完成时回调被调用,并且参数为自身。此时将执行 co.send(f.result)
,驱动 crawl 继续执行。由于 crawl 已没有其他代码,因此继续执行将退出方法。会导致 crawl 抛出 StopIteration 异常。该值将会被 yield from 捕获并传递给 res。当然这里只存在一个延时操作,如果有多个延时操作则可以这样写:
def crawl(op):
f = Future()
op.start_op("download", 2000, lambda result: f.set_result(result))
yield f
def save(text, op):
f = Future()
op.start_op(f"save {text}", 2000, lambda result: f.set_result(result))
yield f
def show_res(op):
text = yield from crawl(op) # 延时操作1
text = yield from save(op, text) # 延时操作2
op = DelayOP()
Task(show_res(op))
Task(show_res(op)) # 两个延时为 4000 ms 的任务
start_time = op.ms_timestap()
op.loop()
print(f"cost {op.ms_timestap() - start_time} ms")
# cost 4002 ms
这里进行了两个延时为 4000 ms 的任务。最终的耗时总共为 4002 ms。可以看出这两个任务的等待时并行的。也就是这里的代码用同步的代码,写出了异步非阻塞的效果。没有使用多线程,因此无需考虑竞态条件。并且编写代码的逻辑的是同步的,并不是异步回调的方式。多么的优雅呀,这就是协程的威力。
这里还有可以改进的地方,代码中的 yield 与 yield from 是混用的。这样不太好,因此可以改一下 Future 的实现:
class Future:
# 新增
def __iter__(self):
if self.done:
return self.result
else:
yield self
def __await__(self):
if self.done:
return self.result
else:
yield self
然后可将 crawl 以及 down 中的 yield 改为 yield from,变得更统一。
python 还在 3.5 之后推出了 async & await 关键字。 async 表名一个方法为协程,而 await 表示则等待一个协程返回。因此只要 Future 中含有 __await__
方法。代码还可以继续修改为:
async def crawl(op):
f = Future()
op.start_op("download", 2000, lambda result: f.set_result(result))
await f
async def save(op, text):
f = Future()
op.start_op(f"save {text}", 2000, lambda result: f.set_result(result))
await f
async def show_res(op):
text = await crawl(op)
text = await save(op, text)
这就是 python 中使用协程的最新方式。
网友评论