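Life has only three days: the confused live in yesterday, the wishful live in tomorrow, and only the clear-minded live in today. Yesterday is already gone, a check that has expired; tomorrow has not yet arrived, a check that cannot be cashed; only living in today is real!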
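Summary:
- Approaches to concurrency: the difference between synchronous blocking and asynchronous non-blocking (more efficient) styles;
- Whether the workload is I/O-bound or CPU-bound;
1. The concurrent package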
concurrent.futures
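The Future object is the core of how the futures library implements asynchronous tasks. It is created by the thread pool or process pool executor and returned to the caller, and it represents a task that is in progress: you can wait for the task to finish right away and then carry on, or do other work first and fetch the result later, when you actually need it.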
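A minimal sketch of that workflow with ThreadPoolExecutor: submit() hands back a Future immediately, the caller does unrelated work, and result() blocks only at the point the value is needed. The names slow_square and main are illustrative, and time.sleep stands in for real I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # Stand-in for a slow I/O-style call such as a network request
    time.sleep(0.5)
    return x * x


def main():
    with ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(slow_square, 7)  # returns a Future right away
        # The task runs in a worker thread, so the caller is free to do other work
        other = sum(range(1000))
        print('done yet?', future.done())  # most likely False: the task sleeps 0.5 s
        # Block only when the value is actually needed
        value = future.result()
        print('other work:', other, 'result:', value)
        return other, value


if __name__ == '__main__':
    main()
```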
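Introduced in Python 3.2.
A module for asynchronous, parallel task programming that provides a high-level, convenient interface for executing callables asynchronously.
It provides two pool executors:
ThreadPoolExecutor: an Executor that runs calls asynchronously in a pool of threads
ProcessPoolExecutor: an Executor that runs calls asynchronously in a pool of processes
A rule of thumb for choosing between them:
- For I/O-heavy tasks (I/O-bound), choose ThreadPoolExecutor, e.g. fetching web pages or reading and writing files, i.e. anything that waits on network or disk I/O;
- For CPU-heavy tasks (CPU-bound), choose ProcessPoolExecutor, e.g. compute-intensive math and logic, or video encoding and decoding.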
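To make the rule of thumb concrete, here is a minimal sketch in which time.sleep stands in for I/O and naive prime counting stands in for CPU-heavy work (both stand-ins, and the names fake_io, count_primes and main, are illustrative assumptions). Both executors are driven through the same map() call; the `if __name__ == '__main__'` guard matters for ProcessPoolExecutor on platforms where worker processes re-import the main module.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def fake_io(delay):
    # Stand-in for an I/O-bound call: a sleeping thread does not hold the GIL
    time.sleep(delay)
    return delay


def count_primes(limit):
    # Stand-in for CPU-bound work: naive trial division keeps the CPU busy
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def main():
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as pool:
        io_results = list(pool.map(fake_io, [0.2] * 4))
    # The four sleeps overlap, so this should be close to 0.2 s rather than 0.8 s
    io_elapsed = time.perf_counter() - start

    # Separate processes let the CPU-bound calls run in parallel despite the GIL
    with ProcessPoolExecutor() as pool:
        cpu_results = list(pool.map(count_primes, [20_000] * 4))

    print(f'I/O: {io_results} in {io_elapsed:.2f}s')
    print(f'CPU: {cpu_results}')
    return io_results, io_elapsed, cpu_results


if __name__ == '__main__':
    main()
```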
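ProcessPoolExecutor sidesteps the GIL, but because arguments and return values have to be passed between the main process and the worker processes, normally only picklable (serializable) objects can be submitted and returned. Here is an example that fails for exactly that reason:

```python
from concurrent.futures import ProcessPoolExecutor


def write(f, line):
    f.write(line)


if __name__ == '__main__':
    with open('1.txt', 'a+') as f:
        with ProcessPoolExecutor() as executor:
            # Fails: an open file object cannot be pickled,
            # so it cannot be sent to the worker process
            future = executor.submit(write, f, 'abc')
            # On recent Python versions this re-raises the pickling error (a TypeError)
            print(f'RESULT: {future.result()}')
```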
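One common way around this, sketched below under the assumption that each worker may simply open the file itself: pass the path (a plain, picklable string) instead of the file object. The helpers append_line, is_picklable and main are illustrative names, not part of concurrent.futures.

```python
import pickle
from concurrent.futures import ProcessPoolExecutor


def is_picklable(obj):
    # ProcessPoolExecutor pickles fn, args and return values to move them between processes
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


def append_line(path, line):
    # Open the file inside the worker: only the path string crosses the process boundary
    with open(path, 'a', encoding='utf-8') as f:
        f.write(line + '\n')
    return len(line)


def main(path='1.txt'):
    with open(path, 'a+', encoding='utf-8') as f:
        print('file object picklable?', is_picklable(f))  # False
    print('path picklable?', is_picklable(path))  # True
    with ProcessPoolExecutor() as executor:
        future = executor.submit(append_line, path, 'abc')
        print(f'RESULT: {future.result()}')  # RESULT: 3


if __name__ == '__main__':
    main()
```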
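ThreadPoolExecutor objects
First, create a pool executor, which is an instance of a subclass of Executor.
Method | Meaning |
---|---|
ThreadPoolExecutor(max_workers=None) | Creates a pool of at most max_workers threads that execute calls asynchronously; when max_workers is None, a default based on the CPU count is used |
submit(fn, *args, **kwargs) | Schedules fn(*args, **kwargs) for execution and returns a Future instance |
shutdown(wait=True) | Cleans up the pool: no new tasks are accepted, and with wait=True the call blocks until all pending tasks have finished |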
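The three methods in the table, used together without a with statement. This is a minimal sketch; add and main are illustrative names. It also shows that an executor refuses new work once shutdown() has been called.

```python
from concurrent.futures import ThreadPoolExecutor


def add(a, b):
    return a + b


def main():
    executor = ThreadPoolExecutor(max_workers=2)  # at most 2 worker threads
    fs = [executor.submit(add, i, i) for i in range(5)]  # each submit() returns a Future
    executor.shutdown(wait=True)  # blocks until every submitted call has finished
    results = [f.result() for f in fs]
    try:
        executor.submit(add, 1, 1)  # not allowed after shutdown
    except RuntimeError as exc:
        print('after shutdown:', exc)
        rejected = True
    else:
        rejected = False
    return results, rejected


if __name__ == '__main__':
    print(main())
```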
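The Future class
Method | Meaning |
---|---|
done() | Returns True if the call was successfully cancelled or has finished running |
cancelled() | Returns True if the call was successfully cancelled |
running() | Returns True if the call is currently running and therefore cannot be cancelled |
cancel() | Attempts to cancel the call. Returns False if the call is already running or finished and cannot be cancelled; otherwise cancels it and returns True |
result(timeout=None) | Returns the call's return value. With timeout=None it waits indefinitely; if the timeout expires first, it raises concurrent.futures.TimeoutError. If the call raised an exception, result() re-raises it |
exception(timeout=None) | Returns the exception raised by the call (None if it completed normally). With timeout=None it waits indefinitely; if the timeout expires first, it raises concurrent.futures.TimeoutError |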
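A small sketch that exercises each row of the table. With a single worker thread, the first task occupies the pool and the second waits in the queue, so only the second can still be cancelled. The helpers sleepy and fail are illustrative, and the short sleeps assume the worker thread picks up its first task within 0.1 s, which is normally the case.

```python
import time
from concurrent import futures


def sleepy(seconds):
    time.sleep(seconds)
    return seconds


def fail():
    raise ValueError('boom')


def main():
    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        first = executor.submit(sleepy, 0.5)   # occupies the only worker thread
        second = executor.submit(sleepy, 0.5)  # waits in the queue behind `first`
        time.sleep(0.1)  # give the worker time to start `first`
        print(first.running(), first.cancel())  # expected: True False (already running)
        print(second.cancel(), second.cancelled(), second.done())  # True True True
        try:
            first.result(timeout=0.1)  # `first` is still sleeping, so this times out
        except futures.TimeoutError:
            print('timed out waiting for first')
        print(first.result())  # no timeout: waits, then prints 0.5
        bad = executor.submit(fail)
        # exception() returns the error; bad.result() would re-raise it instead
        print(repr(bad.exception()))
        return first.result(), second.cancelled(), bad.exception()


if __name__ == '__main__':
    main()
```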
```python
import threading
from concurrent import futures
import logging
import time

# Log format: process name, thread name, process ID and thread ID
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))


# Create a thread pool executor with a capacity of 3
executor = futures.ThreadPoolExecutor(max_workers=3)
fs = []  # keep the Futures so their state can be checked
for i in range(3):
    future = executor.submit(worker, i)
    fs.append(future)

# for i in range(3, 6):
#     future = executor.submit(worker, i)
#     fs.append(future)

while True:
    time.sleep(2)
    logging.info(threading.enumerate())
    flag = True
    for f in fs:  # the loop ends only once every Future is done
        logging.info(f.done())
        flag = flag and f.done()
    if flag:
        executor.shutdown()  # clean up the pool: waits for the worker threads to finish and exit
        logging.info(threading.enumerate())
        break

# Once the pool has created its threads, they are reused for later tasks,
# so there is no need to create and clean them up repeatedly
```
Sample output:

```text
2020-01-15 16:18:26,134 [MainProcess:ThreadPoolExecutor-0_0, 24400: 29700] begin to work0
2020-01-15 16:18:26,134 [MainProcess:ThreadPoolExecutor-0_1, 24400: 25208] begin to work1
2020-01-15 16:18:26,135 [MainProcess:ThreadPoolExecutor-0_2, 24400: 3380] begin to work2
2020-01-15 16:18:28,136 [MainProcess:MainThread, 24400: 26468] [<_MainThread(MainThread, started 26468)>, <Thread(ThreadPoolExecutor-0_0, started daemon 29700)>, <Thread(ThreadPoolExecutor-0_1, started daemon 25208)>, <Thread(ThreadPoolExecutor-0_2, started daemon 3380)>]
2020-01-15 16:18:28,136 [MainProcess:MainThread, 24400: 26468] False
2020-01-15 16:18:28,136 [MainProcess:MainThread, 24400: 26468] False
2020-01-15 16:18:28,136 [MainProcess:MainThread, 24400: 26468] False
2020-01-15 16:18:30,136 [MainProcess:MainThread, 24400: 26468] [<_MainThread(MainThread, started 26468)>, <Thread(ThreadPoolExecutor-0_0, started daemon 29700)>, <Thread(ThreadPoolExecutor-0_1, started daemon 25208)>, <Thread(ThreadPoolExecutor-0_2, started daemon 3380)>]
2020-01-15 16:18:30,136 [MainProcess:MainThread, 24400: 26468] False
2020-01-15 16:18:30,136 [MainProcess:MainThread, 24400: 26468] False
2020-01-15 16:18:30,136 [MainProcess:MainThread, 24400: 26468] False
2020-01-15 16:18:31,136 [MainProcess:ThreadPoolExecutor-0_0, 24400: 29700] finished0
2020-01-15 16:18:31,136 [MainProcess:ThreadPoolExecutor-0_2, 24400: 3380] finished2
2020-01-15 16:18:31,136 [MainProcess:ThreadPoolExecutor-0_1, 24400: 25208] finished1
2020-01-15 16:18:32,137 [MainProcess:MainThread, 24400: 26468] [<_MainThread(MainThread, started 26468)>, <Thread(ThreadPoolExecutor-0_0, started daemon 29700)>, <Thread(ThreadPoolExecutor-0_1, started daemon 25208)>, <Thread(ThreadPoolExecutor-0_2, started daemon 3380)>]
2020-01-15 16:18:32,137 [MainProcess:MainThread, 24400: 26468] True
2020-01-15 16:18:32,137 [MainProcess:MainThread, 24400: 26468] True
2020-01-15 16:18:32,137 [MainProcess:MainThread, 24400: 26468] True
2020-01-15 16:18:32,137 [MainProcess:MainThread, 24400: 26468] [<_MainThread(MainThread, started 26468)>]
```
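Polling done() in a loop works, but concurrent.futures also provides as_completed() and wait() for this job. The sketch below swaps the polling loop for those two functions; the worker sleeps are chosen (as an illustrative assumption) so that later tasks finish first, which makes the completion order visible.

```python
import time
from concurrent import futures


def worker(n):
    time.sleep(0.2 * (3 - n))  # worker(2) finishes first, worker(0) last
    return n


def main():
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(worker, i) for i in range(3)]
        # as_completed() yields each Future as soon as it finishes, in completion order
        order = [f.result() for f in futures.as_completed(fs)]
        # wait() blocks until all Futures are done (default return_when=ALL_COMPLETED)
        done, not_done = futures.wait(fs)
    print('completion order:', order)  # completion order: [2, 1, 0]
    return order, len(done), len(not_done)


if __name__ == '__main__':
    main()
```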
ProcessPoolExecutor objects
The methods are the same; the only difference is that the work is done by multiple processes.
```python
import threading
from concurrent import futures
import logging
import time
import random

# Log format: process name, thread name, process ID and thread ID
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))
    return random.randint(1, 10)


if __name__ == '__main__':
    # Create a process pool with a capacity of 3
    executor = futures.ProcessPoolExecutor(max_workers=3)
    fs = []
    for i in range(3):
        future = executor.submit(worker, i)
        fs.append(future)

    # for i in range(3, 6):
    #     future = executor.submit(worker, i)
    #     fs.append(future)

    while True:
        time.sleep(2)
        logging.info(threading.enumerate())
        flag = True
        for f in fs:  # result() blocks until the Future finishes
            logging.info('{} result={}'.format(f, f.result()))
            flag = flag and f.done()
        if flag:
            executor.shutdown()  # clean up the pool: waits for the worker processes to finish and exit
            logging.info(threading.enumerate())
            break

    # Once the pool has created its workers, they are reused for later tasks,
    # so there is no need to create and clean them up repeatedly
```
Sample output (this log was captured with ThreadPoolExecutor in place of ProcessPoolExecutor, so the workers appear as ThreadPoolExecutor threads inside MainProcess; with ProcessPoolExecutor the worker lines show a child-process name, for example SpawnProcess-1 or ForkProcess-1 depending on the start method, and a different process ID):

```text
2020-01-15 16:34:02,539 [MainProcess:ThreadPoolExecutor-0_0, 27048: 24796] begin to work0
2020-01-15 16:34:02,539 [MainProcess:ThreadPoolExecutor-0_1, 27048: 30168] begin to work1
2020-01-15 16:34:02,539 [MainProcess:ThreadPoolExecutor-0_2, 27048: 29756] begin to work2
2020-01-15 16:34:04,540 [MainProcess:MainThread, 27048: 31676] [<_MainThread(MainThread, started 31676)>, <Thread(ThreadPoolExecutor-0_0, started daemon 24796)>, <Thread(ThreadPoolExecutor-0_1, started daemon 30168)>, <Thread(ThreadPoolExecutor-0_2, started daemon 29756)>]
2020-01-15 16:34:07,539 [MainProcess:ThreadPoolExecutor-0_1, 27048: 30168] finished1
2020-01-15 16:34:07,539 [MainProcess:ThreadPoolExecutor-0_0, 27048: 24796] finished0
2020-01-15 16:34:07,539 [MainProcess:MainThread, 27048: 31676] <Future at 0x22f6151b438 state=finished returned int> result=10
2020-01-15 16:34:07,539 [MainProcess:MainThread, 27048: 31676] <Future at 0x22f615370f0 state=finished returned int> result=3
2020-01-15 16:34:07,539 [MainProcess:ThreadPoolExecutor-0_2, 27048: 29756] finished2
2020-01-15 16:34:07,539 [MainProcess:MainThread, 27048: 31676] <Future at 0x22f6152acc0 state=finished returned int> result=9
2020-01-15 16:34:07,539 [MainProcess:MainThread, 27048: 31676] [<_MainThread(MainThread, started 31676)>]
```
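A minimal sketch that makes the multi-process part visible: each task also returns os.getpid(), so the results show the work ran in worker processes other than the parent. square_with_pid and main are illustrative names; Executor.map() returns results in input order, not completion order.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def square_with_pid(n):
    # Runs in a worker process and reports which process did the work
    return n * n, os.getpid()


def main():
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(square_with_pid, range(6)))  # input order is kept
    squares = [sq for sq, _ in results]
    worker_pids = {pid for _, pid in results}
    print('squares:', squares)  # squares: [0, 1, 4, 9, 16, 25]
    print('parent pid:', os.getpid(), 'worker pids:', worker_pids)
    return squares, worker_pids


if __name__ == '__main__':
    main()
```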
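Context manager support
concurrent.futures.ProcessPoolExecutor (like ThreadPoolExecutor) inherits from concurrent.futures._base.Executor, and that base class defines __enter__ and __exit__, so the executor supports the context manager protocol and can be used in a with statement.
__exit__ essentially just calls shutdown(wait=True), i.e. it blocks until all running tasks have completed.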
Usage:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
```
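Because __exit__ calls shutdown(wait=True), a with block behaves like the try/finally pattern below. This sketch (double, with_statement and manual_equivalent are illustrative names) shows the two forms side by side; in both, every Future is already finished once the block has been left.

```python
from concurrent.futures import ThreadPoolExecutor


def double(x):
    return 2 * x


def with_statement():
    with ThreadPoolExecutor(max_workers=2) as executor:
        fs = [executor.submit(double, i) for i in range(4)]
    # __exit__ has already called shutdown(wait=True), so all futures are done here
    assert all(f.done() for f in fs)
    return [f.result() for f in fs]


def manual_equivalent():
    executor = ThreadPoolExecutor(max_workers=2)
    try:
        fs = [executor.submit(double, i) for i in range(4)]
    finally:
        executor.shutdown(wait=True)  # what __exit__ does for us
    return [f.result() for f in fs]


if __name__ == '__main__':
    print(with_statement(), manual_equivalent())
```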
```python
import threading
from concurrent import futures
import logging
import time
import random

# Log format: process name, thread name, process ID and thread ID
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))
    return random.randint(1, 10)


if __name__ == '__main__':
    # Create a thread pool with a capacity of 3
    executor = futures.ThreadPoolExecutor(max_workers=3)
    with executor:  # on exit, __exit__ calls executor.shutdown(wait=True)
        fs = []
        for i in range(3):
            future = executor.submit(worker, i)
            fs.append(future)

        # for i in range(3, 6):
        #     future = executor.submit(worker, i)
        #     fs.append(future)

        while True:
            time.sleep(2)
            logging.info(threading.enumerate())
            flag = True
            for f in fs:  # result() blocks until the Future finishes
                logging.info('{} result={}'.format(f, f.result()))
                flag = flag and f.done()
            if flag:
                # Redundant with the with statement, but calling it here lets the
                # next log line confirm that the worker threads have exited
                executor.shutdown()
                logging.info(threading.enumerate())
                break

    # Once the pool has created its threads, they are reused for later tasks,
    # so there is no need to create and clean them up repeatedly
```
Sample output:

```text
2020-01-15 16:40:22,852 [MainProcess:ThreadPoolExecutor-0_0, 24168: 34932] begin to work0
2020-01-15 16:40:22,852 [MainProcess:ThreadPoolExecutor-0_1, 24168: 10820] begin to work1
2020-01-15 16:40:22,852 [MainProcess:ThreadPoolExecutor-0_2, 24168: 264] begin to work2
2020-01-15 16:40:24,853 [MainProcess:MainThread, 24168: 24748] [<_MainThread(MainThread, started 24748)>, <Thread(ThreadPoolExecutor-0_0, started daemon 34932)>, <Thread(ThreadPoolExecutor-0_1, started daemon 10820)>, <Thread(ThreadPoolExecutor-0_2, started daemon 264)>]
2020-01-15 16:40:27,853 [MainProcess:ThreadPoolExecutor-0_0, 24168: 34932] finished0
2020-01-15 16:40:27,853 [MainProcess:MainThread, 24168: 24748] <Future at 0x19beab5b438 state=finished returned int> result=7
2020-01-15 16:40:27,854 [MainProcess:ThreadPoolExecutor-0_2, 24168: 264] finished2
2020-01-15 16:40:27,854 [MainProcess:ThreadPoolExecutor-0_1, 24168: 10820] finished1
2020-01-15 16:40:27,854 [MainProcess:MainThread, 24168: 24748] <Future at 0x19beab6acc0 state=finished returned int> result=9
2020-01-15 16:40:27,854 [MainProcess:MainThread, 24168: 24748] <Future at 0x19beab791d0 state=finished returned int> result=6
2020-01-15 16:40:27,854 [MainProcess:MainThread, 24168: 24748] [<_MainThread(MainThread, started 24748)>]
```
Summary
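The library gives thread pools and process pools the same calling interface, which simplifies programming.
It reflects Python's philosophy of simplicity.
A drawback sometimes mentioned is that you cannot name the worker threads; since Python 3.6, however, ThreadPoolExecutor accepts a thread_name_prefix argument, so this is a minor point.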
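In current Python versions the worker threads can in fact be given a name prefix. A minimal sketch using ThreadPoolExecutor's thread_name_prefix parameter (Python 3.6+); the prefix 'downloader' and the helper names are illustrative. With max_workers=2 the four tasks are spread over at most two reused threads, named after the prefix.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def current_thread_name(_):
    # Report the name of the pool thread that ran this task
    return threading.current_thread().name


def main():
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix='downloader') as executor:
        names = set(executor.map(current_thread_name, range(4)))
    print(names)  # e.g. {'downloader_0', 'downloader_1'}
    return names


if __name__ == '__main__':
    main()
```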