这里以一种简单展示的在Python下的主从模式的多进程实现,并对一些实现过程中可能遇到的坑做了简单的说明
import uuid
import time
from multiprocessing import Process, Queue, Value
from queue import Empty
class Master(object):
def __init__(self, job_func, servant_count=5):
"""
御主类型
:param job_func: 任务函数
:type job_func: function
:param servant_count: 侍从数量
:type servant_count: int
"""
self.job_queue = Queue()
self.job_func = job_func
self.servant_list = [
Servant(str(i), job_func, self.job_queue)
for i in range(servant_count)
]
def start(self):
"""
开始运行
:return: None
"""
for servant in self.servant_list:
servant.start()
def end(self):
"""
结束运行, 关闭所有侍从
:return: None
"""
for servant in self.servant_list:
servant.inactive()
def add_job(self, *args, **kwargs):
"""
添加任务
:param args: 任务函数的参数
:param kwargs: 任务函数的参数
:return: 任务id
:rtype str
"""
job_id = str(uuid.uuid1())
self.job_queue.put((job_id, args, kwargs))
return job_id
class Servant(Process):
time_out = 1
def __init__(self, servant_name, job_func, job_queue):
"""
侍从类型
:param servant_name: 侍从名称(进程名称)
:type servant_name: str
:param job_func: 任务函数
:type job_func: function
:param job_queue: 任务队列
:type job_queue: Queue
"""
self.job_func = job_func
self.job_queue = job_queue
# 侍从状态, Value('b', 0)是C类型里的bite型
# 这里0表示关闭,1表示开启
# 该类型可以用于进程共享,效率比基于pickle的共享内存更具效率
self.active = Value('b', 0)
super(Servant, self).__init__(name=servant_name)
def inactive(self):
"""
关闭侍从
:return: None
"""
self.active.value = 0
def start(self):
"""
侍从启动
:return: None
"""
self.active.value = 1
super(Servant, self).start()
def run(self):
"""
子进程函数
:return: None
"""
# 如果仆从一直处于激活状态则持续完成任务
while self.active.value:
try:
# 获取任务
job_id, args, kwargs = self.job_queue.get(timeout=self.time_out)
except Empty:
# 如果没有获取到任务重复循环
continue
try:
# 执行任务获取结果
result = self.job_func(*args, **kwargs)
except Exception as e:
# 处理任务函数异常
error = str(e)
result = None
else:
error = None
# 处理任务结果,这里直接打印
print(
'<{servant_id}|{job_id}> result:{result},error:{error}'.format(
job_id=job_id,
result=result,
error=error,
servant_id=self.name
)
)
# 用于测试的函数
# 这里没有把该函数定义在 if __name__ == '__main__' 中,是为了兼容window
# 因为window不支持fork,在window平台执行时,
# 子进程创建运行时,会需要将父子进程共享的变量以pickle的方式复制一份,
# 如果定义在if __name__ == '__main__'中,
# 则会由于在子进程中引入当前模块时 __name__ != '__main__'
# 导致函数未被定义,最终pickle信息对应的对象路径不存在,无法还原对应函数
def test_func(wait_time=1, return_value=None):
"""
测试任务函数
:param wait_time: 等待的时间
:type wait_time: int / float
:param return_value: 返回值
:type return_value: object
:return: return_value
:rtype: object
"""
time.sleep(wait_time)
return return_value
if __name__ == '__main__':
import random
master = Master(job_func=test_func, servant_count=3)
master.start()
for x in range(20):
# 随机等待0~1秒,返回x
master.add_job(random.random(), x)
# 等待2秒关闭任务运行
time.sleep(2)
master.end()
# 这个时候正在执行的任务不受影响没执行的任务将不再被执行
# 执行结果
"""
<0|98bd686c-5077-11e9-acec-1002b543289d> result:0,error:None
<1|98bdb683-5077-11e9-8494-1002b543289d> result:2,error:None
<2|98bdb682-5077-11e9-9ffb-1002b543289d> result:1,error:None
<0|98bdb684-5077-11e9-9133-1002b543289d> result:3,error:None
<1|98bdb685-5077-11e9-9541-1002b543289d> result:4,error:None
<0|98bdb687-5077-11e9-afed-1002b543289d> result:6,error:None
<2|98bdb686-5077-11e9-9753-1002b543289d> result:5,error:None
<1|98bdb688-5077-11e9-85aa-1002b543289d> result:7,error:None
<2|98bdb68a-5077-11e9-8121-1002b543289d> result:9,error:None
<0|98bdb689-5077-11e9-8e73-1002b543289d> result:8,error:None
"""
网友评论