什么是协程
协程是轻量级的线程,轻量的意思就是占用内存少,一个线程可以轻松的跑出数万以上的协程。协程的执行是并发的,也就意味着使用协程可以轻松实现大量并发,所以协程最大的用处在于便于编写高性能异步网络程序。
从代码执行的角度来看,协程在函数执行中时,允许跳出当前函数,转而执行其它代码,在执行权切换回来时,恢复到原位继续执行。
其实进程、线程的执行在cpu看来也是这样的,只是这个切换的过程是由操作系统来完成了。而协程是用户级别的轻量级线程,这个切换只能是用户自行完成。
简易实现
接下来,我们将基于python2.7版本的生成器特性来实现一个简单的协程,并以经典的“生产者-消费者”模型作为示例。
在python中,包含yield
关键字代码的函数就是生成器,调用这样的函数会返回一个生成器,类型是GeneratorType
。生成器最重要的函数是send()
,这个函数接收一个参数,并推动生成器执行部分代码,准确说是到下一处yield
,接着跳出函数,反复调用send()
可以完成生成器的迭代。也就是说,生成器能够满足前述的从函数中跳出再回来的需求。
后续,协程指的就是一个生成器。
协程的实现不需要考虑,我们需要考虑的是协程执行的实现,也就是一个协程调度器,一个简易的版本实现代码如下所示:
from types import GeneratorType
class Scheduler(object):
def __init__(self):
self.queue = []
def _schedule(self, coroutine, stacks=(), init_val=None):
def resume():
value = init_val
while True:
try:
value = coroutine.send(value)
if value is None:
self.queue.append(resume)
break
if isinstance(value, GeneratorType):
# 生成子协程,挂起父协程,调度子协程运行
self._schedule(value, (coroutine, stacks))
break
except StopIteration:
# stacks非空则表明有父协程,需要恢复父协程的调度
if stacks:
parent, stk = stacks
self._schedule(parent, stk, value)
break
self.queue.append(resume)
def add(self, coroutine):
self._schedule(coroutine)
def run(self):
while self.queue:
func = self.queue.pop(0)
func()
Scheduler是协程调度器,提供两个接口,add()添加协程,run()执行所有协程到结束。目前这个版本只支持无阻塞的协程,也就是说,不支持协程休眠或者IO时自动切换。
接下来说一下具体的实现原理。
数据结构上,Scheduler使用列表queue存储多个协程(实际上是函数)。
流程上,add()仅仅是调用_schedule()将协程加入queue中,run()则是不断取出queue的第一个元素执行,直到queue变空。
核心实现在_schedule()函数中,该函数的作用是调度协程运行,核心思想是将协程变成迭代运行的闭包。该函数有三个参数,第一个是协程,生成器类型;第二个参数是调用堆栈,一个空元祖或者二元的元祖,第一个元素是父协程,第二个元素是父协程的调用堆栈;第三个参数是传递给协程的初始参数,也是子协程的返回值,用来推动协程的运行。_schedule()函数返回一个闭包,是一个无参数的函数。
_schedule函数内部定义的闭包函数叫resume(),该函数的作用是循环迭代生成器直到结束。调用coroutine.send(value),将value传给协程,协程接收这个参数并执行到下一个yield的地方,返回一个value,或者抛出StopIteration异常退出。
先说抛出异常的情况,这个时候协程coroutine执行结束了,value是该协程最后一个返回值,如果此时有父协程,则需要重新恢复父协程的调度,并将该返回值传给父协程。
再说第二种情况,send()返回正常值value,分三种可能:
- value是None,可以认为协程执行了一个
yield;
的代码,效果就是协程主动让出执行权,此时将协程添加到queue尾部,等待下一次执行; - value是一个生成器,则表明当前协程产生了一个子协程,此时则挂起当前协程,调度产生的子协程。注意,此时调度子协程的stacks参数是
(coroutine, stacks)
,第二个元素是当前协程的stacks,由此多层调用之后,就会形成一个stacks的嵌套,从而无形中保持了一个协程调用的堆栈。这个技巧很巧妙,在运行中实现了调用栈,避免了额外的变量来实现这个调用栈; - value是其它值,则循环继续,也就是会继续调用send()。
测试
接下来,创建几个协程测试一下调度器。这里采用的典型的并发场景生产者-消费者模型,而且相互之间还有协作,可以测试协程的交替运行。
生产者-消费者的代码如下所示。生产者与消费者之间使用一个队列作为缓冲,生产者和消费者的数量都是不固定的。
qsize = 3
def producer(q, pid):
# type: (list, int) -> GeneratorType
for j in range(5):
while len(q) == qsize:
print "producer %s sleep due to full queue" % pid
yield
q.append(j)
print "producer %s put %s" % (pid, j)
yield
print "producer %s exit" % pid
def consumer(q, cid):
# type: (list, int) -> GeneratorType
while True:
for i in range(qsize):
if q:
break
else:
yield
else:
print "consumer %s dead due to hungry" % cid
break
j = q.pop(0)
print "consumer %s get %s" % (cid, j)
yield
def run():
s = Scheduler()
q = []
pnum = 3
cnum = 2
for i in range(pnum):
s.add(producer(q, i))
for i in range(cnum):
s.add(consumer(q, i))
s.run()
if __name__ == "__main__":
run()
代码的输出如下所示。从结果中可以看到, 协程的交替执行。生产者在队列满的时候执行yield;
代码让出执行权,等待下一次执行。消费者在队列空的时候让出执行权。
producer 0 put 0
producer 1 put 0
producer 2 put 0
consumer 0 get 0
consumer 1 get 0
producer 0 put 1
producer 1 put 1
producer 2 sleep due to full queue
consumer 0 get 0
consumer 1 get 1
producer 0 put 2
producer 1 put 2
producer 2 sleep due to full queue
consumer 0 get 1
consumer 1 get 2
producer 0 put 3
producer 1 put 3
producer 2 sleep due to full queue
consumer 0 get 2
consumer 1 get 3
producer 0 put 4
producer 1 put 4
producer 2 sleep due to full queue
consumer 0 get 3
consumer 1 get 4
producer 0 exit
producer 1 exit
producer 2 put 1
consumer 0 get 4
consumer 1 get 1
producer 2 put 2
consumer 0 get 2
producer 2 put 3
consumer 0 get 3
producer 2 put 4
consumer 0 get 4
producer 2 exit
consumer 1 dead due to hungry
consumer 0 dead due to hungry
下一步计划
当前版本比较简陋,下一步需要支持的特性包括:
- 协程休眠
- IO切换
网友评论