美文网首页我的Python自学之路程序员
Python笔记_从迭代器、生成器到协程(三)

Python笔记_从迭代器、生成器到协程(三)

作者: Stansosleepy | 来源:发表于2017-01-07 08:31 被阅读119次

    1、协程和多线程的混合使用

    协程的最大优势是没有多线程的锁机制,因为它只有一个线程,也不存在同时写变量的冲突,所以执行效率比多线程高很多。不过,如果你的cpu不止一个核,那么就可以将协程和多线程(或者子线程)混合起来,进一步提高执行效率。执行流程大概如下图所示:

    Paste_Image.png

    1.1 流水线的例子

    还是看到上一篇博客中3.1节的例子,一个工厂流水线:

    1. manager随机产生一些5~10位的长度的字符串,传递给第一个工人
    2. 第一个工人将字符串截断,只取前5个字符
    3. 第二个工人将这个字符串中的“数字”去掉,然后排序,输出一个新字符串
    4. 第三个工人将字符串中的字符都变为大写

    现在我们需要把第3、4个步骤放到一个新的线程里去执行,代码如下:

    #协程和多线程混合的例子
    from random import shuffle,randint
    import re
    
    def random_str():
        #随机生成5~10位字符串
        chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
        chars_list=list(chars)
        shuffle(chars_list)
        res=''.join(chars_list[0:randint(5,10)])
        return res 
    
    def manager(target):
        #生成1万个随机字符串并传入work1
        n=0 
        target.__next__()
        while (n<100000):
            n=n+1
            resource = random_str()
            print("Manager: %s" % resource)
            target.send(resource)
    
    def work1(target):
        target.__next__()
        while True:
            input_str = yield
            if len(input_str)>5:
                #截断
                res=input_str[0:5]
            else:
                res=input_str
            #给下一个生成器传入值
            target.send(res)
    
    def work2(target):
        target.__next__()
        while True:
            str_from_work1 = yield
            if str_from_work1:
                #去掉字符串中的数字
                res=re.sub(r'([\d]+)','',str_from_work1)
                target.send(res)
    
    
    def work3():
        #字母变为大写
        while True:
            str_from_work2 = yield
            if str_from_work2:
                res=str_from_work2.upper()
                print("output: %s" % res)
    
    from threading import Thread
    from queue import Queue
    
    def cothread(target):
        target.__next__()
        #由于开多线程,使用一个Queue新线程进行沟通
        message = Queue()
    
        def run_target():
            while True:
                item = message.get()
                if item is GeneratorExit:
                    target.close()
                    return
                else:
                    target.send(item)
    
        #开始一个新的线程
        Thread(target=run_target).start()
    
        #主线程通过queue和新的线程通信
        try:
            while True:
                item = yield
                message.put(item)
        except GeneratorExit:
            message.put(GeneratorExit)
    
    if __name__ == '__main__':
        manager(work1(cothread(work2(work3()))))    
    

    由上面的例子可以看到work2和work3之前多了一个cothread的生成器,这个生成器打开了一个新的Thread,并通过一个Queue实现线程间的通信。类似的,还可以通过subprocess(pipe通信)、网络等方法去包装协程。也就是说使用协程可以把你的“实现”和“环境”分割开来,上面例子中的work和manager就相当于“实现”的逻辑。而不同的实现环节可以放到不同的“环境(多线程、子线程、网络)”中去具体执行。

    1.2 特别注意

    需要特别注意的两点:

    1. 在调用协程的send函数时,必须是同步的。如果给正在执行的生成器send一个值,生成器会crash
    2. 在将生成器组合成流水线时,生成器的连接不能存在loop

    2、协程与任务调度器

    在David Beazley教程的后半部分,第7章开始(http://dabeaz.com/coroutines/),讨论的主要是只用协程能不能用来构造一个类似于操作系统的调度器?答案是:能!

    首先来看看一个操作系统的调度器需要实现那些东西:

    • 需要有一个task类
    • 需要一个调度器,scheduler
    • scheduler能够调度multitask,多任务交替运行
    • task执行完之后可以退出
    • 允许有系统调用,对task进行基本的管理
    • 可以创建新的task
    • 系统调用可以kill task也可以wait for task(异步task)

    一下是一个例子,在这个例子中,只用协程(不使用多线程,子线程)就实现了以上的各种功能:

    class Task(object):
        taskid = 0 
        def __init__(self,target):
            Task.taskid += 1
            self.tid     = Task.taskid   # Task ID
            self.target  = target        # Target coroutine
            self.sendval = None          # Value to send
    
        # Run a task until it hits the next yield statement
        def run(self):
            return self.target.send(self.sendval)
    
    # ------------------------------------------------------------
    #                      === Scheduler ===
    # ------------------------------------------------------------
    from queue import Queue
    
    class Scheduler(object):
        def __init__(self):
            self.ready   = Queue()   
            self.taskmap = {}    
    
            # Tasks waiting for other tasks to exit
            self.exit_waiting = {}
    
        def new(self,target):
            newtask = Task(target)
            self.taskmap[newtask.tid] = newtask
            self.schedule(newtask)
            return newtask.tid
    
        def exit(self,task):
            print ("Task %d terminated" % task.tid)
            del self.taskmap[task.tid]
            # Notify other tasks waiting for exit
            # 如果有别的task正在等这个task,那么调度别的task
            for task in self.exit_waiting.pop(task.tid,[]):
                self.schedule(task)
    
        def waitforexit(self,task,waittid):
            #如果waitid在taskmap中,将waittid放入self.exit_waiting字典中
            #将需要等待waitid的task,注册到这个字典里面
            if waittid in self.taskmap:
                self.exit_waiting.setdefault(waittid,[]).append(task)
                return True
            else:
                return False
    
        def schedule(self,task):
            self.ready.put(task)
    
        def mainloop(self):
             while self.taskmap:
                task = self.ready.get()
                try:
                    result = task.run()
                    if isinstance(result,SystemCall):
                        result.task  = task
                        result.sched = self
                        result.handle()
                    continue
                self.schedule(task)
    
    # ------------------------------------------------------------
    #                   === System Calls ===
    # ------------------------------------------------------------
    
    class SystemCall(object):
        def handle(self):
            pass
    
    # Return a task's ID number
    class GetTid(SystemCall):
        def handle(self):
            self.task.sendval = self.task.tid
            self.sched.schedule(self.task)
    
    # Create a new task
    class NewTask(SystemCall):
        def __init__(self,target):
            self.target = target
        def handle(self):
            tid = self.sched.new(self.target)
            self.task.sendval = tid
            self.sched.schedule(self.task)
    
    # Kill a task
    class KillTask(SystemCall):
        def __init__(self,tid):
            self.tid = tid
        def handle(self):
            task = self.sched.taskmap.get(self.tid,None)
            if task:
                task.target.close()
                self.task.sendval = True
            else:
                self.task.sendval = False
            self.sched.schedule(self.task)
    
    # Wait for a task to exit
    class WaitTask(SystemCall):
        def __init__(self,tid):
            self.tid = tid
        def handle(self):
            result = self.sched.waitforexit(self.task,self.tid)
            self.task.sendval = result
            # If waiting for a non-existent task,
            # return immediately without waiting
            if not result:
                self.sched.schedule(self.task)
    
    # ------------------------------------------------------------
    #                      === Example ===
    # ------------------------------------------------------------
    if __name__ == '__main__':
        def foo():
            for i in range(5):
                print ("I'm foo")
                yield
    
        def main():
            child = yield NewTask(foo())
            print ("Waiting for child")
            yield WaitTask(child)
            print ("Child done")
    
        sched = Scheduler()
        sched.new(main())
        sched.mainloop()
    

    上面的代码很长,有兴趣可以去教程的网站上,查看作者是如何一步一步的实现一个os的各种调度的功能的。

    相关文章

      网友评论

        本文标题:Python笔记_从迭代器、生成器到协程(三)

        本文链接:https://www.haomeiwen.com/subject/gntfbttx.html