美文网首页
python多线程示例 -- 有限调度器处理队列数据

python多线程示例 -- 有限调度器处理队列数据

作者: 打出了枫采 | 来源:发表于2020-04-04 09:36 被阅读0次

    下面代码模拟了一种经典的多线程调度处理常景,4个并行的调度器处理总长度为100的队列中的数据

    • scheduleSources 信号量,用于控制获取调度器和调度器处理完后的释放
    • testFunc 为调度器的处理函数,简单休眠5s后释放调度器资源
    • dataQueue 队列数据 存储了 0 - 99
    • myLogger 用于日志记录
    • 基本逻辑:当队列中还有待处理数据时,就尝试获取调度器,进行处理,调度器处理结束时,释放调度器。
    # -*- coding: utf-8 -*-
    """
    待处理数据队列 长度N,M个在运行调度器处理
    """
    
    from queue import Queue, Empty
    from threading import Lock, Thread, BoundedSemaphore, current_thread, activeCount
    import logging
    import time
    
    
    class AppLogger:
        def __init__(self, moduleName, logfile):
            self._logger = logging.getLogger(moduleName)
            handler = logging.FileHandler(logfile)
            fmt = "%(asctime)-15s %(levelname)s %(filename)s %(lineno)d %(message)s"
            formatter = logging.Formatter(fmt)
            handler.setFormatter(formatter)
            self._logger.addHandler(handler)
            self._logger.setLevel(logging.INFO)
    
            console = logging.StreamHandler()
            console.setLevel(logging.INFO)
            console.setFormatter(formatter)
            self._logger.addHandler(console)
    
            self.warnning = self._logger.warning
            self.error = self._logger.error
            self.info = self._logger.info
            self.debug = self._logger.debug
    
    
    dataQueueMaxLen = 100
    schedulerMaxCount = 4
    
    scheduleSources = BoundedSemaphore(schedulerMaxCount)
    
    dataQueue = Queue(dataQueueMaxLen)
    
    myLogger = AppLogger("myapp","test.log")
    
    class Scheduler:
        def __init__(self, func, data):
            self._thread = Thread(target=func, args=data)
            self.record = "Scheduler(%s, %s)" % (func.__name__, *data)
    
        def __str__(self):
            return self.record
    
        def start(self):
            # myLogger.info(("START ", self.record))
            self._thread.start()
    
    dataList = range(dataQueueMaxLen)
    
    for data in dataList:
        dataQueue.put(data)
    
    def testFunc(data):
        thd = current_thread()
        myLogger.info("Thread %s start testFunc %s sleep 5 seconds" % (thd , data))
    
        time.sleep(5)
        scheduleSources.release()
        myLogger.info("Thread %s exit testFunc %s" % (thd, data))
    
    def startScheduler(func, data):
        if scheduleSources.acquire():
            proc = Scheduler(func, [data])
            proc.start()
    
    if __name__ == "__main__":
        while not dataQueue.empty():
            data = dataQueue.get()
            startScheduler(testFunc, data)
            myLogger.info("current running thread count %d " % activeCount())
    
    
    最终运行打印结果如下,可以看出每隔5秒启动4个线程处理,如此往复处理队列中数据 image.png

    相关文章

      网友评论

          本文标题:python多线程示例 -- 有限调度器处理队列数据

          本文链接:https://www.haomeiwen.com/subject/ijldphtx.html