1、协程和多线程的混合使用
协程的最大优势是没有多线程的锁机制,因为它只有一个线程,也不存在同时写变量的冲突,所以执行效率比多线程高很多。不过,如果你的cpu不止一个核,那么就可以将协程和多线程(或者子线程)混合起来,进一步提高执行效率。执行流程大概如下图所示:
Paste_Image.png1.1 流水线的例子
还是看到上一篇博客中3.1节的例子,一个工厂流水线:
- manager随机产生一些5~10位的长度的字符串,传递给第一个工人
- 第一个工人将字符串截断,只取前5个字符
- 第二个工人将这个字符串中的“数字”去掉,然后排序,输出一个新字符串
- 第三个工人将字符串中的字符都变为大写
现在我们需要把第3、4个步骤放到一个新的线程里去执行,代码如下:
#协程和多线程混合的例子
from random import shuffle,randint
import re
def random_str():
#随机生成5~10位字符串
chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
chars_list=list(chars)
shuffle(chars_list)
res=''.join(chars_list[0:randint(5,10)])
return res
def manager(target):
#生成1万个随机字符串并传入work1
n=0
target.__next__()
while (n<100000):
n=n+1
resource = random_str()
print("Manager: %s" % resource)
target.send(resource)
def work1(target):
target.__next__()
while True:
input_str = yield
if len(input_str)>5:
#截断
res=input_str[0:5]
else:
res=input_str
#给下一个生成器传入值
target.send(res)
def work2(target):
target.__next__()
while True:
str_from_work1 = yield
if str_from_work1:
#去掉字符串中的数字
res=re.sub(r'([\d]+)','',str_from_work1)
target.send(res)
def work3():
#字母变为大写
while True:
str_from_work2 = yield
if str_from_work2:
res=str_from_work2.upper()
print("output: %s" % res)
from threading import Thread
from queue import Queue
def cothread(target):
target.__next__()
#由于开多线程,使用一个Queue新线程进行沟通
message = Queue()
def run_target():
while True:
item = message.get()
if item is GeneratorExit:
target.close()
return
else:
target.send(item)
#开始一个新的线程
Thread(target=run_target).start()
#主线程通过queue和新的线程通信
try:
while True:
item = yield
message.put(item)
except GeneratorExit:
message.put(GeneratorExit)
if __name__ == '__main__':
manager(work1(cothread(work2(work3()))))
由上面的例子可以看到work2和work3之前多了一个cothread的生成器,这个生成器打开了一个新的Thread,并通过一个Queue实现线程间的通信。类似的,还可以通过subprocess(pipe通信)、网络等方法去包装协程。也就是说使用协程可以把你的“实现”和“环境”分割开来,上面例子中的work和manager就相当于“实现”的逻辑。而不同的实现环节可以放到不同的“环境(多线程、子线程、网络)”中去具体执行。
1.2 特别注意
需要特别注意的两点:
- 在调用协程的send函数时,必须是同步的。如果给正在执行的生成器send一个值,生成器会crash
- 在将生成器组合成流水线时,生成器的连接不能存在loop
2、协程与任务调度器
在David Beazley教程的后半部分,第7章开始(http://dabeaz.com/coroutines/),讨论的主要是只用协程能不能用来构造一个类似于操作系统的调度器?答案是:能!
首先来看看一个操作系统的调度器需要实现那些东西:
- 需要有一个task类
- 需要一个调度器,scheduler
- scheduler能够调度multitask,多任务交替运行
- task执行完之后可以退出
- 允许有系统调用,对task进行基本的管理
- 可以创建新的task
- 系统调用可以kill task也可以wait for task(异步task)
一下是一个例子,在这个例子中,只用协程(不使用多线程,子线程)就实现了以上的各种功能:
class Task(object):
taskid = 0
def __init__(self,target):
Task.taskid += 1
self.tid = Task.taskid # Task ID
self.target = target # Target coroutine
self.sendval = None # Value to send
# Run a task until it hits the next yield statement
def run(self):
return self.target.send(self.sendval)
# ------------------------------------------------------------
# === Scheduler ===
# ------------------------------------------------------------
from queue import Queue
class Scheduler(object):
def __init__(self):
self.ready = Queue()
self.taskmap = {}
# Tasks waiting for other tasks to exit
self.exit_waiting = {}
def new(self,target):
newtask = Task(target)
self.taskmap[newtask.tid] = newtask
self.schedule(newtask)
return newtask.tid
def exit(self,task):
print ("Task %d terminated" % task.tid)
del self.taskmap[task.tid]
# Notify other tasks waiting for exit
# 如果有别的task正在等这个task,那么调度别的task
for task in self.exit_waiting.pop(task.tid,[]):
self.schedule(task)
def waitforexit(self,task,waittid):
#如果waitid在taskmap中,将waittid放入self.exit_waiting字典中
#将需要等待waitid的task,注册到这个字典里面
if waittid in self.taskmap:
self.exit_waiting.setdefault(waittid,[]).append(task)
return True
else:
return False
def schedule(self,task):
self.ready.put(task)
def mainloop(self):
while self.taskmap:
task = self.ready.get()
try:
result = task.run()
if isinstance(result,SystemCall):
result.task = task
result.sched = self
result.handle()
continue
self.schedule(task)
# ------------------------------------------------------------
# === System Calls ===
# ------------------------------------------------------------
class SystemCall(object):
def handle(self):
pass
# Return a task's ID number
class GetTid(SystemCall):
def handle(self):
self.task.sendval = self.task.tid
self.sched.schedule(self.task)
# Create a new task
class NewTask(SystemCall):
def __init__(self,target):
self.target = target
def handle(self):
tid = self.sched.new(self.target)
self.task.sendval = tid
self.sched.schedule(self.task)
# Kill a task
class KillTask(SystemCall):
def __init__(self,tid):
self.tid = tid
def handle(self):
task = self.sched.taskmap.get(self.tid,None)
if task:
task.target.close()
self.task.sendval = True
else:
self.task.sendval = False
self.sched.schedule(self.task)
# Wait for a task to exit
class WaitTask(SystemCall):
def __init__(self,tid):
self.tid = tid
def handle(self):
result = self.sched.waitforexit(self.task,self.tid)
self.task.sendval = result
# If waiting for a non-existent task,
# return immediately without waiting
if not result:
self.sched.schedule(self.task)
# ------------------------------------------------------------
# === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
def foo():
for i in range(5):
print ("I'm foo")
yield
def main():
child = yield NewTask(foo())
print ("Waiting for child")
yield WaitTask(child)
print ("Child done")
sched = Scheduler()
sched.new(main())
sched.mainloop()
上面的代码很长,有兴趣可以去教程的网站上,查看作者是如何一步一步的实现一个os的各种调度的功能的。
网友评论