先来个最简单的例子:
把1-10000每个数求平方
服务器server:
用两个队列存储任务、结果
定义两个函数
要实现分布式得继承multiprocessing.managers.BaseManager
在主函数里multiprocessing.freeze_support()开启分布式支持
注册两个函数给客户端调用
创建管理器,设置ip地址和开启端口、链接密码。
用两个队列加任务、收结果。用刚刚注册的函数
把1-10000压入队列,
把结果压入队列
最后完成关闭服务器
客户端client:
也需要继承multiprocessing.managers.BaseManager
定义一个协程处理一个数据,同时把结果压入结果队列
定义一个线程处理10个数据,开启10个协程
定义一个进程,进程驱动10个线程
主函数:同客户端注册两个函数
同客户端创建管理器,设置ip地址和开启端口、链接密码。
链接服务器
同客户端调用注册的函数,两个队列
套四层循环:10个进程、100个线程、1000个协程
循环进程函数
上代码:
服务器server:
#coding:utf-8
import multiprocessing #分布式进程
import multiprocessing.managers #分布式进程管理器
import random,time #随机数,时间
import Queue #队列
task_queue=Queue.Queue() #任务
result_queue=Queue.Queue() #结果
def return_task(): #返回任务队列
return task_queue
def return_result(): #返回结果队列
return result_queue
class QueueManger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据
pass
if __name__=="__main__":
multiprocessing.freeze_support()#开启分布式支持
QueueManger.register("get_task",callable=return_task)#注册函数给客户端调用
QueueManger.register("get_result", callable=return_result)
manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") #创建一个管理器,设置地址与密码
manger.start() #开启
task,result=manger.get_task(),manger.get_result() #任务,结果
for i in range(10000):
print "task add data",i
task.put(i)
print "waitting for------"
for i in range(10000):
res=result.get(timeout=100)
print "get data",res
manger.shutdown()#关闭服务器
客户端client:
#coding:utf-8
import multiprocessing #分布式进程
import multiprocessing.managers # 分布式进程管理器
import random,time #随机数,时间
import Queue #队列
import threading
import gevent
import gevent.monkey
class QueueManger(multiprocessing.managers.BaseManager):# 继承,进程管理共享数据
pass
def gevetygo(num ,result): #协程处理一个数据
print num*num
result.put(num*num)
def threadgo(datalist,result): # 线程处理10个数据,开启10个协程
tasklist=[]
for data in datalist:
tasklist.append(gevent.spawn(gevetygo, data,result))
gevent.joinall(tasklist)
def processgo(ddatalist,result): # [[1,2,3],[4,5,6]] 进程驱动了10个线程
threadlist=[]
for datalist in ddatalist:
mythread=threading.Thread(target=threadgo,args=(datalist,result))
mythread.start()
threadlist.append(mythread)
for mythread in threadlist:
mythread.join()
if __name__=="__main__":
QueueManger.register("get_task") # 注册函数调用服务器
QueueManger.register("get_result")
manger=QueueManger(address=("192.168.112.11",8848),authkey="123456")
manger.connect() # 链接服务器
task= manger.get_task()
result =manger.get_result() # 任务,结果
# 1000
# 10个进程
# 100个线程
# 1000个协程
for i in range(10):
cubelist = [] # [[[1],[2]]]
for j in range(10):
arealist = []
for k in range(10):
linelist = []
for l in range(10):
data = task.get()
linelist.append(data)
arealist.append(linelist)
cubelist.append(arealist)
processlist = []
for myarealist in cubelist:
process = multiprocessing.Process(target=processgo, args=(myarealist, result))
process.start()
processlist.append(process)
for process in processlist:
process.join()
遇到的坑:一个月之前弄分布式的时候写ip地址怎么都开启不了,后来换了台电脑就支持了= =。
如果只是在自己电脑上弄的话,写127.0.0.1也可以运行,如果你也遇到ip地址怎么都开启不了的情况
整理不易,如果觉得有所帮助,希望可以留下您的精彩言论再走。赶快为你们最喜欢的框架打Call吧。
大家如果需要Python的学习资料可以加我的Qun:834179111,小编整理了,从Python入门零基础到项目实战的资料。欢迎还没有找到方向的小伙伴来学习。
本文转自网络 如有侵权 请联系小编删除
网友评论