美文网首页
Python ThreadPoolExecutor 异常中止解决

Python ThreadPoolExecutor 异常中止解决

作者: Simple丶Plan | 来源:发表于2022-06-10 16:59 被阅读0次

    1. 原始方案

    from concurrent.futures import ThreadPoolExecutor
    import time
    import threading
    import random
    
    
    def worker(n):
        threading_name = threading.current_thread().name
        print(f'[{threading_name}] {n} start')
        time.sleep(random.randint(1, 6))  # 随机休眠1-6s
        print(f'[{threading_name}] {n} end')
    
    def loop_worker():
        threadPool = ThreadPoolExecutor(max_workers=3)
        for q in range(20):
            threadPool.submit(worker, q)
        threadPool.shutdown(wait=True)
    
    if __name__ in '__main__':
        loop_worker()
    
    [ThreadPoolExecutor-1_0] 0 start
    [ThreadPoolExecutor-1_1] 1 start
    [ThreadPoolExecutor-1_2] 2 start
    [ThreadPoolExecutor-1_0] 0 end
    [ThreadPoolExecutor-1_0] 3 start
    [ThreadPoolExecutor-1_1] 1 end
    [ThreadPoolExecutor-1_0] 3 end
    [ThreadPoolExecutor-1_0] 4 start
    [ThreadPoolExecutor-1_1] 5 start
    [ThreadPoolExecutor-1_2] 2 end
    [ThreadPoolExecutor-1_2] 6 start
    [ThreadPoolExecutor-1_0] 4 end
    [ThreadPoolExecutor-1_0] 7 start
    [ThreadPoolExecutor-1_1] 5 end
    [ThreadPoolExecutor-1_2] 6 end
    [ThreadPoolExecutor-1_0] 7 end
    ......
    

    通常情况,我们利用 Ctrl+C 让程序触发 KeyboardInterrupt 异常,中止程序运行。线程池方案下,Ctrl-C 失效,当线程池里的线程任务跑完后,才会触发 KeyboardInterrupt

    1.1 上下文管理器协议格式

    def loop_worker():
        with ThreadPoolExecutor(max_workers=3) as executor:
            for q in range(20):
                executor.submit(worker, q)
    

    上下文管理协议相当于隐性地省略了 threadPool.shutdown(wait=True),同时,程序正常执行完成或出现异常中断的时候,就会调用 __exit__() 方法,接下来进行异常中止的基础。

    2. 以全局变量或事务为标识进行判断

    适用于 Django 等 WEB 应用框架,本身自带多线程,修改全局变量简单,但要注意线程安全。

    from concurrent.futures import ThreadPoolExecutor
    import time
    import threading
    import random
    
    sign = 0
    exiting = threading.Event()
    
    def worker(n):
        threading_name = threading.current_thread().name
        if sign == 1 or exiting.is_set():
            # 满足则直接跳过主程序
            print(f'[{threading_name}] {n} skip')
            return 
        print(f'[{threading_name}] {n} start')
        time.sleep(random.randint(1, 6))  # 随机休眠1-6s
        print(f'[{threading_name}] {n} end')
    
    def loop_worker():
        with ThreadPoolExecutor(max_workers=3) as executor:
            for q in range(20):
                executor.submit(worker, q)
    
    if __name__ in '__main__':
        loop_worker()
    

    程序运行中,只需 sign = 1 或者 exiting.set() ,worker 函数则跳过主要运算部分,剩余线程任务将迅速完成,变相达到中止多线程任务的目的。

    [ThreadPoolExecutor-1_0] 0 start
    [ThreadPoolExecutor-1_1] 1 start
    [ThreadPoolExecutor-1_2] 2 start
    [ThreadPoolExecutor-1_0] 0 end
    [ThreadPoolExecutor-1_0] 3 start
    [ThreadPoolExecutor-1_1] 1 end
    [ThreadPoolExecutor-1_0] 3 end
    [ThreadPoolExecutor-1_0] 4 start
    [ThreadPoolExecutor-1_2] 2 end
    [ThreadPoolExecutor-1_0] 4 end
    [ThreadPoolExecutor-1_1] 5 skip
    [ThreadPoolExecutor-1_2] 6 skip
    ......
    [ThreadPoolExecutor-1_0] 18 skip
    [ThreadPoolExecutor-1_2] 17 skip
    [ThreadPoolExecutor-1_1] 19 skip
    

    3. 接收 KeyboardInterrupt 异常取消线程任务

    from concurrent.futures import ThreadPoolExecutor
    import time
    import threading
    import random
    
    def worker(n):
        threading_name = threading.current_thread().name
        print(f'[{threading_name}] {n} start')
        time.sleep(random.randint(1, 6))  # 随机休眠1-6s
        print(f'[{threading_name}] {n} end')
    
    with ThreadPoolExecutor(max_workers=6) as executor:
        threadPool = []
        for q in range(20):
            task = executor.submit(worker, q)
            threadPool.append(task)  # task 任务加入 threadPool
        try:
            while not list(reversed(threadPool))[0].done():  # 判断最后一个任务是否取消/完成
                # 代替 wait(threadPool, return_when=FIRST_EXCEPTION)
                # 利用 while 堵塞且能够接收 KeyboardInterrupt 异常
                time.sleep(2)
        except KeyboardInterrupt:
            # 接收 KeyboardInterrupt 并取消剩余线程任务
            print('KeyboardInterrupt')
            for task in reversed(threadPool):
                task.cancel()
    

    提交给线程池的每个线程任务 task 加入 threadPool中,方便后续对 task 进行操作。当 for 循环内的 task 全部提交后,线程会再后台运行,而进程运行至 while 中堵塞,直至 threadPool 中最后一个线程是否 .done()。若进程堵塞在 while 中接收到 Ctrl+CKeyboardInterrupt 异常,则从后往前取消 threadPool 中所有任务,达到中止目的。

    [ThreadPoolExecutor-0_0] 0 start
    [ThreadPoolExecutor-0_1] 1 start
    [ThreadPoolExecutor-0_2] 2 start
    [ThreadPoolExecutor-0_1] 1 end
    [ThreadPoolExecutor-0_1] 3 start
    [ThreadPoolExecutor-0_0] 0 end
    [ThreadPoolExecutor-0_0] 4 start
    [ThreadPoolExecutor-0_1] 3 end
    [ThreadPoolExecutor-0_1] 5 start
    KeyboardInterrupt
    [ThreadPoolExecutor-0_2] 2 end
    [ThreadPoolExecutor-0_0] 4 end
    [ThreadPoolExecutor-0_1] 5 end
    

    相关文章

      网友评论

          本文标题:Python ThreadPoolExecutor 异常中止解决

          本文链接:https://www.haomeiwen.com/subject/yvebmrtx.html