Queue
Consume messages when all arrive
first.py
import multiprocessing as mp
import os,time,signal
class Message(object):
def __init__(self,pid,mid):
self.pid=pid
self.mid=mid
def consumer(num,sinks,messages):
round=0
while round<messages:
count=0
for i in range(num):
if sinks[i].qsize()>0:
count+=1
if count==num:
print("r ",round)
for i in range(num):
m=sinks[i].get()
print(m.pid,m.mid)
round+=1
def provider(pid,source,messages):
mid=0
for i in range(messages):
m=Message(pid,mid)
source.put(m)
time.sleep(0.5)
mid+=1
def signal_handler(signum, frame):
os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)
if __name__ == '__main__':
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler) # ctrl+c
signal.signal(signal.SIGTSTP, signal_handler) #ctrl+z
num=8
queues=[]
agents=[]
messages=100000
for i in range(num):
queues.append(mp.Queue(1))
coordinator= mp.Process(target=consumer,
args=(num,queues,messages))
coordinator.start()
for i in range(num):
p=mp.Process(target=provider,
args=(i+1,queues[i],messages))
agents.append(p)
p.start()
while True:
continue
Processing message independently
second.py
#https://stackoverflow.com/questions/40672264/python-dynamic-multiprocessing-and-signalling-issues
import multiprocessing as mp
import os,time,signal
class Message(object):
def __init__(self,pid,mid):
self.pid=pid
self.mid=mid
class Request(object):
def __init__(self,pid,mid):
self.pid=pid
self.mid=mid
class Consumer(mp.Process):
def __init__(self,num,recv_pads,send_pads):
self.terminate=False
self.num=num
self.recv_pads=recv_pads
self.send_pads=send_pads
mp.Process.__init__(self)
def handle_signal(self, signum, frame):
print('stop consumer (pid: {})'.format(self.pid))
self.terminate= True
def run(self):
signal.signal(signal.SIGINT,self.handle_signal)
signal.signal(signal.SIGTERM,self.handle_signal)
signal.signal(signal.SIGHUP, self.handle_signal)
signal.signal(signal.SIGTSTP,self.handle_signal)
round=0
mid=0
pid=os.getpid()
while not self.terminate:
for i in range(self.num):
req=Request(pid,mid)
if self.send_pads[i].qsize()==0:
self.send_pads[i].put(req)
mid+=1
for i in range(self.num):
if self.recv_pads[i].qsize()>0:
m=self.recv_pads[i].get()
print("msg",m.pid,m.mid)
print(' stop consumer run pid {}'.format(self.pid))
class Provider(mp.Process):
def __init__(self,id,send_pad,recv_pad):
self.terminate=False
self.id=id
self.send_pad=send_pad
self.recv_pad=recv_pad
mp.Process.__init__(self)
def handle_signal(self, signum, frame):
print('stop provider {} pid: {}'.format(self.id,self.pid))
self.terminate= True
def run(self):
signal.signal(signal.SIGINT,self.handle_signal)
signal.signal(signal.SIGTERM,self.handle_signal)
signal.signal(signal.SIGHUP, self.handle_signal)
signal.signal(signal.SIGTSTP,self.handle_signal)
mid=0
pid=os.getpid()
while not self.terminate:
if self.recv_pad.qsize()>0:
info=self.recv_pad.get()
print ("info ",self.id,info.pid,info.mid)
m=Message(pid,mid)
self.send_pad.put(m)
time.sleep(0.5)
mid+=1
print('stop provider run {}'.format(self.pid))
Terminate=False
def signal_handler(signum, frame):
global Terminate
Terminate =True
if __name__ == '__main__':
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler) # ctrl+c
signal.signal(signal.SIGTSTP, signal_handler) #ctrl+z
num=8
#from send pespective
consumer_pads=[]
provider_pads=[]
agents=[]
for i in range(num):
consumer_pads.append(mp.Queue(1))
provider_pads.append(mp.Queue(1))
coordinator=Consumer(num,provider_pads,consumer_pads)
coordinator.start()
for i in range(num):
p=Provider(i+1,provider_pads[i],consumer_pads[i])
agents.append(p)
p.start()
while not Terminate:
continue
for i in range(num):
agents[i].join()
coordinator.join()
Pipe example
pair.py
import multiprocessing
import time
def TimeMillis():
t=time.time()
millis=int(round(t * 1000))
return millis
def TimeMillis32():
now=TimeMillis()&0xffffffff
return now
def adder(pipe):
server_p,client_p=pipe
client_p.close()
count=0
while True:
s1,s2,s3,s4=[],[],[],[]
sum=0
if server_p.poll(0):
try:
s1,s2,s3,s4=server_p.recv()
sum=len(s1)+len(s2)+len(s3)+len(s4)
count+=1
except EOFError:
break
server_p.send((count,sum))
if __name__=="__main__":
(server_p,client_p)=multiprocessing.Pipe()
adder_p=multiprocessing.Process(target=adder,args=((server_p,client_p),))
adder_p.start()
server_p.close()
s1,s2,s3,s4=[],[],[],[]
for i in range (1500):
s1.append(i)
s2.append(i)
s3.append(i)
s4.append(i)
last=TimeMillis32()
ipc_time=0;
ipc_count=0;
for i in range(10):
t1=TimeMillis32()
client_p.send((s1,s2,s3,s4))
count=0
sum=0
try:
count,sum=client_p.recv()
except EOFError:
break
delta=TimeMillis32()-t1
ipc_time+=delta
ipc_count+=1
print(count,sum)
if ipc_count>0:
average=1.0*ipc_time/ipc_count
print('ipc time: {}'.format(average))
run_time=TimeMillis32()-last
print ('run time {}'.format(run_time))
client_p.close()
adder_p.join()
网友评论