美文网首页任务编排
python multiprocessing模块实现多进程任务中

python multiprocessing模块实现多进程任务中

作者: 鸟它鸟 | 来源:发表于2019-03-13 22:11 被阅读0次

    python multiprocessing模块实现多进程任务中运行多进程子任务,并实现并发控制。
    起因是想使用celery+ansible做任务执行与回收,代码写好后,发现卧槽 celery不允许他执行的任务再建立子进程,这就比较尴尬了,封装好的ansible接口不能用?? 那怎么做多任务自动执行呢?

    研究celery 发现他的默认执行的多进程机制是multiprocessing模块的Pool,通过代码测试这个模块,发现他也不允许自己的任务再建立子进程,于是乎大概明白什么回事。 后边可以研究下如何修改celery worker的默认并发机制。

    不过本次想要绕过他,于是测试了multiprocessing的Process模块,发现这个模块是允许任务中建立子进程的。
    然后就开始自己写任务执行器,先拿mysql简单做个任务队列,测通后再换kafka或者其他。
    测试完成后,任务可以并行执行了,但是节奏得控制啊,要不好几万并发 自己不就挂了?翻了半天文档发现multiprocessing的Semaphore模块可以做到,测试后,代码如下。

    #!/bin/python
    
    from multiprocessing import Pool
    from multiprocessing import Process,Semaphore,current_process
    import sys, os, time, random, json
    
    project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath('__file__'))))
    sys.path.append(project_dir)
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "wk_api.settings")
    
    import django
    
    django.setup()
    
    from wkexe.models import WorkTables
    from wkexe.models import ExecuteTables
    from utils.ansible_api import ANSRunner
    
    
    def RunPlaybook(ip, ymldir):
        s.acquire()
        print(time.strftime('%H:%M:%S'), current_process().name + " 获得锁运行");
        time.sleep(random.random() * 5)
        rbt = ANSRunner(ip)
        rbt.run_playbook(playbook_path='%s' % (ymldir))
        result = json.dumps(rbt.get_playbook_result(), indent=4)
        print(result)
        s.release()
        print(time.strftime('%H:%M:%S'), current_process().name + " 释放锁结束");
    
    
    def RunModel(ip, model, module_args):
        s.acquire()
        print(time.strftime('%H:%M:%S'), current_process().name + " 获得锁运行");
        rbt = ANSRunner(ip)
        rbt.run_model(model, module_args)
        result = json.dumps(rbt.get_model_result(), indent=4)
        print(result)
        s.release()
        print(time.strftime('%H:%M:%S'), current_process().name + " 释放锁结束");
    
    
    
    if __name__ == '__main__':
        while True:
            p_list = []
            works = WorkTables.objects.all().filter(status=10)
            for work in works:
                concurrent = work.concurrent  # 并发参数
                executeInfo = work.executetables_set.all()  # 需要执行的设备
                taskType = work.taskname.tasktype  # 任务类型
                p = Pool(1)
                for ip in executeInfo:
                    ipAdd = ip.ip
                    ipId = ip.id
                    if taskType == 0:
                        ymlDir = work.taskname.taskymldir
                        print(ymlDir)
                        p = Process(target=RunPlaybook, args=(ipAdd, ymlDir))
                        # p.start()
                        # p.join()
                        # p.apply_async(RunPlaybook, args=(ipAdd, ymlDir))
                        p_list.append(p)
                    elif taskType == 1:
                        model = work.taskname.taskmodel
                        modelArgs = work.taskname.taskargs
                        p = Process(target=RunModel, args=(ipAdd, model, modelArgs))
                        # p.start()
                        # p.join()
                        # p.apply_async(RunModel, args=(ipAdd, model, modelArgs))
                        p_list.append(p)
                    else:
                        print("未定义的任务类型")
    
            s = Semaphore(concurrent) #用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数
            for p in p_list:
                p.start()
    
            #
            for p in p_list:
                # p.close()
                p.join()
            print("执行完成")
    

    相关文章

      网友评论

        本文标题:python multiprocessing模块实现多进程任务中

        本文链接:https://www.haomeiwen.com/subject/oopxmqtx.html