美文网首页
python 多进程(multiprocessing)

python 多进程(multiprocessing)

作者: help_youself | 来源:发表于2021-03-15 10:52 被阅读0次

Queue

Consume messages when all arrive

first.py

import multiprocessing as mp
import os,time,signal
class Message(object):
    def __init__(self,pid,mid):
        self.pid=pid
        self.mid=mid
def consumer(num,sinks,messages):
    round=0
    while round<messages:
        count=0
        for i in range(num):
            if sinks[i].qsize()>0:
                count+=1
        if count==num:
            print("r ",round)
            for i in range(num):
                m=sinks[i].get()
                print(m.pid,m.mid)
            round+=1
def provider(pid,source,messages):
    mid=0
    for i in range(messages):
        m=Message(pid,mid)
        source.put(m)
        time.sleep(0.5)
        mid+=1
def signal_handler(signum, frame):
    os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)
if __name__ == '__main__':
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler) # ctrl+c
    signal.signal(signal.SIGTSTP, signal_handler) #ctrl+z
    num=8
    queues=[]
    agents=[]
    messages=100000
    for i in range(num):
        queues.append(mp.Queue(1))
    coordinator= mp.Process(target=consumer,
                             args=(num,queues,messages))
    coordinator.start()
    for i in range(num):
        p=mp.Process(target=provider,
                             args=(i+1,queues[i],messages))
        agents.append(p)
        p.start()
    while True:
        continue

Processing message independently

second.py

#https://stackoverflow.com/questions/40672264/python-dynamic-multiprocessing-and-signalling-issues
import multiprocessing as mp
import os,time,signal
class Message(object):
    def __init__(self,pid,mid):
        self.pid=pid
        self.mid=mid
class Request(object):
    def __init__(self,pid,mid):
        self.pid=pid
        self.mid=mid    
class Consumer(mp.Process):
    def __init__(self,num,recv_pads,send_pads):
        self.terminate=False
        self.num=num
        self.recv_pads=recv_pads
        self.send_pads=send_pads
        mp.Process.__init__(self)
    def handle_signal(self, signum, frame):
        print('stop consumer (pid: {})'.format(self.pid))
        self.terminate= True
    def run(self):
        signal.signal(signal.SIGINT,self.handle_signal)
        signal.signal(signal.SIGTERM,self.handle_signal)
        signal.signal(signal.SIGHUP, self.handle_signal) 
        signal.signal(signal.SIGTSTP,self.handle_signal) 
        round=0
        mid=0
        pid=os.getpid()
        while not self.terminate:
            for i in range(self.num):
                req=Request(pid,mid)
                if self.send_pads[i].qsize()==0:
                    self.send_pads[i].put(req)
                    mid+=1
            for i in range(self.num):
                if self.recv_pads[i].qsize()>0:
                    m=self.recv_pads[i].get()
                    print("msg",m.pid,m.mid)
        print(' stop consumer run pid {}'.format(self.pid))
class Provider(mp.Process):
    def __init__(self,id,send_pad,recv_pad):
        self.terminate=False
        self.id=id
        self.send_pad=send_pad
        self.recv_pad=recv_pad
        mp.Process.__init__(self)
    def handle_signal(self, signum, frame):
        print('stop provider {} pid: {}'.format(self.id,self.pid))
        self.terminate= True
    def run(self):
        signal.signal(signal.SIGINT,self.handle_signal)
        signal.signal(signal.SIGTERM,self.handle_signal)
        signal.signal(signal.SIGHUP, self.handle_signal) 
        signal.signal(signal.SIGTSTP,self.handle_signal)
        mid=0
        pid=os.getpid()
        while not self.terminate:
            if self.recv_pad.qsize()>0:
                info=self.recv_pad.get()
                print ("info ",self.id,info.pid,info.mid)
                m=Message(pid,mid)
                self.send_pad.put(m)
                time.sleep(0.5)
                mid+=1
        print('stop provider run {}'.format(self.pid))
Terminate=False
def signal_handler(signum, frame):
    global Terminate
    Terminate =True
if __name__ == '__main__':
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler) # ctrl+c
    signal.signal(signal.SIGTSTP, signal_handler) #ctrl+z
    num=8
    #from send pespective
    consumer_pads=[]
    provider_pads=[]
    agents=[]
    for i in range(num):
        consumer_pads.append(mp.Queue(1))
        provider_pads.append(mp.Queue(1))
    coordinator=Consumer(num,provider_pads,consumer_pads)
    coordinator.start()
    for i in range(num):
        p=Provider(i+1,provider_pads[i],consumer_pads[i])
        agents.append(p)
        p.start()
    while not Terminate:
        continue
    for i in range(num):
        agents[i].join()
    coordinator.join()

Pipe example

pair.py

import multiprocessing
import time
def TimeMillis():
    t=time.time()
    millis=int(round(t * 1000))
    return millis
def TimeMillis32():
    now=TimeMillis()&0xffffffff
    return now
def adder(pipe):
    server_p,client_p=pipe
    client_p.close()
    count=0
    while True:
        s1,s2,s3,s4=[],[],[],[]
        sum=0
        if server_p.poll(0):
            try:
                s1,s2,s3,s4=server_p.recv()
                sum=len(s1)+len(s2)+len(s3)+len(s4)
                count+=1
            except EOFError:
                break
            server_p.send((count,sum))
if __name__=="__main__":
    (server_p,client_p)=multiprocessing.Pipe()
    adder_p=multiprocessing.Process(target=adder,args=((server_p,client_p),))
    adder_p.start()
    server_p.close()
    s1,s2,s3,s4=[],[],[],[]
    for i in range (1500):
        s1.append(i)
        s2.append(i)
        s3.append(i)
        s4.append(i)
    last=TimeMillis32()
    ipc_time=0;
    ipc_count=0;
    for i in range(10):
        t1=TimeMillis32()
        client_p.send((s1,s2,s3,s4))
        count=0
        sum=0
        try:
            count,sum=client_p.recv()
        except EOFError:
            break
        delta=TimeMillis32()-t1
        ipc_time+=delta
        ipc_count+=1
        print(count,sum)
    if ipc_count>0:
        average=1.0*ipc_time/ipc_count
        print('ipc time: {}'.format(average))
    run_time=TimeMillis32()-last
    print ('run time {}'.format(run_time))
    client_p.close()
    adder_p.join()

相关文章

网友评论

      本文标题:python 多进程(multiprocessing)

      本文链接:https://www.haomeiwen.com/subject/jjghcltx.html