Python协程

作者: Recalcitrant | 来源:发表于2019-06-30 14:32 被阅读0次

    目录:
    一、基于生成器的协程
    二、协程状态
    三、协程预激装饰器
    四、终止协程和异常处理
    五、协程返回值
    六、yield from
    七、greenlet协程
    八、gevent协程

    Python并发之协程

    协程(coroutine)可以在执行期间暂停(suspend),这样就可以在等待外部数据处理完成之后(例如,等待网络I/O数据),从之前暂停的地方恢复执行(resume)

    一、基于生成器的协程

    • 生成器
      2001年,Python 2.2 通过了 PEP 255 -- "Simple Generators" ,引入了 yield 关键字实现了生成器函数。

    yield item:yield包含”产出“和”让步“两个含义。生成器中 yield x 这行代码会 产出 一个值,提供给 next(...) 的调用方。此外,还会作出让步,暂停执行生成器,让调用方继续工作,直到需要使用另一个值时再调用 next(...)。

    • 协程
      2005年,Python 2.5 通过了 PEP 342 -- "Coroutines via Enhanced Generators",给生成器增加了.send()、.throw()和.close()方法,第一次实现了基于生成器的协程函数(generator-based coroutines)。

    send(item):生成器的调用方可以使用send()方法发送数据,发送的数据会成为生成器中yield表达式的值。此时称生成器为协程。协程指与调用方协作的过程,产出由调用方提供的值。

    示例:

    def simple_coroutine():
        print("->协程开始")
        x = yield
        print("->协程收到x:", x)
    
    
    my_coro = simple_coroutine()
    print(my_coro)
    print(next(my_coro))
    my_coro.send(50)
    
    运行结果

    运行结果解释:

    ① yield在表达式中的使用:如果协程只需从客户那里接收数据,那么产出的值是None(这个值是隐式指定的,因为yield关键字右边没有表达式)。
    ② 首先要需要调用next(...)。因为生成器还没启动,未在yield语句处暂停,所以一开始无法发送数据。
    ③ 调用send()方法后,协程定义体中的yield表达式会计算出50。调用send()方法后协程会恢复,一直运行到下一个yield表达式,或者终止。

    二、协程状态

    GEN_CREATED:创建状态(等待开始执行)
    GEN_SUSPENDED:挂起状态(在yield表达式处暂停)
    GEN_RUNNING:运行状态(正在被解释器执行。只有在多线程应用中才能看到这个状态)
    GEN_CLOSED:关闭状态(执行结束)

    协程激活方法

    • 1.调用next():next(generator)
    • 2.发送None:generator.send(None)

    示例:

    In[1]:  from inspect import getgeneratorstate
            def simple_coroutine():
                print("->协程开始")
                x = yield
                print(getgeneratorstate(my_coro2))
                print("->协程收到x:", x)
    
            my_coro = simple_coroutine()
            next(my_coro)
            print(getgeneratorstate(my_coro))
            my_coro.send(50)
    
    In[2]:  print(getgeneratorstate(my_coro))
    

    三、协程预激装饰器

    1.@wraps装饰器

    Python装饰器(decorator)在实现的时候,被装饰后的函数其实已经是另外一个函数了(函数名等函数属性会发生改变)。为了消除这样的影响,Python的functools包中提供了一个叫wraps的decorator来消除这样的副作用。写一个decorator的时候,最好在实现之前加上functools的wrap,它能保留原有函数的名称和docstring。

    示例:

    from functools import wraps
    def my_decorater(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            """
            wrapper.doc
            :param args:
            :param kwargs:
            :return:
            """
            print("calling decorater start")
            func(*args, **kwargs)
            print("calling decorater end")
    
        return wrapper
    
    
    @my_decorater
    def exam():
        """
        exam.doc
        :return:
        """
        print("example....")
    
    
    exam()
    print("func_name:", exam.__name__)
    print("func_doc:", exam.__doc__)
    
    运行结果

    2.协程预激装饰器的实现

    from functools import wraps
    def coroutine_activator(function):
        @wraps(function)
        def wrapper(*args, **kwargs):
            generator = function(*args, **kwargs)
            next(generator)
            return generator
    
        return wrapper
    

    示例:

    def coroutine_activator(function):
        @wraps(function)
        def wrapper(*args, **kwargs):
            generator = function(*args, **kwargs)
            next(generator)
            return generator
    
        return wrapper
    
    
    @coroutine_activator
    def averager():
        total = 0.0
        count = 0
        average = 0.0
    
        while True:
            term = yield average
            total += term
            count += 1
            average = total / count
    
    
    cor_avg = averager()
    print(getgeneratorstate(cor_avg))
    
    运行结果

    四、终止协程和异常处理

    协程中未处理的异常会向上冒泡,传给next()函数或.send()方法的调用方(即触发协程的对象)。

    1.generator.throw(exc_type[, exc_value[, traceback]])

    • 1.该方法会使生成器在暂停的yield表达式处抛出指定的异常。
    • 2.如果生成器处理了该异常,代码会继续执行到下一个yield表达式,而产出的值会成为generator.throw()方法的返回值。
    • 3.如果没有处理这个异常,异常则会向上冒泡。

    示例:

    class DemoException(Exception):
        pass
    
    
    def demo_exc_handling():
        print("-->coroutine started")
    
        while True:
            try:
                x = yield
    #         except GeneratorExit:
    #             print("-->GeneratorExit has been handled. Call close methond...")
            except DemoException:
                print("-->DemoException has been handled. Continuing...")
            else:
                print("-->coroutine received:{!r}".format(x))
            finally:
                print("-->end...")
    
    
    exc_coro = demo_exc_handling()
    next(exc_coro)
    print(getgeneratorstate(exc_coro))
    
    exc_coro.throw(DemoException)
    print(getgeneratorstate(exc_coro))
    
    exc_coro.throw(ZeroDivisionError)
    print(getgeneratorstate(exc_coro))
    
    运行结果

    2.generator.close()

    • 1.该方法会使生成器在暂停的yield表达式处抛出GeneratorExit异常。
    • 2.如果生成器没有处理这个异常,或者抛出了StopIteration异常(通常是指运行到结尾),调用方不会报错。
    • 3.如果收到GeneratorExit异常,生成器一定不能产出值,否则解释器会抛出RuntimeError异常。
    • 4.生成器抛出的其他异常会向上冒泡,传给调用方。
    coro = demo_exc_handling()
    next(coro)
    coro.send(10)
    
    coro.close()
    coro.send(20)
    
    运行结果

    五、协程return值

    有些协程在被激活后,每次驱动(drive)协程时,不会产出值,而是在最后(协程正常终止时)返回一个值(通常是某种累加值)。

    from collections import namedtuple
    Result = namedtuple('Result', 'count average')
    def averager():
        total = 0.0
        count = 0
        average = 0.0
    
        while True:
            term = yield average
            # 为了返回值,协程必须正常终止。这里使用条件判断,以便退出累计循环
            if term is None:
                break
            total += term
            count += 1
            average = total / count
        # 返回一个 namedtuple,包含 count 和 average 两个字段。在 Python 3.3 之前,如果生成器返回值,解释器会报句法错误
        return Result(count, average)
    
    
    coro_avg = averager()
    next(coro_avg)
    coro_avg.send(10)
    coro_avg.send(20)
    coro_avg.send(30)
    
    #捕获StopIteration异常,获取 averager 返回的值:
    try:
        coro_avg.send(None)
    except StopIteration as exc_data:
        r = exc_data.value
    print(r)
    
    运行结果

    六、yield from

    • 在生成器gen中使用yield from subgen()时,子生成器subgen会获得控制权,把产出的值传给gen的调用方,即调用方可以直接控制subgen。与此同时,gen会阻塞,等待subgen终止。
    • yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。通过这个结构,协程可以把功能委托给子生成器。
    • 调用方:调用委派生成器的客户端代码。
    • 委派生成器:包含 yield from <iterable> 表达式语句的生成器函数(包含yield from语句的生成器)。
    • 子生成器:从 yield from 表达式中 <iterable>部分获取的生成器(yield from后面的表达式)。

    简单示例:

    def gen():
        yield from "AB"
        yield from range(1, 3)
        
        
    print(list(gen()))
    
    运行结果

    示例:

    Result = namedtuple('Result', 'count average')
    
    
    # 子生成器
    def averager():
        total = 0.0
        count = 0
        average = 0.0
    
        while True:
            term = yield average
            if term is None:
                break
            total += term
            count += 1
            average = total / count
    
        return Result(count, average)
    
    
    # 委派生成器
    def grouper(results, key):
        while True:
            results[key] = yield from averager()
    
    
    # 客户端代码
    def main(data):
        results = {}
        for key, values in data.items():
            group = grouper(results, key)
            # 激活委派生成器
            next(group)
    
            for value in values:
                group.send(value)
    
            group.send(None)
    
        print(results)
    
    
    data = {"girls:kg": [49.9, 39.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
            "girls:m": [1.6, 1.5, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
            "boys:kg": [43.9, 48.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
            "boys:m": [1.6, 1.4, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
            }
    main(data)
    
    运行结果

    运行结果解释:

    委派生成器在yield from 表达式处暂停时,调用方可以直接把数据发给子生成器,子生成器再把产出的值发给调用方。子生成器返回之后,解释器会抛出StopIteration异常,并把返回值附加到异常对象上,此时委派生成器会恢复运行。

    注意:

    • 如果子生成器不终止,委派生成器会在yield from处永远暂停。
    • 因为委派生成器相当于管道,所以可以把任意数量个委派生成器连接在一起:一个委派生成器使用yield from调用一个子生成器,而那个子生成器本身也是委派生成器,使用yield from调用另一个子生成器,以此类推。最终,这个链条要以一个只使用yield表达式的简单生成器结束;不过,也能以任何可迭代的对象结束。
    • 任何yield from链条都必须由客户驱动,在最外层委派生成器上调用next(...)函数或.send(...)方法。

    yield from的意义

    • 1.子生成器产出的值都直接传给委派生成器的调用方(即客户端代码)。
    • 2.使用send()方法发给委派生成器的值都直接传给子生成器。如果发送的值是None,那么会调用子生成器的next()方法。如果发送的值不是None,那么会调用子生成器的send()方法。如果调用的方法抛出StopIteration异常,那么委派生成器恢复运行。任何其他异常都会向上冒泡,传给委派生成器。
    • 3.生成器退出时,生成器(或子生成器)中的return expr表达式会触发StopIteration(expr)异常抛出。
    • 4.yield from表达式的值是子生成器终止时传给StopIteration异常的第一个参数。
    • 5.传入委派生成器的异常,除了GeneratorExit之外都传给子生成器的throw()方法。如果调用throw()方法时抛出StopIteration异常,委派生成器恢复运行。StopIteration之外的异常会向上冒泡,传给委派生成器。
    • 6.如果把GeneratorExit异常传入委派生成器,或者在委派生成器上调用close()方法,那么在子生成器上调用close()方法,如果它有的话。如果调用close()方法导致异常抛出,那么异常会向上冒泡,传给委派生成器;否则,委派生成器抛出GeneratorExit异常。

    七、greenlet协程

    greenlet由来

    虽然CPython(标准Python)能够通过生成器来实现协程,但使用起来还并不是很方便。 与此同时,Python的一个衍生版 Stackless Python。实现了原生的协程,它更利于使用。 于是,大家开始将 Stackless 中关于协程的代码 。单独拿出来做成了CPython的扩展包。 这就是 greenlet 的由来,因此 greenlet 是底层实现了原生协程的 C扩展库。

    import greenlet
    
    # 创建greenlet协程
    coroutine = greenlet.greenlet() 
    # 运行greenlet协程
    coroutine.switch()
    

    示例:生产者-消费者模式

    import greenlet
    import random
    def producer():
        while True:
            item = random.randint(0, 99)
            print("生成数字{}".format(item))
            c.switch(item)          # 将data传给c,并切换到c
    
    def consumer():
        while True:
            item = p.switch()       # 切换到p,等待传入数值
            print("消费数字{}".format(item))
    
    
    if __name__ == "__main__":
        c = greenlet.greenlet(consumer)
        p = greenlet.greenlet(producer)
        c.switch()
    
    运行结果

    greenlet 的优势:

    • 1.高性能的原生协程。
    • 2语义更加明确的显式切换。
    • 3.直接将函数包装成协程,保持原有代码风格。

    八、gevent协程

    gevent是什么:http://www.gevent.org/

    gevent是第三方库,通过greenlet实现协程,其基本思想是:

    当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
    由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成。

    import gevent
    # 创建协程
    gevent.spawn(参数)
    # gevent协程运行列表
    gevent.joinall([协程列表])  
    

    示例:生产者-消费者模式

    import gevent
    from gevent.queue import Queue
    import random
    
    
    def Consumer(queue):
            while True:
                item = queue.get()
                print("消费数字{}".format(item))
    
    
    def Producer(queue):
            while True:
                item = random.randint(0, 99)
                queue.put(item)
                print("生产数字{}".format(item))
    
    
    queue = Queue(3)
    
    p = gevent.spawn(Producer, queue)
    c = gevent.spawn(Consumer, queue)
    gevent.joinall([p, c])
    

    相关文章

      网友评论

        本文标题:Python协程

        本文链接:https://www.haomeiwen.com/subject/yeyvcctx.html