美文网首页
简易协程-1

简易协程-1

作者: 大海无垠_af22 | 来源:发表于2019-01-13 12:05 被阅读0次

    什么是协程

    协程是轻量级的线程,轻量的意思就是占用内存少,一个线程可以轻松的跑出数万以上的协程。协程的执行是并发的,也就意味着使用协程可以轻松实现大量并发,所以协程最大的用处在于便于编写高性能异步网络程序。
    从代码执行的角度来看,协程在函数执行中时,允许跳出当前函数,转而执行其它代码,在执行权切换回来时,恢复到原位继续执行。
    其实进程、线程的执行在cpu看来也是这样的,只是这个切换的过程是由操作系统来完成了。而协程是用户级别的轻量级线程,这个切换只能是用户自行完成。

    简易实现

    接下来,我们将基于python2.7版本的生成器特性来实现一个简单的协程,并以经典的“生产者-消费者”模型作为示例。
    在python中,包含yield关键字代码的函数就是生成器,调用这样的函数会返回一个生成器,类型是GeneratorType。生成器最重要的函数是send(),这个函数接收一个参数,并推动生成器执行部分代码,准确说是到下一处yield,接着跳出函数,反复调用send()可以完成生成器的迭代。也就是说,生成器能够满足前述的从函数中跳出再回来的需求。
    后续,协程指的就是一个生成器。

    协程的实现不需要考虑,我们需要考虑的是协程执行的实现,也就是一个协程调度器,一个简易的版本实现代码如下所示:

    from types import GeneratorType
    
    class Scheduler(object):
        def __init__(self):
            self.queue = []
        def _schedule(self, coroutine, stacks=(), init_val=None):
            def resume():
                value = init_val
                while True:
                    try:
                        value = coroutine.send(value)
                        if value is None:
                            self.queue.append(resume)
                            break
                        if isinstance(value, GeneratorType):
                            # 生成子协程,挂起父协程,调度子协程运行
                            self._schedule(value, (coroutine, stacks))
                            break
                    except StopIteration:
                        # stacks非空则表明有父协程,需要恢复父协程的调度
                        if stacks:
                            parent, stk = stacks
                            self._schedule(parent, stk, value)
                        break
            self.queue.append(resume)
        def add(self, coroutine):
            self._schedule(coroutine)
        def run(self):
            while self.queue:
                func = self.queue.pop(0)
                func()
    

    Scheduler是协程调度器,提供两个接口,add()添加协程,run()执行所有协程到结束。目前这个版本只支持无阻塞的协程,也就是说,不支持协程休眠或者IO时自动切换。
    接下来说一下具体的实现原理。
    数据结构上,Scheduler使用列表queue存储多个协程(实际上是函数)。
    流程上,add()仅仅是调用_schedule()将协程加入queue中,run()则是不断取出queue的第一个元素执行,直到queue变空。
    核心实现在_schedule()函数中,该函数的作用是调度协程运行,核心思想是将协程变成迭代运行的闭包。该函数有三个参数,第一个是协程,生成器类型;第二个参数是调用堆栈,一个空元祖或者二元的元祖,第一个元素是父协程,第二个元素是父协程的调用堆栈;第三个参数是传递给协程的初始参数,也是子协程的返回值,用来推动协程的运行。_schedule()函数返回一个闭包,是一个无参数的函数。
    _schedule函数内部定义的闭包函数叫resume(),该函数的作用是循环迭代生成器直到结束。调用coroutine.send(value),将value传给协程,协程接收这个参数并执行到下一个yield的地方,返回一个value,或者抛出StopIteration异常退出。
    先说抛出异常的情况,这个时候协程coroutine执行结束了,value是该协程最后一个返回值,如果此时有父协程,则需要重新恢复父协程的调度,并将该返回值传给父协程。
    再说第二种情况,send()返回正常值value,分三种可能:

    1. value是None,可以认为协程执行了一个yield;的代码,效果就是协程主动让出执行权,此时将协程添加到queue尾部,等待下一次执行;
    2. value是一个生成器,则表明当前协程产生了一个子协程,此时则挂起当前协程,调度产生的子协程。注意,此时调度子协程的stacks参数是(coroutine, stacks),第二个元素是当前协程的stacks,由此多层调用之后,就会形成一个stacks的嵌套,从而无形中保持了一个协程调用的堆栈。这个技巧很巧妙,在运行中实现了调用栈,避免了额外的变量来实现这个调用栈;
    3. value是其它值,则循环继续,也就是会继续调用send()。

    测试

    接下来,创建几个协程测试一下调度器。这里采用的典型的并发场景生产者-消费者模型,而且相互之间还有协作,可以测试协程的交替运行。
    生产者-消费者的代码如下所示。生产者与消费者之间使用一个队列作为缓冲,生产者和消费者的数量都是不固定的。

    qsize = 3
    
    def producer(q, pid):
        # type: (list, int) -> GeneratorType
        for j in range(5):
            while len(q) == qsize:
                print "producer %s sleep due to full queue" % pid
                yield
            q.append(j)
            print "producer %s put %s" % (pid, j)
            yield
        print "producer %s exit" % pid
    
    def consumer(q, cid):
        # type: (list, int) -> GeneratorType
        while True:
            for i in range(qsize):
                if q:
                    break
                else:
                    yield
            else:
                print "consumer %s dead due to hungry" % cid
                break
            j = q.pop(0)
            print "consumer %s get %s" % (cid, j)
            yield
    
    def run():
        s = Scheduler()
        q = []
        pnum = 3
        cnum = 2
        for i in range(pnum):
            s.add(producer(q, i))
        for i in range(cnum):
            s.add(consumer(q, i))
        s.run()
    
    if __name__ == "__main__":
        run()
    

    代码的输出如下所示。从结果中可以看到, 协程的交替执行。生产者在队列满的时候执行yield;代码让出执行权,等待下一次执行。消费者在队列空的时候让出执行权。

    producer 0 put 0
    producer 1 put 0
    producer 2 put 0
    consumer 0 get 0
    consumer 1 get 0
    producer 0 put 1
    producer 1 put 1
    producer 2 sleep due to full queue
    consumer 0 get 0
    consumer 1 get 1
    producer 0 put 2
    producer 1 put 2
    producer 2 sleep due to full queue
    consumer 0 get 1
    consumer 1 get 2
    producer 0 put 3
    producer 1 put 3
    producer 2 sleep due to full queue
    consumer 0 get 2
    consumer 1 get 3
    producer 0 put 4
    producer 1 put 4
    producer 2 sleep due to full queue
    consumer 0 get 3
    consumer 1 get 4
    producer 0 exit
    producer 1 exit
    producer 2 put 1
    consumer 0 get 4
    consumer 1 get 1
    producer 2 put 2
    consumer 0 get 2
    producer 2 put 3
    consumer 0 get 3
    producer 2 put 4
    consumer 0 get 4
    producer 2 exit
    consumer 1 dead due to hungry
    consumer 0 dead due to hungry
    

    下一步计划

    当前版本比较简陋,下一步需要支持的特性包括:

    • 协程休眠
    • IO切换

    相关文章

      网友评论

          本文标题:简易协程-1

          本文链接:https://www.haomeiwen.com/subject/zypkdqtx.html