美文网首页
python multiprocessing Queue and

python multiprocessing Queue and

作者: help_youself | 来源:发表于2021-03-15 22:35 被阅读0次

 多进程通过Queue和Pipe交互数据的开销对比。相比Pipe,通过Queue交互数据,进程之间数据交互有更大的时延。
 The average time of a single message transfer between processes by Queue is much larger than Pipe.

code

byte_codec.py

import struct
class DataReader(object):
    def __init__(self):
        self.buffer=b''
        self.length=0
        self.pos=0
    def append(self,buffer):
        self.buffer+=buffer
        self.length=len(self.buffer)
    def cursor(self):
        return self.pos
    def byte_remain(self):
        return self.length-self.pos
    def read_uint8(self):
        result=0
        success=False
        if self.pos+1<=self.length:
            temp=bytes(self.buffer[self.pos:self.pos+1])
            result,=struct.unpack("B",temp)
            self.pos+=1
            success=True
        return result,success
    def read_uint16(self):
        result=0
        success=False
        if self.pos+2<=self.length:
            temp=self.buffer[self.pos:self.pos+2]
            result,=struct.unpack("!H",temp)
            self.pos+=2
            success=True
        return result,success
    def read_uint32(self):
        result=0
        success=False
        if self.pos+4<=self.length:
            temp=self.buffer[self.pos:self.pos+4]
            result,=struct.unpack("!I",temp)
            self.pos+=4
            success=True
        return result,success
    def read_uint64(self):
        result=0
        success=False
        if self.pos+8<=self.length:
            temp=self.buffer[self.pos:self.pos+8]
            result,=struct.unpack("!Q",temp)
            self.pos+=8
            success=True
        return result,success
    def read_float(self):
        result=0
        success=False
        if self.pos+4<=self.length:
            temp=self.buffer[self.pos:self.pos+4]
            result,=struct.unpack("!f",temp)
            self.pos+=4
            success=True
        return result,success
    def read_double(self):
        result=0
        success=False
        if self.pos+8<=self.length:
            temp=self.buffer[self.pos:self.pos+8]
            result,=struct.unpack("!d",temp)
            self.pos+=8
            success=True
        return result,success
    def read_varint(self):
        result=0
        success=False
        multi=1
        length=self._varint_len()
        if length>0:
            for i in range(length):
                temp=bytes(self.buffer[self.pos+i:self.pos+i+1])
                v,=struct.unpack("B",temp)
                v=v&127
                result=result+v*multi
                multi*=128
            self.pos+=length
            success=True
        return result,success
    def _varint_len(self):
        length=0
        remain=self.byte_remain()
        decodable=False
        for i in range(remain):
            length+=1
            temp=bytes(self.buffer[self.pos+i:self.pos+i+1])
            v,=struct.unpack("B",temp)
            if v&128==0:
                decodable=True
                break
        if decodable is False:
            length=0
        return length
def varient_length(number):
    length=0;
    if number<=(0x7f):
        length=1;
    elif number<=(0x3fff):
        length=2;
    elif number<=(0x1fffff):
       length=3;
    elif number<=(0xfffffff):
       length=4;
    elif number<=(0x7ffffffff):
       length=5;
    elif number<=(0x3ffffffffff):
       length=6;
    elif number<=(0x1ffffffffffff):
       length=7;
    elif number<=(0xffffffffffffff):
        length=8;
    return length
def varint_encode(number):
    s = b""
    while True:
        byte = number % 128
        number = number // 128
        # If there are more digits to encode, set the top bit of this digit
        if number > 0:
            byte = byte|0x80
        s = s + struct.pack("!B", byte)
        if number == 0:
            return s
class DataWriter(object):
    def __init__(self):
        self.buffer=b''
    def length(self):
        return len(self.buffer)
    def content(self):
        return self.buffer
    def write_uint8(self,v):
        self.buffer+=struct.pack("B",v)
    def write_uint16(self,v):
        self.buffer+=struct.pack("!H",v)
    def write_uint32(self,v):
        self.buffer+=struct.pack("!I",v)
    def write_uint64(self,v):
        self.buffer+=struct.pack("!Q",v)
    def write_float(self,v):
        self.buffer+=struct.pack("!f",v)
    def write_double(self,v):
        self.buffer+=struct.pack("!d",v)
    def write_varint(self,v):
        length=varient_length(v)
        if length>0:
            self.buffer+=varint_encode(v)
        else:
            raise Exception("out of range")

message.py

import byte_codec as bc
import time
PADDING=0x00
TCP_MSG=0x00
CTL_MSG=0x01
CTL_MSG_SOCK_CLOSED=0x00
def TimeMillis():
    t=time.time()
    millis=int(round(t * 1000))
    return millis
def TimeMillis32():
    now=TimeMillis()&0xffffffff
    return now
class Request(object):
    def __init__(self,sz,uuid,mid,send_ts,pz):
        self.sz=sz
        self.uuid=uuid
        self.mid=mid
        self.send_ts=send_ts
        self.pz=pz
    def encode2bytes(self):
        writer=bc.DataWriter()
        writer.write_uint16(self.sz)
        writer.write_uint16(self.uuid)
        writer.write_uint32(self.mid)
        writer.write_uint32(self.send_ts)
        return writer.content()
def deserialize_request(buffer):
    reader=bc.DataReader()
    reader.append(buffer)
    sz,_=reader.read_uint16()
    uuid,_=reader.read_uint16()
    mid,_=reader.read_uint32()
    sent_ts,_=reader.read_uint32()
    request=Request(sz,uuid,mid,sent_ts,0)
    return request
class Response(object):
    def __init__(self,uuid,pz):
        self.sz=2+4+4
        if pz>0:
            self.sz+=pz
        self.uuid=uuid
        self.mid=0
        self.send_ts=TimeMillis32()
        self.pz=pz
        self.close=False
    def encode2bytes(self):
        writer=bc.DataWriter()
        close=0
        if self.close:
            close=1
        writer.write_uint8(close)
        writer.write_uint16(self.sz)
        writer.write_uint16(self.uuid)
        writer.write_uint32(self.mid)
        writer.write_uint32(self.send_ts)
        for i in range(self.pz):
            writer.write_uint8(PADDING)
        return writer.content()

consumer.py

#chans[0] provider---->consumer
#chans[1] consumer--->provider
import message as msg
import multiprocessing as mp
import os,time,signal
CHAN_PIPE=0x00
CHAN_QUEUE=0x01
class ChannelManager(object):
    def __init__(self,chty):
        self.chty=chty
        self.channels={}
    def register(self,uuid,chans):
        self.channels.update({uuid:chans})
    def unregister(self,uuid):
        self.channels.pop(uuid,None)
    def get_chans(self,uuid):
        chans=self.channels.get(uuid)
        return chans
class Consumer(mp.Process):
    def __init__(self,uuid,chans,clty):
        self.terminate=False
        self.uuid=uuid
        self.chans=chans
        self.clty=clty
        mp.Process.__init__(self)
    def handle_signal(self, signum, frame):
        self.terminate= True
    def run(self):
        signal.signal(signal.SIGINT,self.handle_signal)
        signal.signal(signal.SIGTERM,self.handle_signal)
        signal.signal(signal.SIGHUP, self.handle_signal) 
        signal.signal(signal.SIGTSTP,self.handle_signal) 
        if self.clty==CHAN_PIPE:
            self.chans[0].close()
        if self.chans:
            while not self.terminate:
                if self.clty==CHAN_QUEUE:
                    self.check_queue()
                elif self.clty==CHAN_PIPE:
                    self.check_pipe()
                else:
                    break
        print(' stop consumer pid {}'.format(self.pid))
    def check_queue(self):
        if self.chans[0].qsize()>0:
            type,data=self.chans[0].get()
            if type==msg.TCP_MSG:
                self.handle_request(data)
            if type==msg.CTL_MSG:
                if data==msg.CTL_MSG_SOCK_CLOSED:
                    print('{} sock closd'.format(self.uuid))
    def check_pipe(self):
        type=-1
        data=None
        if self.chans[1].poll(0):
            try:
                type,data=self.chans[1].recv()
            except EOFError:
                pass
            if data:
                if type==msg.TCP_MSG:
                    self.handle_request(data)
            if type==msg.CTL_MSG:
                if data==msg.CTL_MSG_SOCK_CLOSED:
                    print('{} sock closd'.format(self.uuid))
    def handle_request(self,data):
        request=msg.deserialize_request(data)
        res=msg.Response(self.uuid,0)
        res.mid=request.mid
        if self.clty==CHAN_QUEUE:
            self.chans[1].put([msg.TCP_MSG,res.encode2bytes()])
        elif self.clty==CHAN_PIPE:
            self.chans[1].send((msg.TCP_MSG,res.encode2bytes()))

tcp_server.py

import signal, os
import threading
import multiprocessing as mp
import socket
import select
import errno
import numpy as np
import time
import byte_codec as bc
import message as msg
import consumer
import argparse
class TcpPeer(object):
    def __init__(self,cm,conn):
        self.cm=cm
        self.conn=conn
        self.msg_count=0
        self.last_sent=0
        self.intra_delay=0
        self.trans_delay=0
        self.buffer=b''
        self.uuid=-1
        self.chans=None
        self.dead=False
        self.now=0
        self.ipc_time=0
        self.ipc_count=0
    def __del__(self):
        average_intra_delay=0.0
        average_trans_delay=0.0
        average_ipc_delay=0.0
        if self.msg_count>1:
            average_intra_delay=1.0*self.intra_delay/(self.msg_count-1)
        if self.msg_count>0:
            average_trans_delay=1.0*self.trans_delay/(self.msg_count)
        if self.ipc_count>0:
            average_ipc_delay=1.0*self.ipc_time/self.ipc_count
        print(self.msg_count,average_intra_delay,average_trans_delay,average_ipc_delay)
        if self.chans:
            if  self.cm.chty==consumer.CHAN_QUEUE:
                self.chans[0].put([msg.CTL_MSG,msg.CTL_MSG_SOCK_CLOSED])
            elif self.cm.chty==consumer.CHAN_PIPE:
                self.chans[0].send((msg.CTL_MSG,msg.CTL_MSG_SOCK_CLOSED))
            self.cm.register(self.uuid,self.chans)
            self.chans=None
    def incoming_data(self,buffer):
        self.buffer+=buffer
        all=len(self.buffer)
        close=False
        sz_byte=2
        hz=10
        if all>=sz_byte+hz:
            reader=bc.DataReader()
            reader.append(self.buffer)
            sz,_=reader.read_uint16()
            uuid,_=reader.read_uint16()
            mid,_=reader.read_uint32()
            sent_ts,_=reader.read_uint32()
            pz=0
            if sz>hz:
                pz=sz-hz
            msg_sz=sz_byte+hz+pz
            if all>=msg_sz:
                data=self.buffer[0:msg_sz]
                remain=b''
                if all>msg_sz:
                    remain=self.buffer[msg_sz:all]
                self.buffer=remain
                req=msg.Request(sz,uuid,mid,sent_ts,pz)
                if self.chans is None:
                    self.chans=self.cm.get_chans(uuid)
                    self.uuid=uuid
                    self.cm.unregister(uuid)
                if self.chans:
                    if self.cm.chty==consumer.CHAN_QUEUE:
                        self.chans[0].put([msg.TCP_MSG,data])
                    elif self.cm.chty==consumer.CHAN_PIPE:
                        self.chans[0].send((msg.TCP_MSG,data))
                else:
                    close=True
                now=msg.TimeMillis32()
                self.now=now
                if self.msg_count>0:
                    self.intra_delay+=sent_ts-self.last_sent
                delta=0
                if now>sent_ts:
                    delta=now-sent_ts
                self.trans_delay+=delta
                self.msg_count+=1
                self.last_sent=sent_ts
        return close
    def send_responce(self,data):
        now=msg.TimeMillis32()
        self.ipc_time+=now-self.now
        self.ipc_count+=1
        self.conn.sendall(data)
    def check_chan_data(self):
        type=-1
        data=None
        if self.cm.chty==consumer.CHAN_QUEUE:
            if self.chans and self.chans[1].qsize()>0:
                type,data=self.chans[1].get()
        elif self.cm.chty==consumer.CHAN_PIPE:
            if self.chans and self.chans[0].poll(0):
                try:
                    type,data=self.chans[0].recv()
                except EOFError:
                    pass            
        if data:
            reader=bc.DataReader()
            reader.append(data)
            close,_=reader.read_uint8()
            if type==msg.TCP_MSG:
                self.send_responce(data[1:])
                if close:
                    self.dead=True
    def read_event(self):
        close=False
        buffer=b''
        length=0
        try:
            while True:
                msg=self.conn.recv(1500)
                length+=len(msg)
                if msg:
                    buffer+=msg
                else:
                    if buffer:
                        self.incoming_data(buffer)
                        buffer=b''
                    #print("only close")
                    close=True
                    break
        except socket.error as e:
            err = e.args[0]
            #print ("error: "+str(err))
            if buffer:
                ret=self.incoming_data(buffer)
                if ret:
                   close=True
            if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
                pass
            else:
                close=True
        return close
    def close_fd(self):
        self.dead=True
        self.conn.close()
class TcpServer():
    def __init__(self,cm,mode, port):
        self.cm=cm
        self._thread = None
        self._thread_terminate = False
        if mode == "localhost":
            self.ip = mode
        elif mode == "public":
            self.ip ="0.0.0.0"
        else:
            self.ip ="127.0.0.1"
        self.port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket.bind((self.ip, self.port))
        self._socket.setblocking(False)
        self.peers={}
        self._socket.listen(128)
        self._epl= select.epoll()
        self._epl.register(self._socket.fileno(),select.EPOLLIN)
    def loop_start(self):
        if self._thread is not None:
            return 
        self._thread_terminate = False
        self._thread = threading.Thread(target=self._thread_main)
        #self._thread.daemon = True
        self._thread.start()
    def loop_stop(self, force=False):
        if self._thread is None:
            return
        self._thread_terminate = True
        if threading.current_thread() != self._thread:
            self._thread.join()
            self._thread = None
    def loop_once(self):
        epoll_list = self._epl.poll(0)
        for fd,events in epoll_list:
            if fd == self._socket.fileno():
                conn,addr =self._socket.accept()
                conn.setblocking(False)
                conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                peer=TcpPeer(self.cm,conn)
                self.peers.update({conn.fileno():peer})
                self._epl.register(conn.fileno(), select.EPOLLIN)
            elif events == select.EPOLLIN:
                ret=self.peers[fd].read_event()
                if ret:
                    #print("close")
                    self._close(fd)
        for fd in list(self.peers.keys()):
            peer=self.peers[fd]
            peer.check_chan_data()
            if peer.dead:
                self._close(fd)
    def _thread_main(self):
        while True:
            self.loop_once()
            if self._thread_terminate is True:
                self.shutdown()
                break
    def _close(self,fd):
        if fd==self._socket.fileno():
            self._epl.unregister(fd)
            self._socket.close()
        elif fd in self.peers:
            print("fd "+str(fd))
            self._epl.unregister(fd)
            self.peers[fd].close_fd()
            self.peers.pop(fd)
    def shutdown(self):
        for fd, peer in self.peers.items():
            self._epl.unregister(fd)
            peer.close_fd()
        self.peers.clear()
        self._close(self._socket.fileno())
        self._epl.close()
Terminate=False
def signal_handler(signum, frame):
    global Terminate
    Terminate =True
if __name__ == '__main__':
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler) # ctrl+c
    signal.signal(signal.SIGTSTP, signal_handler) #ctrl+z
    parser = argparse.ArgumentParser(description='manual to this script')
    parser.add_argument('--chty', type=str, default ='p')
    args = parser.parse_args()
    chan_type=consumer.CHAN_PIPE
    if args.chty=='q':
        chan_type=consumer.CHAN_QUEUE       
    cm=consumer.ChannelManager(chan_type)
    clients=2
    consumers=[]
    for i in range(clients):
        chans=[]
        if chan_type==consumer.CHAN_QUEUE:
            chans.append(mp.Queue(10))
            chans.append(mp.Queue(10))
        elif chan_type==consumer.CHAN_PIPE:
            parent_conn,child_conn =mp.Pipe()
            chans.append(parent_conn)
            chans.append(child_conn)
        cm.register(i,chans)
        agent=consumer.Consumer(i,chans,chan_type)
        consumers.append(agent)
        agent.start()
        if chan_type==consumer.CHAN_PIPE:
            chans[1].close()
    tcp_server=TcpServer(cm,"localhost",5555)
    while True:
        tcp_server.loop_once()
        if Terminate:
            tcp_server.shutdown()
            break
    for i in range(clients):
        consumers[i].join()

 Data is sent by a c++ tcp client. Build it first: g++ -o client tcp_client.cc.
tcp_client.cc

#include <stdio.h>
#include <assert.h>
#include <signal.h>
#include <iostream>
#include <string>
#include <memory.h>
#include <chrono>
#include <errno.h>   // for errno and strerror_r
#include <unistd.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h> //for sockaddr_in
#include <arpa/inet.h>  //inet_addr  
#include <netinet/tcp.h> //TCP_NODELAY
#include <unistd.h>
inline int64_t WallTimeNowInUsec(){
    std::chrono::system_clock::duration d = std::chrono::system_clock::now().time_since_epoch();    
    std::chrono::microseconds mic = std::chrono::duration_cast<std::chrono::microseconds>(d);
    return mic.count(); 
}
inline int64_t TimeMillis(){
    return WallTimeNowInUsec()/1000;
}
static uint16_t HostToNet16(uint16_t x) { return __builtin_bswap16(x); }
static uint32_t HostToNet32(uint32_t x) { return __builtin_bswap32(x); }
static uint64_t HostToNet64(uint64_t x) { return __builtin_bswap64(x); }
static uint16_t NetToHost16(uint16_t x) { return HostToNet16(x); }
static uint32_t NetToHost32(uint32_t x) { return HostToNet32(x); }
static uint64_t NetToHost64(uint64_t x) { return HostToNet64(x); }
const int kPaddingSize=100;
const int kBufferSize=1500;
class TcpClient{
public:
    TcpClient(int uuid,int many);
    ~TcpClient();
    bool Init(const char *ip,uint16_t port);
    bool Loop();
private:
    int SendMessage();
    int WaitReply();
    void ParserBuffer();
    void CloseFd();
    int uuid_=-1;
    int many_=0;
    int sockfd_=-1;
    int mid_=0;
    int counter_=0;
    int32_t last_send_delay_=0;
    int64_t inter_send_delay_=0;
    int64_t trans_delay_=0;
    std::string rb_;
};
TcpClient::TcpClient(int uuid,int many):uuid_(uuid),many_(many){}
TcpClient::~TcpClient(){
    CloseFd();
    double average_inter_delay=0;
    double average_trans_delay=0;
    double temp=0.0;
    if(counter_>1){
        average_inter_delay=1.0*inter_send_delay_/(counter_-1);
    }
    if(counter_>0){
     average_trans_delay=1.0*trans_delay_/counter_;
    }

    std::cout<<"dtor "<<counter_<<" "<<average_inter_delay<<" "<<average_trans_delay<<std::endl;
}
bool TcpClient::Init(const char *ip,uint16_t port){
    bool success=true;
    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET; 
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    int flag = 1; 
    if ((sockfd_= socket(AF_INET, SOCK_STREAM, 0)) < 0){
        std::cout<<"Error : Could not create socket"<<std::endl;
        success=false;
        return success;
    }
    setsockopt (sockfd_, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
    if (connect(sockfd_,(struct sockaddr *)&servaddr,sizeof(servaddr)) != 0) { 
        std::cout<<"connection with the server failed"<<std::endl; 
        CloseFd();
        success=false;
        return success;
    }
    return success;
}
bool TcpClient::Loop(){
    bool done=true;
    if(mid_<many_&&sockfd_>0){
        int sent=SendMessage();
        if(sent>0&&WaitReply()>0){
            done=false;
        }
    }
    return done;
}
int TcpClient::SendMessage(){
    char buffer[kBufferSize]={0};
    int off=0;
    uint16_t sz=sizeof(uint16_t)+2*sizeof(uint32_t);
    uint16_t uuid=uuid_;
    uint32_t mid=mid_;
    uint32_t sent_ts=TimeMillis();
    sz=HostToNet16(sz);
    uuid=HostToNet16(uuid);
    mid=HostToNet32(mid);
    sent_ts=HostToNet32(sent_ts);
    mid_++;
    memcpy(buffer+off,(void*)&sz,sizeof(sz));
    off+=sizeof(sz);
    memcpy(buffer+off,(void*)&uuid,sizeof(uuid));
    off+=sizeof(uuid);
    memcpy(buffer+off,(void*)&mid,sizeof(mid));
    off+=sizeof(mid);
    memcpy(buffer+off,(void*)&sent_ts,sizeof(sent_ts));
    off+=sizeof(sent_ts);    
    return write(sockfd_,buffer,off);
}
int TcpClient::WaitReply(){
    char buffer[kBufferSize]={0};
    int ret=recv(sockfd_, buffer, kBufferSize, 0);
    if(ret>0){
        int old=rb_.size();
        rb_.resize(old+ret);
        memcpy(&rb_[old],buffer,ret);
        ParserBuffer();
    }
    return ret;
}
void TcpClient::ParserBuffer(){
    uint16_t sz_byte=2;
    int16_t hz=sizeof(uint16_t)+2*sizeof(uint32_t);
    int off=0;
    uint16_t sz=0;
    uint16_t uuid=0;
    uint32_t mid=0;
    uint32_t sent_ts=0;
    uint16_t pz=0;
    if (rb_.size()>=sz_byte+hz){
        memcpy(&sz,&rb_[off],sizeof(sz));
        off+=sizeof(sz);
        memcpy(&uuid,&rb_[off],sizeof(uuid));
        off+=sizeof(uuid);
        memcpy(&mid,&rb_[off],sizeof(mid));
        off+=sizeof(mid);
        memcpy(&sent_ts,&rb_[off],sizeof(sent_ts));
        off+=sizeof(sent_ts);
        sz=NetToHost16(sz);
        uuid=NetToHost16(uuid);
        mid=NetToHost32(mid);
        sent_ts=NetToHost32(sent_ts);
        if(sz>hz){
            pz=sz-hz;
        }
        off+=pz;
        if(rb_.size()>=off){
            int remain=rb_.size()-off;
            int32_t now=TimeMillis();
            if(counter_>0){
                inter_send_delay_+=sent_ts-last_send_delay_;
            }
            if(now>sent_ts){
                trans_delay_+=now-sent_ts;
            }
            counter_++;
            last_send_delay_=sent_ts;
            if(remain>0){
                const char *data=&rb_[off];
                std::string copy(data,remain);
                copy.swap(rb_);
            }else{
                std::string null_str;
                null_str.swap(rb_);
            }
        }
    }
}
void TcpClient::CloseFd(){
    if(sockfd_>0){
        close(sockfd_);
        sockfd_=-1;
    }
}
static volatile bool g_running=true;
void signal_exit_handler(int sig)
{
    g_running=false;
}
/*usage ./client uuid 0*/
int main(int argc, char *argv[]){
    signal(SIGTERM, signal_exit_handler);
    signal(SIGINT, signal_exit_handler);
    signal(SIGHUP, signal_exit_handler);//ctrl+c
    signal(SIGTSTP, signal_exit_handler);//ctrl+z   
    std::string server_ip="127.0.0.1";
    uint16_t server_port=5555;
    if(argc!=3){
        return 0;
    }
    int uuid=0;
    uuid=std::stoi(argv[2]);
    int many=1000;
    TcpClient client(uuid,many);
    int32_t last=TimeMillis();
    if(client.Init(server_ip.c_str(),server_port)){
        while(!client.Loop()&&g_running){}
    }
    int32_t delta=TimeMillis()-last;
    std::cout<<"run time:"<<delta<<std::endl;
    return 0;
}

Result:

 client sends1000 pieces small messages to python server. The tcp server process will send message (receipt time t1) to consumer through Pipe or Queue. After Processing, consumer will send response message to tcp server, then tcp server will send this responce data (time t2) to client. The delta time (t2-t1) is recoreded and is named as ipc time.

Pipe

python tcp_server.py ----chty p
client uuid 1

 client run time:521 milliseconds. The average ipc time of procesing a piece of meaage is 0.327 milliseconds.

Queue

python tcp_server.py ----chty q
client uuid 1

 client run time:16015 milliseconds. The average ipc time of procesing a piece of meaage is 14.511 milliseconds.

相关文章

网友评论

      本文标题:python multiprocessing Queue and

      本文链接:https://www.haomeiwen.com/subject/thblcltx.html