美文网首页
一个基于多进程主从模式的简单实现(Python)

一个基于多进程主从模式的简单实现(Python)

作者: slords | 来源:发表于2019-03-27 18:14 被阅读0次

    这里以一种简单展示的在Python下的主从模式的多进程实现,并对一些实现过程中可能遇到的坑做了简单的说明

    import uuid
    import time
    from multiprocessing import Process, Queue, Value
    from queue import Empty
    
    
    class Master(object):
    
        def __init__(self, job_func, servant_count=5):
            """
            御主类型
            :param job_func: 任务函数
            :type job_func: function
            :param servant_count: 侍从数量
            :type servant_count: int
            """
            self.job_queue = Queue()
            self.job_func = job_func
            self.servant_list = [
                Servant(str(i), job_func, self.job_queue)
                for i in range(servant_count)
            ]
    
        def start(self):
            """
            开始运行
            :return: None
            """
            for servant in self.servant_list:
                servant.start()
    
        def end(self):
            """
            结束运行, 关闭所有侍从
            :return: None
            """
            for servant in self.servant_list:
                servant.inactive()
    
        def add_job(self, *args, **kwargs):
            """
            添加任务
            :param args: 任务函数的参数
            :param kwargs: 任务函数的参数
            :return: 任务id
            :rtype str
            """
            job_id = str(uuid.uuid1())
            self.job_queue.put((job_id, args, kwargs))
            return job_id
    
    
    class Servant(Process):
    
        time_out = 1
    
        def __init__(self, servant_name, job_func, job_queue):
            """
            侍从类型
            :param servant_name: 侍从名称(进程名称)
            :type servant_name: str
            :param job_func: 任务函数
            :type job_func: function
            :param job_queue: 任务队列
            :type job_queue: Queue
            """
            self.job_func = job_func
            self.job_queue = job_queue
            # 侍从状态, Value('b', 0)是C类型里的bite型
            # 这里0表示关闭,1表示开启
            # 该类型可以用于进程共享,效率比基于pickle的共享内存更具效率
            self.active = Value('b', 0)
            super(Servant, self).__init__(name=servant_name)
    
        def inactive(self):
            """
            关闭侍从
            :return: None
            """
            self.active.value = 0
    
        def start(self):
            """
            侍从启动
            :return: None
            """
            self.active.value = 1
            super(Servant, self).start()
    
        def run(self):
            """
            子进程函数
            :return: None
            """
            # 如果仆从一直处于激活状态则持续完成任务
            while self.active.value:
                try:
                    # 获取任务
                    job_id, args, kwargs = self.job_queue.get(timeout=self.time_out)
                except Empty:
                    # 如果没有获取到任务重复循环
                    continue
                try:
                    # 执行任务获取结果
                    result = self.job_func(*args, **kwargs)
                except Exception as e:
                    # 处理任务函数异常
                    error = str(e)
                    result = None
                else:
                    error = None
                # 处理任务结果,这里直接打印
                print(
                    '<{servant_id}|{job_id}> result:{result},error:{error}'.format(
                        job_id=job_id,
                        result=result,
                        error=error,
                        servant_id=self.name
                    )
                )
    
    
    # 用于测试的函数
    # 这里没有把该函数定义在 if __name__ == '__main__' 中,是为了兼容window
    # 因为window不支持fork,在window平台执行时,
    # 子进程创建运行时,会需要将父子进程共享的变量以pickle的方式复制一份,
    # 如果定义在if __name__ == '__main__'中,
    # 则会由于在子进程中引入当前模块时 __name__ != '__main__'
    # 导致函数未被定义,最终pickle信息对应的对象路径不存在,无法还原对应函数
    def test_func(wait_time=1, return_value=None):
        """
        测试任务函数
        :param wait_time: 等待的时间
        :type wait_time: int / float
        :param return_value: 返回值
        :type return_value: object
        :return: return_value
        :rtype: object
        """
        time.sleep(wait_time)
        return return_value
    
    
    if __name__ == '__main__':
        import random
    
        master = Master(job_func=test_func, servant_count=3)
        master.start()
        for x in range(20):
            # 随机等待0~1秒,返回x
            master.add_job(random.random(), x)
        # 等待2秒关闭任务运行
        time.sleep(2)
        master.end()
        # 这个时候正在执行的任务不受影响没执行的任务将不再被执行
        
    # 执行结果
    """
    <0|98bd686c-5077-11e9-acec-1002b543289d> result:0,error:None
    <1|98bdb683-5077-11e9-8494-1002b543289d> result:2,error:None
    <2|98bdb682-5077-11e9-9ffb-1002b543289d> result:1,error:None
    <0|98bdb684-5077-11e9-9133-1002b543289d> result:3,error:None
    <1|98bdb685-5077-11e9-9541-1002b543289d> result:4,error:None
    <0|98bdb687-5077-11e9-afed-1002b543289d> result:6,error:None
    <2|98bdb686-5077-11e9-9753-1002b543289d> result:5,error:None
    <1|98bdb688-5077-11e9-85aa-1002b543289d> result:7,error:None
    <2|98bdb68a-5077-11e9-8121-1002b543289d> result:9,error:None
    <0|98bdb689-5077-11e9-8e73-1002b543289d> result:8,error:None
    """
    

    相关文章

      网友评论

          本文标题:一个基于多进程主从模式的简单实现(Python)

          本文链接:https://www.haomeiwen.com/subject/ujozvqtx.html