下面代码模拟了一种经典的多线程调度处理常景,4个并行的调度器处理总长度为100的队列中的数据
- scheduleSources 信号量,用于控制获取调度器和调度器处理完后的释放
- testFunc 为调度器的处理函数,简单休眠5s后释放调度器资源
- dataQueue 队列数据 存储了 0 - 99
- myLogger 用于日志记录
- 基本逻辑:当队列中还有待处理数据时,就尝试获取调度器,进行处理,调度器处理结束时,释放调度器。
# -*- coding: utf-8 -*-
"""
待处理数据队列 长度N,M个在运行调度器处理
"""
from queue import Queue, Empty
from threading import Lock, Thread, BoundedSemaphore, current_thread, activeCount
import logging
import time
class AppLogger:
def __init__(self, moduleName, logfile):
self._logger = logging.getLogger(moduleName)
handler = logging.FileHandler(logfile)
fmt = "%(asctime)-15s %(levelname)s %(filename)s %(lineno)d %(message)s"
formatter = logging.Formatter(fmt)
handler.setFormatter(formatter)
self._logger.addHandler(handler)
self._logger.setLevel(logging.INFO)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
self._logger.addHandler(console)
self.warnning = self._logger.warning
self.error = self._logger.error
self.info = self._logger.info
self.debug = self._logger.debug
dataQueueMaxLen = 100
schedulerMaxCount = 4
scheduleSources = BoundedSemaphore(schedulerMaxCount)
dataQueue = Queue(dataQueueMaxLen)
myLogger = AppLogger("myapp","test.log")
class Scheduler:
def __init__(self, func, data):
self._thread = Thread(target=func, args=data)
self.record = "Scheduler(%s, %s)" % (func.__name__, *data)
def __str__(self):
return self.record
def start(self):
# myLogger.info(("START ", self.record))
self._thread.start()
dataList = range(dataQueueMaxLen)
for data in dataList:
dataQueue.put(data)
def testFunc(data):
thd = current_thread()
myLogger.info("Thread %s start testFunc %s sleep 5 seconds" % (thd , data))
time.sleep(5)
scheduleSources.release()
myLogger.info("Thread %s exit testFunc %s" % (thd, data))
def startScheduler(func, data):
if scheduleSources.acquire():
proc = Scheduler(func, [data])
proc.start()
if __name__ == "__main__":
while not dataQueue.empty():
data = dataQueue.get()
startScheduler(testFunc, data)
myLogger.info("current running thread count %d " % activeCount())
最终运行打印结果如下,可以看出每隔5秒启动4个线程处理,如此往复处理队列中数据
image.png
网友评论