美文网首页
ThreadPoolExecutor 线程池的使用

ThreadPoolExecutor 线程池的使用

作者: 越大大雨天 | 来源:发表于2019-06-23 22:05 被阅读0次

    写在前面:GIL锁

    关于GIL锁:多线程在Python中并不一定是鸡肋
    CPython解释器存在GIL锁,一次只允许使用一个线程执行Python代码,但这并不是说多线程就是鸡肋的。
    因为,标准库中所有执行耗时任务阻塞型I/O操作的函数,在等待操作系统返回结果时都会释放GIL,这意味着I/O密集型Python程序使用多线程是能提高运行效率的,比如:Python线程在等待网络响应时,阻塞型I/O函数会释放GIL,在运行新的线程。

    future模块下的ThreadPoolExecutor

    参考文章:http://c.biancheng.net/view/2627.html

    Exectuor 提供了如下常用方法:

    • submit(fn, *args, kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,kwargs 代表以关键字参数的形式为 fn 函数传入参数。
    • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
    • shutdown(wait=True):关闭线程池。

    使用步骤

    使用线程池来执行线程任务的步骤如下:

    1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
    2. 定义一个普通函数作为线程任务。
    3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
    4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。

    简单线程池示例

    from concurrent.futures import ThreadPoolExecutor
    import time
    # 定义一个准备作为线程任务的函数
    def action(max):
        my_sum = 0
        for i in range(max):
            print(i)
        return my_sum
    # 创建一个包含2条线程的线程池
    pool = ThreadPoolExecutor(max_workers=2)
    # 向线程池提交一个task, 50会作为action()函数的参数
    future1 = pool.submit(action, 50)
    # 向线程池再提交一个task, 100会作为action()函数的参数
    future2 = pool.submit(action, 100)
    # 判断future1代表的任务是否结束
    print(future1.done())
    time.sleep(3)
    # 判断future2代表的任务是否结束
    print(future2.done())
    # 查看future1代表的任务返回的结果
    print(future1.result())
    # 查看future2代表的任务返回的结果
    print(future2.result())
    # 关闭线程池
    pool.shutdown()
    

    只需使用submit将函数及参数提交给线程池,线程池会自动开启线程来执行函数,并自动管理线程,方便又高效。

    结果返回

    当程序使用 Future 的 result() 方法来获取异步任务的结果时,该方法会阻塞当前线程,如果没有指定 timeout 参数,当前线程将一直处于阻塞状态,直到 Future 代表的任务返回。

    若不想阻塞的方式获取结果,可通过 Future 的add_done_callback()方法来添加回调函数,。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。

    • 使用非租塞式add_done_callback() 返回结果
    • 使用with上下文管理器管理线程池,可避免手动关闭线程池
    from concurrent.futures import ThreadPoolExecutor
    import time
    # 定义一个准备作为线程任务的函数
    def action(max):
        my_sum = 0
        for i in range(max):
            print(i)
        return my_sum
    # 创建一个包含2条线程的线程池
    with ThreadPoolExecutor(max_workers=2) as pool:
        # 向线程池提交一个task, 50会作为action()函数的参数
        future1 = pool.submit(action, 50)
        # 向线程池再提交一个task, 100会作为action()函数的参数
        future2 = pool.submit(action, 100)
        def get_result(future):
            print(future.result())
        # 为future1添加线程完成的回调函数
        future1.add_done_callback(get_result)
        # 为future2添加线程完成的回调函数
        future2.add_done_callback(get_result)
        print('--------------')
    

    优雅的管理多线程:线程池

    Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1)方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。

    from concurrent.futures import ThreadPoolExecutor
    import time
    # 定义一个准备作为线程任务的函数
    def action(max):
        my_sum = 0
        for i in range(max):
            my_sum += i
        return my_sum
    # 创建一个包含4条线程的线程池
    with ThreadPoolExecutor(max_workers=4) as pool:
        # 使用线程执行map计算
        # 后面元组有3个元素,因此程序启动3条线程来执行action函数
        results = pool.map(action, (50, 100, 150))
        print('--------------')
        for r in results:
            print(r)
    

    上面程序使用 map() 方法来启动 3 个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。

    运行上面程序,同样可以看到 3 个线程并发执行的结果,最后通过 results 可以看到 3 个线程任务的返回结果。

    通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 action() 函数,但最后收集的 action() 函数的执行结果,依然与传入参数的结果保持一致,即函数返回结果顺序与调用顺序一致。

    map方法还有一个特性:如果第一个调用生成器结果用时10秒,而其他调用只用一秒,代码会阻塞10秒,用以获取生成器的第一个结果。在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束。

    使用as_completed()绝不阻塞

    如果不想要任何阻塞,也不需要调用顺序,只管获取到最终结果即可时,我们可以使用Excutor.submit 和 future.as_completed函数结合使用。

    from concurrent.futures import ThreadPoolExecutor
    import time
    # 定义一个准备作为线程任务的函数
    def action(max):
        my_sum = 0
        for i in range(max):
            my_sum += i
        return my_sum
    
    cc_list = [50, 100, 150]
    with ThreadPoolExecutor(max_workers=4) as pool:
        to_do_map = []
        for cc in cc_list:
            # 创建一个Future实例
            future = pool.submit(action, cc)
            to_do_map.append(future)
        result = []
        done_iter = future.as_complete(to_do_map)
        for future in done_iter:
            # 这里获取future.result()的方式绝不会阻塞
            res = future.result()
            result.append(res)
        print(result)
    

    将原有的map方式改为使用两个for循环调用submit和as_completed方式,使用future.result()绝不会阻塞。

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor 线程池的使用

          本文链接:https://www.haomeiwen.com/subject/aznwqctx.html