39.2 - Using concurrent.futures

Author: BeautifulSoulpy | Published: 2020-02-03 08:04

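Life has only three days: the confused live in yesterday, the wishful live in tomorrow, and only the clear-minded live in today. Yesterday has passed and is an expired check; tomorrow has not yet come and is a check that cannot be cashed; only living in today is real.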

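Summary:

  1. Concurrency can be handled in a synchronous, blocking style or an asynchronous, non-blocking style; the latter is usually more efficient.
  2. Which executor to use depends on whether the workload is I/O-bound or CPU-bound.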

1. The concurrent package

concurrent.futures

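The Future object is the core of how the futures library implements asynchronous tasks. The thread pool or process pool creates it when a task is submitted and returns it to the caller as a handle to a task that is pending or running. The caller can block on it right away until the task completes, or carry on with other work and fetch the result from it later.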
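A minimal sketch of that workflow, assuming a hypothetical slow_square task that stands in for real work: submit() hands back a Future at once, the caller does something else, and result() collects the value later.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(n):
    """Hypothetical task: pretend to work for a moment, then return n squared."""
    time.sleep(0.2)
    return n * n

def submit_then_collect(n):
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_square, n)   # returns immediately with a Future
        other_work = sum(range(10))                # the caller is free in the meantime
        return other_work, future.result()         # blocks until the task has finished

if __name__ == '__main__':
    print(submit_then_collect(7))
```

The call to result() is the only point where the caller waits; everything between submit() and result() overlaps with the task.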

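The module was introduced in Python 3.2.
It is a module for asynchronous, parallel task programming that provides a high-level, convenient interface for executing callables asynchronously.

It provides two pool executors:
ThreadPoolExecutor: an Executor that runs calls asynchronously in a pool of threads
ProcessPoolExecutor: an Executor that runs calls asynchronously in a pool of processes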

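Rules of thumb for choosing between them:

  1. For I/O-heavy (I/O-bound) tasks, choose ThreadPoolExecutor, e.g. fetching web pages, reading and writing files, and other network or disk I/O;
  2. For CPU-heavy (CPU-bound) tasks, choose ProcessPoolExecutor, e.g. CPU-intensive mathematical and logical computation, video encoding and decoding.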
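The first rule can be illustrated with a small timing sketch. The fake_download function is hypothetical and uses time.sleep as a stand-in for waiting on the network; the point is only that threads overlap their waiting time.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(i):
    """Hypothetical I/O task: sleeping stands in for waiting on a server."""
    time.sleep(0.2)
    return i

def sequential():
    return [fake_download(i) for i in range(4)]

def pooled():
    with ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(fake_download, i) for i in range(4)]
        return [f.result() for f in fs]

def timed(run):
    """Run a zero-argument callable and return (its result, elapsed seconds)."""
    start = time.perf_counter()
    result = run()
    return result, time.perf_counter() - start

if __name__ == '__main__':
    print('sequential: %.2fs' % timed(sequential)[1])
    print('thread pool: %.2fs' % timed(pooled)[1])
```

For CPU-bound functions, threads in CPython would not show the same gain because of the GIL, which is why the second rule points to ProcessPoolExecutor instead.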
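ProcessPoolExecutor sidesteps the GIL, but because arguments and return values have to be passed between the main process and the worker processes, normally only picklable (serializable) objects can be submitted and returned. Here is an example that fails, because an open file object cannot be pickled:

from concurrent.futures import ProcessPoolExecutor

def write(f, line):
    f.write(line)   # never reached: pickling f fails first

if __name__ == '__main__':
    f = open('1.txt', 'a+')
    with ProcessPoolExecutor() as executor:
        # The open file object f cannot be pickled for the worker process
        future = executor.submit(write, f, 'abc')
        print(f'RESULT: {future.result()}')   # the pickling error surfaces here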
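One way around the pickling restriction, sketched here under the assumption that the worker can open the file itself, is to pass the path (a plain string) instead of the file object. The helper names is_picklable and append_line are hypothetical.

```python
import os
import pickle
import tempfile
from concurrent.futures import ProcessPoolExecutor

def is_picklable(obj):
    """Return True if pickle.dumps accepts obj, which is what a process pool needs."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False

def append_line(path, line):
    """Worker that opens the file itself, so only two strings cross the process boundary."""
    with open(path, 'a+') as f:
        f.write(line + '\n')
    return len(line)

if __name__ == '__main__':
    with open(os.devnull, 'w') as handle:
        print('file object picklable:', is_picklable(handle))
    print('path string picklable:', is_picklable('1.txt'))

    with tempfile.TemporaryDirectory() as folder:
        path = os.path.join(folder, '1.txt')
        with ProcessPoolExecutor(max_workers=1) as executor:
            print('RESULT:', executor.submit(append_line, path, 'abc').result())
```

The worker function must also be defined at module level, since functions are pickled by reference to their importable name.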

ThreadPoolExecutor objects

First define a pool executor object, which is an instance of a subclass of the Executor class.

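Method                               Meaning
ThreadPoolExecutor(max_workers=1)    Creates a pool of at most max_workers threads that executes calls asynchronously; returns the executor instance
submit(fn, *args, **kwargs)          Submits the function and its arguments for execution; returns a Future instance
shutdown(wait=True)                  Cleans up the pool; with wait=True, blocks until the pending tasks have finished

Future class

Method                     Meaning
done()                     Returns True if the call was successfully cancelled or has finished running
cancelled()                Returns True if the call was successfully cancelled
running()                  Returns True if the call is currently running and cannot be cancelled
cancel()                   Attempts to cancel the call. Returns False if the call is running or has finished and cannot be cancelled; otherwise returns True
result(timeout=None)       Returns the value returned by the call. With timeout=None it waits indefinitely; if the timeout expires, it raises concurrent.futures.TimeoutError
exception(timeout=None)    Returns the exception raised by the call. With timeout=None it waits indefinitely; if the timeout expires, it raises concurrent.futures.TimeoutError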
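The Future methods listed above can be exercised in a small, deterministic sketch. It assumes a single-worker pool so that later submissions stay pending, and uses two threading.Event objects (an illustrative device, not part of the original article) to hold the first task in the running state.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def demo_future_states():
    started = threading.Event()   # set by the task once it is running
    release = threading.Event()   # set by the caller to let the task finish

    def blocker():
        started.set()
        release.wait()
        return 'finished'

    def failing():
        raise ValueError('boom')

    states = {}
    with ThreadPoolExecutor(max_workers=1) as executor:
        first = executor.submit(blocker)
        second = executor.submit(failing)      # queued behind first
        third = executor.submit(pow, 2, 3)     # queued behind second
        started.wait()

        states['first_running'] = first.running()
        states['first_cancel'] = first.cancel()      # already running, cannot cancel
        states['third_cancel'] = third.cancel()      # still pending, can be cancelled
        states['third_cancelled'] = third.cancelled()
        try:
            first.result(timeout=0.1)
            states['timed_out'] = False
        except FutureTimeout:
            states['timed_out'] = True

        release.set()
        states['first_result'] = first.result()
        states['second_exception'] = type(second.exception()).__name__
        states['all_done'] = all(f.done() for f in (first, second, third))
    return states

if __name__ == '__main__':
    for key, value in demo_future_states().items():
        print(key, value)
```

Note that done() is True for the cancelled future as well, which matches the table: a cancelled call counts as done.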

import threading
from concurrent import futures
import logging
import time

# Log output format
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))

# Create a thread pool executor with a capacity of 3
executor = futures.ThreadPoolExecutor(max_workers=3)

fs = []   # collection of submitted futures
for i in range(3):
    future = executor.submit(worker, i)
    fs.append(future)

# for i in range(3, 6):
#     future = executor.submit(worker, i)
#     fs.append(future)

while True:
    time.sleep(2)
    logging.info(threading.enumerate())

    flag = True
    for f in fs: #
        logging.info(f.done())
        flag = flag and f.done()

    if flag:
        executor.shutdown() # clean up the pool; the worker threads finish and exit (they are not killed)
        logging.info(threading.enumerate())
        break
# Once the pool has created its threads they are reused, so the pool does not need to be torn down frequently
#------------------------------------------------------------------------------------
2020-01-15 16:18:26,134  [MainProcess:ThreadPoolExecutor-0_0, 24400:   29700] begin to work0
2020-01-15 16:18:26,134  [MainProcess:ThreadPoolExecutor-0_1, 24400:   25208] begin to work1
2020-01-15 16:18:26,135  [MainProcess:ThreadPoolExecutor-0_2, 24400:    3380] begin to work2
2020-01-15 16:18:28,136  [MainProcess:MainThread, 24400:   26468] [<_MainThread(MainThread, started 26468)>, <Thread(ThreadPoolExecutor-0_0, started daemon 29700)>, <Thread(ThreadPoolExecutor-0_1, started daemon 25208)>, <Thread(ThreadPoolExecutor-0_2, started daemon 3380)>]
2020-01-15 16:18:28,136  [MainProcess:MainThread, 24400:   26468] False
2020-01-15 16:18:28,136  [MainProcess:MainThread, 24400:   26468] False
2020-01-15 16:18:28,136  [MainProcess:MainThread, 24400:   26468] False
2020-01-15 16:18:30,136  [MainProcess:MainThread, 24400:   26468] [<_MainThread(MainThread, started 26468)>, <Thread(ThreadPoolExecutor-0_0, started daemon 29700)>, <Thread(ThreadPoolExecutor-0_1, started daemon 25208)>, <Thread(ThreadPoolExecutor-0_2, started daemon 3380)>]
2020-01-15 16:18:30,136  [MainProcess:MainThread, 24400:   26468] False
2020-01-15 16:18:30,136  [MainProcess:MainThread, 24400:   26468] False
2020-01-15 16:18:30,136  [MainProcess:MainThread, 24400:   26468] False
2020-01-15 16:18:31,136  [MainProcess:ThreadPoolExecutor-0_0, 24400:   29700] finished0
2020-01-15 16:18:31,136  [MainProcess:ThreadPoolExecutor-0_2, 24400:    3380] finished2
2020-01-15 16:18:31,136  [MainProcess:ThreadPoolExecutor-0_1, 24400:   25208] finished1
2020-01-15 16:18:32,137  [MainProcess:MainThread, 24400:   26468] [<_MainThread(MainThread, started 26468)>, <Thread(ThreadPoolExecutor-0_0, started daemon 29700)>, <Thread(ThreadPoolExecutor-0_1, started daemon 25208)>, <Thread(ThreadPoolExecutor-0_2, started daemon 3380)>]
2020-01-15 16:18:32,137  [MainProcess:MainThread, 24400:   26468] True
2020-01-15 16:18:32,137  [MainProcess:MainThread, 24400:   26468] True
2020-01-15 16:18:32,137  [MainProcess:MainThread, 24400:   26468] True
2020-01-15 16:18:32,137  [MainProcess:MainThread, 24400:   26468] [<_MainThread(MainThread, started 26468)>]
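The polling loop in the listing above is useful for watching the thread list change, but when only completion matters the module's wait() function can replace it. A minimal sketch, assuming a shortened worker that returns a value:

```python
import time
from concurrent import futures

def worker(n):
    time.sleep(0.1)
    return n * 10

def run_and_wait(count=3):
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(worker, i) for i in range(count)]
        # wait() blocks until every future has finished and returns two sets
        done, not_done = futures.wait(fs)
        return sorted(f.result() for f in done), len(not_done)

if __name__ == '__main__':
    print(run_and_wait())
```

wait() also accepts a timeout and a return_when argument for cases where the caller should resume as soon as the first task finishes or fails.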

ProcessPoolExecutor objects

The methods are the same; the work is simply done by multiple processes.


import threading
from concurrent import futures
import logging
import time,random

# Log output format
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))
    return random.randint(1,10)

if __name__ == '__main__':
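    # Create the pool with a capacity of 3. This listing and the output below use ThreadPoolExecutor;
    # replace it with futures.ProcessPoolExecutor(max_workers=3) to run the tasks in worker processes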
    executor = futures.ThreadPoolExecutor(max_workers=3)

    fs = []
    for i in range(3):
        future = executor.submit(worker, i)
        fs.append(future)

    # for i in range(3, 6):
    #     future = executor.submit(worker, i)
    #     fs.append(future)

    while True:
        time.sleep(2)
        logging.info(threading.enumerate())

        flag = True
        for f in fs:  # f.result() below blocks until that task has finished
            logging.info('{} result={}'.format(f,f.result()))
            flag = flag and f.done()

        if flag:
            executor.shutdown() # clean up the pool; the workers finish and exit (they are not killed)
            logging.info(threading.enumerate())
            break
    # Once the pool has created its workers they are reused, so the pool does not need to be torn down frequently
#---------------------------------------------------------------------
2020-01-15 16:34:02,539  [MainProcess:ThreadPoolExecutor-0_0, 27048:   24796] begin to work0
2020-01-15 16:34:02,539  [MainProcess:ThreadPoolExecutor-0_1, 27048:   30168] begin to work1
2020-01-15 16:34:02,539  [MainProcess:ThreadPoolExecutor-0_2, 27048:   29756] begin to work2
2020-01-15 16:34:04,540  [MainProcess:MainThread, 27048:   31676] [<_MainThread(MainThread, started 31676)>, <Thread(ThreadPoolExecutor-0_0, started daemon 24796)>, <Thread(ThreadPoolExecutor-0_1, started daemon 30168)>, <Thread(ThreadPoolExecutor-0_2, started daemon 29756)>]
2020-01-15 16:34:07,539  [MainProcess:ThreadPoolExecutor-0_1, 27048:   30168] finished1
2020-01-15 16:34:07,539  [MainProcess:ThreadPoolExecutor-0_0, 27048:   24796] finished0
2020-01-15 16:34:07,539  [MainProcess:MainThread, 27048:   31676] <Future at 0x22f6151b438 state=finished returned int> result=10
2020-01-15 16:34:07,539  [MainProcess:MainThread, 27048:   31676] <Future at 0x22f615370f0 state=finished returned int> result=3
2020-01-15 16:34:07,539  [MainProcess:ThreadPoolExecutor-0_2, 27048:   29756] finished2
2020-01-15 16:34:07,539  [MainProcess:MainThread, 27048:   31676] <Future at 0x22f6152acc0 state=finished returned int> result=9
2020-01-15 16:34:07,539  [MainProcess:MainThread, 27048:   31676] [<_MainThread(MainThread, started 31676)>]
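Because the log above shows thread names inside MainProcess, it may help to see the two executors side by side. The following sketch assumes a simplified worker that reports the PID it ran in; run_pool is a hypothetical helper that accepts either executor class.

```python
import os
from concurrent import futures

def worker(n):
    """Return the input together with the PID of the process that ran it."""
    return n, os.getpid()

def run_pool(executor_cls, count=3):
    """Submit the same tasks to whichever executor class is passed in."""
    with executor_cls(max_workers=3) as executor:
        fs = [executor.submit(worker, i) for i in range(count)]
        return [f.result() for f in fs]

if __name__ == '__main__':
    main_pid = os.getpid()
    for cls in (futures.ThreadPoolExecutor, futures.ProcessPoolExecutor):
        pids = {pid for _, pid in run_pool(cls)}
        print(cls.__name__, 'ran only in the main process:', pids == {main_pid})
```

With the thread pool every task reports the main process PID, while the process pool reports the PIDs of its worker processes. The `if __name__ == '__main__':` guard matters for the process pool, since worker processes may re-import the main module.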

Context manager support

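concurrent.futures.ProcessPoolExecutor inherits from concurrent.futures._base.Executor, and the parent class defines the __enter__ and __exit__ methods, so executors support context management and can be used in a with statement.
The __exit__ method simply calls shutdown(wait=True), which blocks until all running tasks have completed.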

Usage

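from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

The earlier example, rewritten to use a with block: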
import threading
from concurrent import futures
import logging
import time,random

# Log output format
FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))
    return random.randint(1,10)

if __name__ == '__main__':
    # Create the pool with a capacity of 3 (a ThreadPoolExecutor here, matching the output below)
    executor = futures.ThreadPoolExecutor(max_workers=3)

    with executor:
        fs = []
        for i in range(3):
            future = executor.submit(worker, i)
            fs.append(future)

        # for i in range(3, 6):
        #     future = executor.submit(worker, i)
        #     fs.append(future)

        while True:
            time.sleep(2)
            logging.info(threading.enumerate())

            flag = True
            for f in fs: #
                logging.info('{} result={}'.format(f,f.result()))
                flag = flag and f.done()

            if flag:
                executor.shutdown() # redundant inside the with block, which calls shutdown(wait=True) on exit
                logging.info(threading.enumerate())
                break
        # Once the pool has created its threads they are reused, so the pool does not need to be torn down frequently
#----------------------------------------------------------------------
2020-01-15 16:40:22,852  [MainProcess:ThreadPoolExecutor-0_0, 24168:   34932] begin to work0
2020-01-15 16:40:22,852  [MainProcess:ThreadPoolExecutor-0_1, 24168:   10820] begin to work1
2020-01-15 16:40:22,852  [MainProcess:ThreadPoolExecutor-0_2, 24168:     264] begin to work2
2020-01-15 16:40:24,853  [MainProcess:MainThread, 24168:   24748] [<_MainThread(MainThread, started 24748)>, <Thread(ThreadPoolExecutor-0_0, started daemon 34932)>, <Thread(ThreadPoolExecutor-0_1, started daemon 10820)>, <Thread(ThreadPoolExecutor-0_2, started daemon 264)>]
2020-01-15 16:40:27,853  [MainProcess:ThreadPoolExecutor-0_0, 24168:   34932] finished0
2020-01-15 16:40:27,853  [MainProcess:MainThread, 24168:   24748] <Future at 0x19beab5b438 state=finished returned int> result=7
2020-01-15 16:40:27,854  [MainProcess:ThreadPoolExecutor-0_2, 24168:     264] finished2
2020-01-15 16:40:27,854  [MainProcess:ThreadPoolExecutor-0_1, 24168:   10820] finished1
2020-01-15 16:40:27,854  [MainProcess:MainThread, 24168:   24748] <Future at 0x19beab6acc0 state=finished returned int> result=9
2020-01-15 16:40:27,854  [MainProcess:MainThread, 24168:   24748] <Future at 0x19beab791d0 state=finished returned int> result=6
2020-01-15 16:40:27,854  [MainProcess:MainThread, 24168:   24748] [<_MainThread(MainThread, started 24748)>]
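Since leaving the with block already calls shutdown(wait=True), the polling loop and the explicit shutdown() call are optional. A pared-down sketch of the same example, with a shorter sleep assumed for brevity:

```python
import random
import time
from concurrent import futures

def worker(n):
    time.sleep(0.1)
    return random.randint(1, 10)

def run_all(count=3):
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(worker, i) for i in range(count)]
    # The with block has exited, so shutdown(wait=True) has run and every task is finished
    assert all(f.done() for f in fs)
    return [f.result() for f in fs]

if __name__ == '__main__':
    print(run_all())
```

The results are still available after the pool is shut down, because each Future keeps the value its call returned.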

Summary

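The library unifies how thread pools and process pools are called, which simplifies programming.
It reflects Python's philosophy of simplicity.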

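The only drawback: individual threads cannot be named (since Python 3.6, ThreadPoolExecutor does accept a thread_name_prefix argument that sets a common prefix), but this is hardly worth mentioning.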
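On Python 3.6 and later, the thread_name_prefix argument of ThreadPoolExecutor gives the worker threads a recognizable prefix, which eases the naming limitation in log output. A minimal sketch; the prefix 'crawler' and the helper names are arbitrary, illustrative choices:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def current_thread_name():
    return threading.current_thread().name

def names_with_prefix(prefix, count=3):
    """Run count tasks and collect the name of the thread that ran each one."""
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix=prefix) as executor:
        fs = [executor.submit(current_thread_name) for _ in range(count)]
        return [f.result() for f in fs]

if __name__ == '__main__':
    print(names_with_prefix('crawler'))
```

The pool appends its own numeric suffix to the prefix, so the full name of each thread is still chosen by the executor.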


Article link: https://www.haomeiwen.com/subject/ilptzctx.html