python线程池控制

作者: 迷途老鹰 | 来源:发表于2016-07-31 22:19 被阅读357次

    最近一个项目上使用线程池,设定处理项1W,10线程,但是需要检测线程进行状态。出现错误N次,就自动终止线程。实现如下:

    线程池代码

    ERRORCOUNT=0
    IS_EXIT=True
    class Worker(Thread):
        worker_count=0
        timeout=1
        def __init__(self, workQueue,resultQueue,**kwds):
            Thread.__init__(self,**kwds)
            self.id=Worker.worker_count
            Worker.worker_count+=1
            self.setDaemon(True)
            self.workQueue=workQueue
            self.resultQueue=resultQueue
            self.start()
    
        def run(self):
            #the get-some-work,do-some-work main loop of worker threads
            while IS_EXIT:
                try:
                    callable,args,kwds=self.workQueue.get(timeout=Worker.timeout)
                    res=callable(*args,**kwds)
                    self.resultQueue.put(res)
                except Queue.Empty:
                    break
                except:
                    pass
    
    class WorkerManager:
        def __init__(self, num_of_workers=10,timeout=2):
            self.workQueue=Queue.Queue()
            self.resultQueue=Queue.Queue()
            self.workers=[]
            self.timeout=timeout
            self._recruitThreads(num_of_workers)
    
        def _recruitThreads(self,num_of_workers):
            for i in range(num_of_workers):
                worker=Worker(self.workQueue,self.resultQueue)
                self.workers.append(worker)
    
        def wait_for_complete(self):
            #then,wait for each of them to terminate
            while len(self.workers):
                worker=self.workers.pop()
                worker.join(10)
                if worker.isAlive() and not self.workQueue.empty():
                    self.workers.append(worker)
    
        def add_job(self,callable,*args,**kwds):
            self.workQueue.put((callable,args,kwds))
    
        def get_result(self,*args,**kwds):
            return self.resultQueue.get(*args,**kwds)
    

    在WorkerManager中使用到一个全局变量IS_EXIT用来判断是否需要退出线程

    调用线程代码

    wm=WorkerManager(10)#10线程
    for i in range(10000):
        wm.add_job(do_job)
    wm.wait_for_complete()
    

    具体工作代码

    def do_job():
        global ERRORCOUNT
        global IS_EXIT
        try:
            do anything
        except:
            ERRORCOUNT+=1
                if ERRORCOUNT>5:
                    IS_EXIT=False
    

    此处使用了全局变量ERRORCOUNT统计错误数量,超过指定次数,则设置IS_EXIT=False通知线程停止执行。
    至此基本上满足项目所需,但并不友好,应有更好的方式。

    相关文章

      网友评论

        本文标题:python线程池控制

        本文链接:https://www.haomeiwen.com/subject/xflxsttx.html