要点
- yield关键字后可接参数, 可以是一个数, 也可以是一个表达式, 其返回的对象是其调用者(send/next函数)
- send函数必须传参数, 其参数为yield的结果, 这个结果可以被赋值给变量; send函数的返回值即yield表达式的值
- next函数的返回值也是yield表达式的值
注意: yield生成的值与其后表达式的值可以不同, 这是因为send函数的参数改变了yield的生成值. 即yield生成什么由send决定, 不由其后的表达式决定.
python并发管理包concurrent.futures
concurrent.futures提供了一种简单的方式实现线程池与进程池任务管理. 线程与进程管理的接口是一致的, 使得最小的变化可在线程与进程间进行切换.
concurrent.futures模块提供两个类与池进行交互. Executors用于管理池中的任务(线程与workers), futures负责管理任务计算的结果.
应用程序首先创建合适的Executor(ThreadPoolExecutor和ProcessPoolExecutor)实例, 并向其实例submit不同的tasks.当task开始后, 系统将创建future实例. 当应用程序或者调用方需要使用task的执行过程状态与执行结果时, 可能使用future实例. 支持future模式的API都提供等待task完成的机制, 不需要调用方显示创建future对象.
python的并发处理通过Executor管理线程池与进程池中的tasks, 当task启动后会创建一个Future对象, Future对象负责管理各task的结果.
简单map()进行多线程处理
from concurrent import futures
import threading
import time
def tsk(n):
print(" {}: sleeping {}".format(threading.current_thread().name, n))
time.sleep(n /10)
print(" {}: done with {}".format(threading.current_thread().name, n))
return n/10
#####################################################
# 通过map()模拟多线程执行任务的过程, 无论线程执行任务的顺序如何,
# 这里的map()函数总是能按输入序列的顺序返回结果
#####################################################
ex = futures.ThreadPoolExecutor(max_workers=2)
print("main: starting")
result = ex.map(tsk, range(5, 0, -1))
print("main: unprocessed result {}".format(result))
print("main: waiting for real results")
real_result = list(result)
print("main: results: {}".format(real_result))
使用future进行间任务调度
#####################################################
# 调度单个任务, 作用future对象管理任务执行结果, 如何实现等待的?
# future对象在任务执行时, 状态为running;
# tsk完成结果返回给future后, 其状态为finished
#####################################################
ex = futures.ThreadPoolExecutor(max_workers=2)
print("main: starting")
f = ex.submit(tsk, 5)
print("main: future {}".format(f))
print("main: waiting for results")
result = f.result()
print("main: result: {}".format(result))
print("main: final future {}".format(f))
多任务按完成顺序进行返回
#####################################################
# 按任意顺序返回任务执行结果
# futures.as_completed, 指按照任务完成的顺序记录结果
#####################################################
import random
def tsk1(n):
time.sleep(random.random())
return n, n/10
ex = futures.ThreadPoolExecutor(max_workers=5)
print("main: starting")
wait_for = [ ex.submit(tsk1, i) for i in range(5, 0, -1)]
for f in futures.as_completed(wait_for):
print("main: finised future result: {}".format(f.result()))
回调函数
def done(fn):
if fn.cancelled():
print('{}: canceled'.format(fn.arg))
elif fn.done():
print('{}: not canceled'.format(fn.arg))
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
tasks = []
for i in range(10, 0, -1):
print('main: submitting {}'.format(i))
f = ex.submit(tsk, i)
f.arg = i
f.add_done_callback(done)
tasks.append((i, f))
for i, t in reversed(tasks):
if not t.cancel():
print('main: did not cancel {}'.format(i))
ex.shutdown()
网友评论