美文网首页
ThreadPoolExecutor 线程池的使用

ThreadPoolExecutor 线程池的使用

作者: 越大大雨天 | 来源:发表于2019-06-23 22:05 被阅读0次

写在前面:GIL锁

关于GIL锁:多线程在Python中并不一定是鸡肋
CPython解释器存在GIL锁,一次只允许使用一个线程执行Python代码,但这并不是说多线程就是鸡肋的。
因为,标准库中所有执行耗时任务阻塞型I/O操作的函数,在等待操作系统返回结果时都会释放GIL,这意味着I/O密集型Python程序使用多线程是能提高运行效率的,比如:Python线程在等待网络响应时,阻塞型I/O函数会释放GIL,在运行新的线程。

future模块下的ThreadPoolExecutor

参考文章:http://c.biancheng.net/view/2627.html

Exectuor 提供了如下常用方法:

  • submit(fn, *args, kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

使用步骤

使用线程池来执行线程任务的步骤如下:

  1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
  2. 定义一个普通函数作为线程任务。
  3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
  4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。

简单线程池示例

from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(i)
    return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()

只需使用submit将函数及参数提交给线程池,线程池会自动开启线程来执行函数,并自动管理线程,方便又高效。

结果返回

当程序使用 Future 的 result() 方法来获取异步任务的结果时,该方法会阻塞当前线程,如果没有指定 timeout 参数,当前线程将一直处于阻塞状态,直到 Future 代表的任务返回。

若不想阻塞的方式获取结果,可通过 Future 的add_done_callback()方法来添加回调函数,。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。

  • 使用非租塞式add_done_callback() 返回结果
  • 使用with上下文管理器管理线程池,可避免手动关闭线程池
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(i)
    return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
    # 向线程池提交一个task, 50会作为action()函数的参数
    future1 = pool.submit(action, 50)
    # 向线程池再提交一个task, 100会作为action()函数的参数
    future2 = pool.submit(action, 100)
    def get_result(future):
        print(future.result())
    # 为future1添加线程完成的回调函数
    future1.add_done_callback(get_result)
    # 为future2添加线程完成的回调函数
    future2.add_done_callback(get_result)
    print('--------------')

优雅的管理多线程:线程池

Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1)方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。

from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        my_sum += i
    return my_sum
# 创建一个包含4条线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
    # 使用线程执行map计算
    # 后面元组有3个元素,因此程序启动3条线程来执行action函数
    results = pool.map(action, (50, 100, 150))
    print('--------------')
    for r in results:
        print(r)

上面程序使用 map() 方法来启动 3 个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。

运行上面程序,同样可以看到 3 个线程并发执行的结果,最后通过 results 可以看到 3 个线程任务的返回结果。

通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 action() 函数,但最后收集的 action() 函数的执行结果,依然与传入参数的结果保持一致,即函数返回结果顺序与调用顺序一致。

map方法还有一个特性:如果第一个调用生成器结果用时10秒,而其他调用只用一秒,代码会阻塞10秒,用以获取生成器的第一个结果。在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束。

使用as_completed()绝不阻塞

如果不想要任何阻塞,也不需要调用顺序,只管获取到最终结果即可时,我们可以使用Excutor.submit 和 future.as_completed函数结合使用。

from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        my_sum += i
    return my_sum

cc_list = [50, 100, 150]
with ThreadPoolExecutor(max_workers=4) as pool:
    to_do_map = []
    for cc in cc_list:
        # 创建一个Future实例
        future = pool.submit(action, cc)
        to_do_map.append(future)
    result = []
    done_iter = future.as_complete(to_do_map)
    for future in done_iter:
        # 这里获取future.result()的方式绝不会阻塞
        res = future.result()
        result.append(res)
    print(result)

将原有的map方式改为使用两个for循环调用submit和as_completed方式,使用future.result()绝不会阻塞。

相关文章

网友评论

      本文标题:ThreadPoolExecutor 线程池的使用

      本文链接:https://www.haomeiwen.com/subject/aznwqctx.html