Python ThreadPoolExecutor 异常中止解决

Python ThreadPoolExecutor 异常中止解决

作者: Simple丶Plan | 来源:发表于2022-06-10 16:59 被阅读0次

1. 原始方案

from concurrent.futures import ThreadPoolExecutor
import time
import threading
import random

def worker(n):
    threading_name = threading.current_thread().name
    print(f'[{threading_name}] {n} start')
    time.sleep(random.randint(1, 6))  # 随机休眠1-6s
    print(f'[{threading_name}] {n} end')

def loop_worker():
    threadPool = ThreadPoolExecutor(max_workers=3)
    for q in range(20):
        threadPool.submit(worker, q)

if __name__ in '__main__':
[ThreadPoolExecutor-1_0] 0 start
[ThreadPoolExecutor-1_1] 1 start
[ThreadPoolExecutor-1_2] 2 start
[ThreadPoolExecutor-1_0] 0 end
[ThreadPoolExecutor-1_0] 3 start
[ThreadPoolExecutor-1_1] 1 end
[ThreadPoolExecutor-1_0] 3 end
[ThreadPoolExecutor-1_0] 4 start
[ThreadPoolExecutor-1_1] 5 start
[ThreadPoolExecutor-1_2] 2 end
[ThreadPoolExecutor-1_2] 6 start
[ThreadPoolExecutor-1_0] 4 end
[ThreadPoolExecutor-1_0] 7 start
[ThreadPoolExecutor-1_1] 5 end
[ThreadPoolExecutor-1_2] 6 end
[ThreadPoolExecutor-1_0] 7 end

通常情况,我们利用 Ctrl+C 让程序触发 KeyboardInterrupt 异常,中止程序运行。线程池方案下,Ctrl-C 失效,当线程池里的线程任务跑完后,才会触发 KeyboardInterrupt

1.1 上下文管理器协议格式

def loop_worker():
    with ThreadPoolExecutor(max_workers=3) as executor:
        for q in range(20):
            executor.submit(worker, q)

上下文管理协议相当于隐性地省略了 threadPool.shutdown(wait=True),同时,程序正常执行完成或出现异常中断的时候,就会调用 __exit__() 方法,接下来进行异常中止的基础。

2. 以全局变量或事务为标识进行判断

适用于 Django 等 WEB 应用框架,本身自带多线程,修改全局变量简单,但要注意线程安全。

from concurrent.futures import ThreadPoolExecutor
import time
import threading
import random

sign = 0
exiting = threading.Event()

def worker(n):
    threading_name = threading.current_thread().name
    if sign == 1 or exiting.is_set():
        # 满足则直接跳过主程序
        print(f'[{threading_name}] {n} skip')
    print(f'[{threading_name}] {n} start')
    time.sleep(random.randint(1, 6))  # 随机休眠1-6s
    print(f'[{threading_name}] {n} end')

def loop_worker():
    with ThreadPoolExecutor(max_workers=3) as executor:
        for q in range(20):
            executor.submit(worker, q)

if __name__ in '__main__':

程序运行中,只需 sign = 1 或者 exiting.set() ,worker 函数则跳过主要运算部分,剩余线程任务将迅速完成,变相达到中止多线程任务的目的。

[ThreadPoolExecutor-1_0] 0 start
[ThreadPoolExecutor-1_1] 1 start
[ThreadPoolExecutor-1_2] 2 start
[ThreadPoolExecutor-1_0] 0 end
[ThreadPoolExecutor-1_0] 3 start
[ThreadPoolExecutor-1_1] 1 end
[ThreadPoolExecutor-1_0] 3 end
[ThreadPoolExecutor-1_0] 4 start
[ThreadPoolExecutor-1_2] 2 end
[ThreadPoolExecutor-1_0] 4 end
[ThreadPoolExecutor-1_1] 5 skip
[ThreadPoolExecutor-1_2] 6 skip
[ThreadPoolExecutor-1_0] 18 skip
[ThreadPoolExecutor-1_2] 17 skip
[ThreadPoolExecutor-1_1] 19 skip

3. 接收 KeyboardInterrupt 异常取消线程任务

from concurrent.futures import ThreadPoolExecutor
import time
import threading
import random

def worker(n):
    threading_name = threading.current_thread().name
    print(f'[{threading_name}] {n} start')
    time.sleep(random.randint(1, 6))  # 随机休眠1-6s
    print(f'[{threading_name}] {n} end')

with ThreadPoolExecutor(max_workers=6) as executor:
    threadPool = []
    for q in range(20):
        task = executor.submit(worker, q)
        threadPool.append(task)  # task 任务加入 threadPool
        while not list(reversed(threadPool))[0].done():  # 判断最后一个任务是否取消/完成
            # 代替 wait(threadPool, return_when=FIRST_EXCEPTION)
            # 利用 while 堵塞且能够接收 KeyboardInterrupt 异常
    except KeyboardInterrupt:
        # 接收 KeyboardInterrupt 并取消剩余线程任务
        for task in reversed(threadPool):

提交给线程池的每个线程任务 task 加入 threadPool中,方便后续对 task 进行操作。当 for 循环内的 task 全部提交后,线程会再后台运行,而进程运行至 while 中堵塞,直至 threadPool 中最后一个线程是否 .done()。若进程堵塞在 while 中接收到 Ctrl+CKeyboardInterrupt 异常,则从后往前取消 threadPool 中所有任务,达到中止目的。

[ThreadPoolExecutor-0_0] 0 start
[ThreadPoolExecutor-0_1] 1 start
[ThreadPoolExecutor-0_2] 2 start
[ThreadPoolExecutor-0_1] 1 end
[ThreadPoolExecutor-0_1] 3 start
[ThreadPoolExecutor-0_0] 0 end
[ThreadPoolExecutor-0_0] 4 start
[ThreadPoolExecutor-0_1] 3 end
[ThreadPoolExecutor-0_1] 5 start
[ThreadPoolExecutor-0_2] 2 end
[ThreadPoolExecutor-0_0] 4 end
[ThreadPoolExecutor-0_1] 5 end


  • Python ThreadPoolExecutor 异常中止解决

    1. 原始方案 通常情况,我们利用 Ctrl+C 让程序触发 KeyboardInterrupt 异常,中止程序运...

  • python基础2

    Python的异常处理机制 Bug的常见类型 被动掉坑的解决方案 python提供了异常处理机制,可以在异常出现时...

  • Crash 日志


  • 1.1.3 正确的线程中止方法

    什么是线程中止 线程运行、抛出异常、或者人为地结束,都会导致线程进入中止状态。 错误的人为中止线程 调用stop方...

  • Python3 常见错误和异常处理

    Python3常见错误 异常名称描述解决方法BaseException所有异常的父类Python所有的错误都是从B...

  • 解决 ThreadPoolExecutor

    1、任务开始,线程池为空; 2、先后有7个任务加入到线程池中,正在运行1个,已完成6个 3、关闭其他线程,关闭线程...

  • python ThreadPoolExecutor


  • python线程池ThreadPoolExecutor.subm


  • PyCharm Python3 Debug环境搭建

    切换默认配置Python2.7到Python3.5后,debug功能异常,依次解决。 1,执行debug,报错:w...

  • 2019-04-29



      本文标题:Python ThreadPoolExecutor 异常中止解决
