美文网首页
【Python】协程

【Python】协程

作者: 下里巴人也 | 来源:发表于2018-01-10 18:14 被阅读62次

    参考:https://thief.one/2017/02/20/Python%E5%8D%8F%E7%A8%8B/
    http://blog.gusibi.com/post/python-coroutine-1-yield/

    1 概念

    协程(Coroutine),又称微线程。

    • 协程作用
      在执行函数A时,可以随时终端,去执行函数B,然后中断继续执行函数A(可以自由切换)。不同于函数调用(没有调用语句)。
    • 协程优势
      1)执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。
      2)不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多。
      3)协程可以处理IO密集型程序的效率问题,但是处理CPU密集型不是它的长处,如要充分发挥CPU利用率可以结合多进程+协程。

    2 Python2.x协程

    python2.x协程应用:

    • yield
    • gevent

    2.1 gevent

    gevent是第三方库,通过greenlet实现协程,其基本思想:
    当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

    2.1.1 install

    pip install gevent
    

    2.1.2 gevent usage

    • monkey可以使一些阻塞的模块变得不阻塞,机制:遇到IO操作则自动切换,手动切换可以用gevent.sleep(0)
    • gevent.spawn 启动协程,参数为函数名称,参数名称
    • gevent.joinall 停止协程

    2.1.3 示例

    一个爬虫的例子:

    #! -*- coding:utf-8 -*-
    import gevent
    from gevent import monkey;monkey.patch_all()
    import urllib2
    def get_body(i):
      print "start",i
      urllib2.urlopen("http://cn.bing.com")
      print "end",i
    tasks=[gevent.spawn(get_body,i) for i in range(3)]
    gevent.joinall(tasks)
    

    运行结果:

    start 0
    start 1
    start 2
    end 2
    end 0
    end 1
    

    说明:从结果上来看,执行get_body的顺序应该先是输出”start”,然后执行到urllib2时碰到IO堵塞,则会自动切换运行下一个程序(继续执行get_body输出start),直到urllib2返回结果,再执行end。也就是说,程序没有等待urllib2请求网站返回结果,而是直接先跳过了,等待执行完毕再回来获取返回值。值得一提的是,在此过程中,只有一个线程在执行,因此这与多线程的概念是不一样的。

    换成多线程实现:

    import threading
    import urllib2
    def get_body(i):
      print "start",i
      urllib2.urlopen("http://cn.bing.com")
      print "end",i
    for i in range(3):
      t=threading.Thread(target=get_body,args=(i,))
      t.start()
    

    运行结果:

    start 0
    start 1
    start 2
    end 1
    end 2
    end 0
    

    说明:从结果来看,多线程与协程的效果一样,都是达到了IO阻塞时切换的功能。不同的是,多线程切换的是线程(线程间切换),协程切换的是上下文(可以理解为执行的函数)。而切换线程的开销明显是要大于切换上下文的开销,因此当线程越多,协程的效率就越比多线程的高。(猜想多进程的切换开销应该是最大的)

    2.2 yield

    协程的底层架构是在pep342 中定义,并在python2.5 实现的。
    python2.5中,yield关键字可以在表达式中使用,而且生成器API中增加了.send(value)方法。生成器可以调用.send(...)方法发送数据,发送的数据会成为生成器函数中yield表达式的值。

    2.2.1 协程生成器的基本行为

    首先说明一下,协程有四个状态,可以使用inspect.getgeneratorstate(…)函数确定:

    • GEN_CREATED # 等待开始执行
    • GEN_RUNNING # 解释器正在执行(只有在多线程应用中才能看到这个状态)
    • GEN_SUSPENDED # 在yield表达式处暂停
    • GEN_CLOSED # 执行结束
      示例代码:
    #! -*- coding: utf-8 -*-
    import inspect
    
    # 协程使用生成器函数定义:定义体中有yield关键字。
    def simple_coroutine():
      print('-> coroutine started')
      # yield 在表达式中使用;如果协程只需要从客户那里接收数据,yield关键字右边不需要加表达式(yield默认返回None)
      x = yield
      print('-> coroutine received:', x)
    
    my_coro = simple_coroutine()
    my_coro # 和创建生成器的方式一样,调用函数得到生成器对象。
    # 协程处于 GEN_CREATED (等待开始状态)
    print(inspect.getgeneratorstate(my_coro))
    
    my_coro.send(None)
    # 首先要调用next()函数,因为生成器还没有启动,没有在yield语句处暂停,所以开始无法发送数据
    # 发送 None 可以达到相同的效果 my_coro.send(None) 
    next(my_coro)
    # 此时协程处于 GEN_SUSPENDED (在yield表达式处暂停)
    print(inspect.getgeneratorstate(my_coro))
    
    # 调用这个方法后,协程定义体中的yield表达式会计算出42;现在协程会恢复,一直运行到下一个yield表达式,或者终止。
    my_coro.send(42)
    print(inspect.getgeneratorstate(my_coro))
    

    运行上述代码,输出结果如下:

    GEN_CREATED
    -> coroutine started
    GEN_SUSPENDED
    -> coroutine received: 42
    
    # 这里,控制权流动到协程定义体的尾部,导致生成器像往常一样抛出StopIteration异常
    Traceback (most recent call last):
    File "/Users/gs/coroutine.py", line 18, in <module> 
      my_coro.send(42)
    StopIteration
    

    send方法的参数会成为暂停yield表达式的值,所以,仅当协程处于暂停状态是才能调用send方法。 如果协程还未激活(GEN_CREATED 状态)要调用next(my_coro) 激活协程,也可以调用my_coro.send(None)

    再看一个两个值的协程:

    def simple_coro2(a):
      print('-> coroutine started: a =', a)
      b = yield a
      print('-> Received: b =', b)
      c = yield a + b
      print('-> Received: c =', c)
    
    my_coro2 = simple_coro2(14)
    print(inspect.getgeneratorstate(my_coro2))
    # 这里inspect.getgeneratorstate(my_coro2) 得到结果为 GEN_CREATED (协程未启动)
    
    next(my_coro2)
    # 向前执行到第一个yield 处 打印 “-> coroutine started: a = 14”
    # 并且产生值 14 (yield a 执行 等待为b赋值)
    print(inspect.getgeneratorstate(my_coro2))
    # 这里inspect.getgeneratorstate(my_coro2) 得到结果为 GEN_SUSPENDED (协程处于暂停状态)
    
    my_coro2.send(28)
    # 向前执行到第二个yield 处 打印 “-> Received: b = 28”
    # 并且产生值 a + b = 42(yield a + b 执行 得到结果42 等待为c赋值)
    print(inspect.getgeneratorstate(my_coro2))
    # 这里inspect.getgeneratorstate(my_coro2) 得到结果为 GEN_SUSPENDED (协程处于暂停状态)
    
    my_coro2.send(99)
    # 把数字99发送给暂停协程,计算yield 表达式,得到99,然后把那个数赋值给c 打印 “-> Received: c = 99”
    # 协程终止,抛出StopIteration
    

    运行上述代码,输出结果如下:

    GEN_CREATED
    -> coroutine started: a = 14
    GEN_SUSPENDED
    -> Received: b = 28
    -> Received: c = 99
    
    Traceback (most recent call last):
    File "/Users/gs/coroutine.py", line 37, in <module>
      my_coro2.send(99)
    StopIteration
    

    simple_coro2 协程的执行过程分为3个阶段,如下图所示:


    1)调用next(my_coro2),打印第一个消息,然后执行yield a,产出数字14.
    2)调用my_coro2.send(28),把28赋值给b,打印第二个消息,然后执行 yield a + b 产生数字42
    3)调用my_coro2.send(99),把99赋值给c,然后打印第三个消息,协程终止。

    3 实战

    以下是读视频流然后做视频识别的实战案例。
    代码如下:

        def __video_capture(self, target):
          while self.__gate_opened:
              origin_frame = self.__video_stream.read()
              frame_rgb = cv2.cvtColor(origin_frame, cv2.COLOR_BGR2RGB)
              target.send(frame_rgb)
          else:
              target.close()
    
    @coroutine
      def __parse_origin_frame(self, target):
          try:
              while True:
                  frame = (yield)
                  if frame is None:
                      continue
    
                  category_index = ComponentManager.get_category_index()
                  updated_frame, score, classes,boxes = parse_origin_video_frame(frame, self.__session, self.__detection_graph, category_index)
                  items = item_detect(score, classes, boxes, category_index)
                  if items:
                      valid_frame = Frame()
                      #print "items: ",
                      for item in items:
                          #print item.get_id(), item.get_name(), item.get_score(), item.get_x(), item.get_y(),
                          valid_frame.add_item(item)
                      #print " end"
                      self.__cur_frame_manager.add_frame(valid_frame)
                  target.send(updated_frame)
          except GeneratorExit:
              target.close()
    
      @coroutine
      def __handle_cv(self):
          try:
              while True:
                  frame = (yield)
                  if frame is None:
                      continue
    
                  output_rgb = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                  cv2.imshow(self.__video_name, output_rgb)
                  if cv2.waitKey(1) & 0xFF == ord('q'):
                      break
    
          except GeneratorExit:
              with self.__lock:
                  self.__cur_frame_manager.parse_frame()
                  self.__cur_frame_manager = None
    

    相关文章

      网友评论

          本文标题:【Python】协程

          本文链接:https://www.haomeiwen.com/subject/rdxonxtx.html