python multiprocessing模块实现多进程任务中运行多进程子任务,并实现并发控制。
起因是想使用celery+ansible做任务执行与回收,代码写好后,发现卧槽 celery不允许他执行的任务再建立子进程,这就比较尴尬了,封装好的ansible接口不能用?? 那怎么做多任务自动执行呢?
研究celery 发现他的默认执行的多进程机制是multiprocessing模块的Pool,通过代码测试这个模块,发现他也不允许自己的任务再建立子进程,于是乎大概明白什么回事。 后边可以研究下如何修改celery worker的默认并发机制。
不过本次想要绕过他,于是测试了multiprocessing的Process模块,发现这个模块是允许任务中建立子进程的。
然后就开始自己写任务执行器,先拿mysql简单做个任务队列,测通后再换kafka或者其他。
测试完成后,任务可以并行执行了,但是节奏得控制啊,要不好几万并发 自己不就挂了?翻了半天文档发现multiprocessing的Semaphore模块可以做到,测试后,代码如下。
#!/bin/python
from multiprocessing import Pool
from multiprocessing import Process,Semaphore,current_process
import sys, os, time, random, json
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath('__file__'))))
sys.path.append(project_dir)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "wk_api.settings")
import django
django.setup()
from wkexe.models import WorkTables
from wkexe.models import ExecuteTables
from utils.ansible_api import ANSRunner
def RunPlaybook(ip, ymldir):
s.acquire()
print(time.strftime('%H:%M:%S'), current_process().name + " 获得锁运行");
time.sleep(random.random() * 5)
rbt = ANSRunner(ip)
rbt.run_playbook(playbook_path='%s' % (ymldir))
result = json.dumps(rbt.get_playbook_result(), indent=4)
print(result)
s.release()
print(time.strftime('%H:%M:%S'), current_process().name + " 释放锁结束");
def RunModel(ip, model, module_args):
s.acquire()
print(time.strftime('%H:%M:%S'), current_process().name + " 获得锁运行");
rbt = ANSRunner(ip)
rbt.run_model(model, module_args)
result = json.dumps(rbt.get_model_result(), indent=4)
print(result)
s.release()
print(time.strftime('%H:%M:%S'), current_process().name + " 释放锁结束");
if __name__ == '__main__':
while True:
p_list = []
works = WorkTables.objects.all().filter(status=10)
for work in works:
concurrent = work.concurrent # 并发参数
executeInfo = work.executetables_set.all() # 需要执行的设备
taskType = work.taskname.tasktype # 任务类型
p = Pool(1)
for ip in executeInfo:
ipAdd = ip.ip
ipId = ip.id
if taskType == 0:
ymlDir = work.taskname.taskymldir
print(ymlDir)
p = Process(target=RunPlaybook, args=(ipAdd, ymlDir))
# p.start()
# p.join()
# p.apply_async(RunPlaybook, args=(ipAdd, ymlDir))
p_list.append(p)
elif taskType == 1:
model = work.taskname.taskmodel
modelArgs = work.taskname.taskargs
p = Process(target=RunModel, args=(ipAdd, model, modelArgs))
# p.start()
# p.join()
# p.apply_async(RunModel, args=(ipAdd, model, modelArgs))
p_list.append(p)
else:
print("未定义的任务类型")
s = Semaphore(concurrent) #用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数
for p in p_list:
p.start()
#
for p in p_list:
# p.close()
p.join()
print("执行完成")
网友评论