多进程通过Queue和Pipe交互数据的开销对比。相比Pipe,通过Queue交互数据,进程之间数据交互有更大的时延。
The average time to transfer a single message between processes is much larger with a Queue than with a Pipe.
code
byte_codec.py
import struct
class DataReader(object):
    """Sequential decoder for binary fields stored in a growing byte buffer.

    Multi-byte fields are decoded in network (big-endian) byte order.  Every
    ``read_*`` method returns a ``(value, success)`` pair.  When fewer unread
    bytes remain than the field needs, the cursor is left untouched and
    ``(0, False)`` is returned, so the caller can ``append`` more data and
    retry.
    """

    def __init__(self):
        self.buffer = b''
        self.length = 0
        self.pos = 0

    def append(self, buffer):
        """Append *buffer* to the stored data and refresh the cached length."""
        self.buffer += buffer
        self.length = len(self.buffer)

    def cursor(self):
        """Return the offset of the next unread byte."""
        return self.pos

    def byte_remain(self):
        """Return the number of bytes that have not been consumed yet."""
        return self.length - self.pos

    def _read_fixed(self, fmt, size):
        """Decode one fixed-width field of *size* bytes using struct *fmt*."""
        if self.pos + size > self.length:
            return 0, False
        # unpack_from reads in place, so no temporary slice is created.
        value, = struct.unpack_from(fmt, self.buffer, self.pos)
        self.pos += size
        return value, True

    def read_uint8(self):
        """Read an unsigned 8-bit integer."""
        return self._read_fixed("B", 1)

    def read_uint16(self):
        """Read a big-endian unsigned 16-bit integer."""
        return self._read_fixed("!H", 2)

    def read_uint32(self):
        """Read a big-endian unsigned 32-bit integer."""
        return self._read_fixed("!I", 4)

    def read_uint64(self):
        """Read a big-endian unsigned 64-bit integer."""
        return self._read_fixed("!Q", 8)

    def read_float(self):
        """Read a big-endian 32-bit float."""
        return self._read_fixed("!f", 4)

    def read_double(self):
        """Read a big-endian 64-bit float."""
        return self._read_fixed("!d", 8)

    def read_varint(self):
        """Read a varint: 7 payload bits per byte, least-significant group first.

        Returns ``(0, False)`` without consuming anything when the buffer does
        not yet hold the terminating byte (the one with the top bit clear).
        """
        length = self._varint_len()
        if length == 0:
            return 0, False
        result = 0
        shift = 0
        for offset in range(length):
            byte, = struct.unpack_from("B", self.buffer, self.pos + offset)
            result |= (byte & 0x7f) << shift
            shift += 7
        self.pos += length
        return result, True

    def _varint_len(self):
        """Return the byte length of the varint at the cursor, or 0 if incomplete."""
        for offset in range(self.byte_remain()):
            byte, = struct.unpack_from("B", self.buffer, self.pos + offset)
            if byte & 0x80 == 0:
                # Top bit clear marks the last byte of the varint.
                return offset + 1
        return 0
def varient_length(number):
    """Return the number of bytes (1-8) needed to varint-encode *number*.

    Each encoded byte carries 7 payload bits, so the limits are 0x7f, 0x3fff,
    ... up to 0xffffffffffffff for 8 bytes.

    Returns 0 when *number* cannot be encoded: either it exceeds the 8-byte
    limit or it is negative (the 7-bit group format used by this module has no
    representation for negative values).
    """
    if number < 0:
        return 0
    limit = 0x7f
    for length in range(1, 9):
        if number <= limit:
            return length
        # Each additional byte contributes seven more payload bits.
        limit = (limit << 7) | 0x7f
    return 0
def varint_encode(number):
    """Encode a non-negative integer as a varint and return the bytes.

    The value is split into 7-bit groups, least-significant group first; every
    byte except the last has its top bit set to signal that more bytes follow.

    Raises:
        ValueError: if *number* is negative.  Floor division never brings a
            negative value to zero, so encoding it would never terminate.
    """
    if number < 0:
        raise ValueError("varint_encode requires a non-negative integer")
    encoded = bytearray()
    while True:
        byte = number & 0x7f
        number >>= 7
        if number:
            # More groups follow: set the continuation bit.
            byte |= 0x80
        encoded.append(byte)
        if not number:
            return bytes(encoded)
class DataWriter(object):
    """Append-only encoder that serializes values into a byte string.

    Multi-byte fields are written in network (big-endian) byte order.
    """

    def __init__(self):
        self.buffer = b''

    def length(self):
        """Return the number of bytes written so far."""
        return len(self.buffer)

    def content(self):
        """Return everything written so far as a byte string."""
        return self.buffer

    def _pack(self, fmt, v):
        """Append *v* encoded with the struct format *fmt*."""
        self.buffer += struct.pack(fmt, v)

    def write_uint8(self, v):
        """Append an unsigned 8-bit integer."""
        self._pack("B", v)

    def write_uint16(self, v):
        """Append a big-endian unsigned 16-bit integer."""
        self._pack("!H", v)

    def write_uint32(self, v):
        """Append a big-endian unsigned 32-bit integer."""
        self._pack("!I", v)

    def write_uint64(self, v):
        """Append a big-endian unsigned 64-bit integer."""
        self._pack("!Q", v)

    def write_float(self, v):
        """Append a big-endian 32-bit float."""
        self._pack("!f", v)

    def write_double(self, v):
        """Append a big-endian 64-bit float."""
        self._pack("!d", v)

    def write_varint(self, v):
        """Append *v* as a varint.

        Raises:
            ValueError: if *v* is negative or needs more than 8 encoded bytes.
                (ValueError is a subclass of Exception, so callers that catch
                Exception keep working.)
        """
        # Reject negatives up front: the varint encoder cannot represent them.
        if v < 0 or varient_length(v) <= 0:
            raise ValueError("out of range")
        self.buffer += varint_encode(v)
message.py
import byte_codec as bc
import time
# Byte value written for every padding byte of an encoded Response.
PADDING=0x00
# Message-type tags sent as the first element of each channel message.
TCP_MSG=0x00
CTL_MSG=0x01
# Payload of a CTL_MSG that reports a closed peer socket.
CTL_MSG_SOCK_CLOSED=0x00
def TimeMillis():
    """Return the current wall-clock time in whole milliseconds since the epoch."""
    seconds = time.time()
    return int(round(seconds * 1000))
def TimeMillis32():
    """Return the current millisecond timestamp truncated to its low 32 bits."""
    low_32_bits = 0xffffffff
    return TimeMillis() & low_32_bits
class Request(object):
    """A request header as exchanged between the TCP client and the server.

    Attributes are stored exactly as given: ``sz`` (size field), ``uuid``,
    ``mid`` (message id), ``send_ts`` (sender timestamp) and ``pz`` (padding
    size).
    """

    def __init__(self, sz, uuid, mid, send_ts, pz):
        self.sz, self.uuid = sz, uuid
        self.mid, self.send_ts = mid, send_ts
        self.pz = pz

    def encode2bytes(self):
        """Serialize sz and uuid as uint16, then mid and send_ts as uint32.

        No padding bytes are written; ``pz`` is not part of the output.
        """
        writer = bc.DataWriter()
        fields = (
            (writer.write_uint16, self.sz),
            (writer.write_uint16, self.uuid),
            (writer.write_uint32, self.mid),
            (writer.write_uint32, self.send_ts),
        )
        for write, value in fields:
            write(value)
        return writer.content()
def deserialize_request(buffer):
    """Decode a Request header (sz, uuid, mid, send_ts) from *buffer*.

    The success flags returned by the reader are ignored, so a field that
    could not be read is left at the reader's failure value of 0.  The padding
    size of the returned Request is always 0.
    """
    reader = bc.DataReader()
    reader.append(buffer)
    # Fields must be consumed in wire order.
    sz = reader.read_uint16()[0]
    uuid = reader.read_uint16()[0]
    mid = reader.read_uint32()[0]
    sent_ts = reader.read_uint32()[0]
    return Request(sz, uuid, mid, sent_ts, 0)
class Response(object):
    """A response produced by a consumer for one request.

    ``sz`` covers uuid (2 bytes), mid (4 bytes) and send_ts (4 bytes) plus any
    positive padding size ``pz``.  ``send_ts`` is taken when the object is
    created.
    """

    def __init__(self, uuid, pz):
        header_size = 2 + 4 + 4
        self.sz = header_size + pz if pz > 0 else header_size
        self.uuid = uuid
        self.mid = 0
        self.send_ts = TimeMillis32()
        self.pz = pz
        self.close = False

    def encode2bytes(self):
        """Serialize the response.

        Layout: a uint8 close flag (1 or 0), uint16 sz, uint16 uuid,
        uint32 mid, uint32 send_ts, followed by ``pz`` PADDING bytes.
        """
        writer = bc.DataWriter()
        writer.write_uint8(1 if self.close else 0)
        writer.write_uint16(self.sz)
        writer.write_uint16(self.uuid)
        writer.write_uint32(self.mid)
        writer.write_uint32(self.send_ts)
        for _ in range(self.pz):
            writer.write_uint8(PADDING)
        return writer.content()
consumer.py
#chans[0] provider---->consumer
#chans[1] consumer--->provider
import message as msg
import multiprocessing as mp
import os,time,signal
# Channel-type selectors: CHAN_PIPE uses Connection-style send()/recv()/poll(),
# CHAN_QUEUE uses Queue-style put()/get()/qsize().
CHAN_PIPE=0x00
CHAN_QUEUE=0x01
class ChannelManager(object):
    """Registry mapping a consumer uuid to its channel pair.

    ``chty`` records which channel type (pipe or queue) the pairs use.
    """

    def __init__(self, chty):
        self.chty = chty
        self.channels = {}

    def register(self, uuid, chans):
        """Store (or replace) the channel pair for *uuid*."""
        self.channels[uuid] = chans

    def unregister(self, uuid):
        """Forget the channel pair for *uuid*; unknown uuids are ignored."""
        if uuid in self.channels:
            del self.channels[uuid]

    def get_chans(self, uuid):
        """Return the channel pair for *uuid*, or None if none is registered."""
        return self.channels.get(uuid)
class Consumer(mp.Process):
    """Worker process that answers each forwarded request with a Response.

    Args:
        uuid: identifier placed in every Response this consumer produces.
        chans: two-element channel list.  In queue mode requests are read from
            ``chans[0]`` and responses are put on ``chans[1]``.  In pipe mode
            the consumer closes its copy of ``chans[0]`` and uses ``chans[1]``
            for both directions.
        clty: channel type, CHAN_PIPE or CHAN_QUEUE.
    """

    def __init__(self, uuid, chans, clty):
        mp.Process.__init__(self)
        # Deliberately not named "terminate": an instance attribute with that
        # name would shadow multiprocessing.Process.terminate().
        self._stop_requested = False
        self.uuid = uuid
        self.chans = chans
        self.clty = clty

    def handle_signal(self, signum, frame):
        """Signal handler: ask the polling loop in run() to stop."""
        self._stop_requested = True

    def run(self):
        """Poll the channel until a signal arrives, then report the exit."""
        for signum in (signal.SIGINT, signal.SIGTERM,
                       signal.SIGHUP, signal.SIGTSTP):
            signal.signal(signum, self.handle_signal)
        if self.chans:
            if self.clty == CHAN_PIPE:
                self.chans[0].close()
            while not self._stop_requested:
                if self.clty == CHAN_QUEUE:
                    self.check_queue()
                elif self.clty == CHAN_PIPE:
                    self.check_pipe()
                else:
                    break
        print(' stop consumer pid {}'.format(self.pid))

    def _dispatch(self, kind, data):
        """Handle one (kind, data) message received from the channel."""
        if kind == msg.TCP_MSG:
            self.handle_request(data)
        elif kind == msg.CTL_MSG:
            if data == msg.CTL_MSG_SOCK_CLOSED:
                print('{} sock closd'.format(self.uuid))

    def check_queue(self):
        """Queue mode: process one pending message, if any."""
        if self.chans[0].qsize() > 0:
            kind, data = self.chans[0].get()
            self._dispatch(kind, data)

    def check_pipe(self):
        """Pipe mode: process one pending message, if any."""
        conn = self.chans[1]
        if not conn.poll(0):
            return
        try:
            kind, data = conn.recv()
        except EOFError:
            # The other end was closed; there is nothing to process.
            return
        # Do not test the payload for truthiness: the control payload
        # CTL_MSG_SOCK_CLOSED is 0 and would be dropped silently.
        self._dispatch(kind, data)

    def handle_request(self, data):
        """Decode a request and send back a Response carrying the same mid."""
        request = msg.deserialize_request(data)
        res = msg.Response(self.uuid, 0)
        res.mid = request.mid
        payload = res.encode2bytes()
        if self.clty == CHAN_QUEUE:
            self.chans[1].put([msg.TCP_MSG, payload])
        elif self.clty == CHAN_PIPE:
            self.chans[1].send((msg.TCP_MSG, payload))
tcp_server.py
import signal, os
import threading
import multiprocessing as mp
import socket
import select
import errno
import numpy as np
import time
import byte_codec as bc
import message as msg
import consumer
import argparse
class TcpPeer(object):
    """One accepted TCP connection plus the consumer channels bound to it.

    Requests read from the socket are framed and forwarded to a consumer over
    the channel pair obtained from the ChannelManager; responses read from the
    channel are written back to the socket.  Timing statistics are accumulated
    and printed when the object is finalized.
    """

    # Request framing: a uint16 size field, then uuid (uint16), mid (uint32)
    # and send_ts (uint32), optionally followed by padding.
    SZ_BYTES = 2
    HEADER_BYTES = 10

    def __init__(self, cm, conn):
        self.cm = cm
        self.conn = conn
        self.msg_count = 0
        self.last_sent = 0
        self.intra_delay = 0
        self.trans_delay = 0
        self.buffer = b''
        self.uuid = -1
        self.chans = None
        self.dead = False
        self.now = 0
        self.ipc_time = 0
        self.ipc_count = 0

    def __del__(self):
        """Print the averaged statistics and hand the channels back.

        The consumer is told that the socket closed and the channel pair is
        re-registered with the ChannelManager so another connection can use it.
        """
        average_intra_delay = 0.0
        average_trans_delay = 0.0
        average_ipc_delay = 0.0
        if self.msg_count > 1:
            average_intra_delay = 1.0 * self.intra_delay / (self.msg_count - 1)
        if self.msg_count > 0:
            average_trans_delay = 1.0 * self.trans_delay / self.msg_count
        if self.ipc_count > 0:
            average_ipc_delay = 1.0 * self.ipc_time / self.ipc_count
        print(self.msg_count, average_intra_delay, average_trans_delay, average_ipc_delay)
        if self.chans:
            if self.cm.chty == consumer.CHAN_QUEUE:
                self.chans[0].put([msg.CTL_MSG, msg.CTL_MSG_SOCK_CLOSED])
            elif self.cm.chty == consumer.CHAN_PIPE:
                self.chans[0].send((msg.CTL_MSG, msg.CTL_MSG_SOCK_CLOSED))
            self.cm.register(self.uuid, self.chans)
            self.chans = None

    def _forward(self, uuid, data):
        """Send one framed request to the consumer; return False if none is bound."""
        if self.chans is None:
            # First message on this connection: claim the channels for uuid.
            self.chans = self.cm.get_chans(uuid)
            self.uuid = uuid
            self.cm.unregister(uuid)
        if not self.chans:
            return False
        if self.cm.chty == consumer.CHAN_QUEUE:
            self.chans[0].put([msg.TCP_MSG, data])
        elif self.cm.chty == consumer.CHAN_PIPE:
            self.chans[0].send((msg.TCP_MSG, data))
        return True

    def _record_arrival(self, sent_ts):
        """Update the delay statistics for a request stamped *sent_ts*."""
        now = msg.TimeMillis32()
        self.now = now
        if self.msg_count > 0:
            self.intra_delay += sent_ts - self.last_sent
        if now > sent_ts:
            self.trans_delay += now - sent_ts
        self.msg_count += 1
        self.last_sent = sent_ts

    def incoming_data(self, buffer):
        """Buffer *buffer* and forward every complete request it now contains.

        All complete messages are processed in one call, because TCP may
        deliver several requests in a single read.  An incomplete trailing
        message stays buffered until more data arrives.

        Returns:
            True when the connection should be closed because no consumer
            channel is available for the request's uuid, otherwise False.
        """
        self.buffer += buffer
        close = False
        header_len = self.SZ_BYTES + self.HEADER_BYTES
        while len(self.buffer) >= header_len:
            reader = bc.DataReader()
            reader.append(self.buffer)
            sz, _ = reader.read_uint16()
            uuid, _ = reader.read_uint16()
            reader.read_uint32()  # mid: not needed for framing
            sent_ts, _ = reader.read_uint32()
            padding = sz - self.HEADER_BYTES if sz > self.HEADER_BYTES else 0
            msg_sz = header_len + padding
            if len(self.buffer) < msg_sz:
                break
            data = self.buffer[:msg_sz]
            self.buffer = self.buffer[msg_sz:]
            forwarded = self._forward(uuid, data)
            self._record_arrival(sent_ts)
            if not forwarded:
                close = True
                break
        return close

    def send_responce(self, data):
        """Account the IPC round-trip time and write *data* to the socket."""
        now = msg.TimeMillis32()
        self.ipc_time += now - self.now
        self.ipc_count += 1
        self.conn.sendall(data)

    def check_chan_data(self):
        """Relay at most one pending consumer message to the socket.

        The first byte of the payload is a close flag; the rest is sent to the
        client when the message is a TCP_MSG.  A set close flag marks the peer
        as dead.
        """
        if not self.chans:
            return
        kind = -1
        data = None
        if self.cm.chty == consumer.CHAN_QUEUE:
            if self.chans[1].qsize() > 0:
                kind, data = self.chans[1].get()
        elif self.cm.chty == consumer.CHAN_PIPE:
            if self.chans[0].poll(0):
                try:
                    kind, data = self.chans[0].recv()
                except EOFError:
                    # Consumer end closed; nothing to relay.
                    pass
        if not data:
            return
        reader = bc.DataReader()
        reader.append(data)
        close, _ = reader.read_uint8()
        if kind == msg.TCP_MSG:
            self.send_responce(data[1:])
        if close:
            self.dead = True

    def read_event(self):
        """Drain the non-blocking socket and process whatever was received.

        Returns:
            True when the connection should be closed: the peer shut down, a
            socket error other than EAGAIN/EWOULDBLOCK occurred, or
            incoming_data() asked for it.
        """
        close = False
        chunks = []
        try:
            while True:
                chunk = self.conn.recv(1500)
                if not chunk:
                    # Empty read: the peer closed its end.
                    close = True
                    break
                chunks.append(chunk)
        except socket.error as exc:
            # EAGAIN/EWOULDBLOCK only mean the socket has been drained.
            if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                close = True
        if chunks and self.incoming_data(b''.join(chunks)):
            close = True
        return close

    def close_fd(self):
        """Mark the peer dead and close its socket."""
        self.dead = True
        self.conn.close()
class TcpServer():
    """Non-blocking TCP server built on epoll.

    Args:
        cm: ChannelManager handed to every TcpPeer.
        mode: "localhost" binds to that name, "public" binds to 0.0.0.0, any
            other value binds to 127.0.0.1.
        port: TCP port to listen on.
    """

    def __init__(self, cm, mode, port):
        self.cm = cm
        self._thread = None
        self._thread_terminate = False
        if mode == "localhost":
            self.ip = mode
        elif mode == "public":
            self.ip = "0.0.0.0"
        else:
            self.ip = "127.0.0.1"
        self.port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket.bind((self.ip, self.port))
        self._socket.setblocking(False)
        self.peers = {}
        self._socket.listen(128)
        self._epl = select.epoll()
        self._epl.register(self._socket.fileno(), select.EPOLLIN)

    def loop_start(self):
        """Run the event loop in a background thread (no-op if already running)."""
        if self._thread is not None:
            return
        self._thread_terminate = False
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()

    def loop_stop(self, force=False):
        """Ask the background thread to stop and wait for it to finish."""
        if self._thread is None:
            return
        self._thread_terminate = True
        if threading.current_thread() != self._thread:
            self._thread.join()
            self._thread = None

    def loop_once(self):
        """Run one iteration: accept/read ready sockets, then relay responses."""
        # A hang-up or error is reported together with, or instead of,
        # EPOLLIN; comparing the mask for equality would leave such peers
        # unread and never closed.
        readable = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR
        for fd, events in self._epl.poll(0):
            if fd == self._socket.fileno():
                conn, _addr = self._socket.accept()
                conn.setblocking(False)
                conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                peer = TcpPeer(self.cm, conn)
                self.peers.update({conn.fileno(): peer})
                self._epl.register(conn.fileno(), select.EPOLLIN)
            elif events & readable:
                peer = self.peers.get(fd)
                if peer is not None and peer.read_event():
                    self._close(fd)
        # Iterate over a snapshot because _close() removes entries.
        for fd in list(self.peers.keys()):
            peer = self.peers[fd]
            peer.check_chan_data()
            if peer.dead:
                self._close(fd)

    def _thread_main(self):
        """Background-thread body: loop until loop_stop() is requested."""
        while True:
            self.loop_once()
            if self._thread_terminate is True:
                self.shutdown()
                break

    def _close(self, fd):
        """Unregister and close either the listening socket or one peer."""
        if fd == self._socket.fileno():
            self._epl.unregister(fd)
            self._socket.close()
        elif fd in self.peers:
            print("fd " + str(fd))
            self._epl.unregister(fd)
            self.peers[fd].close_fd()
            self.peers.pop(fd)

    def shutdown(self):
        """Close every peer, the listening socket and the epoll object."""
        for fd, peer in self.peers.items():
            self._epl.unregister(fd)
            peer.close_fd()
        self.peers.clear()
        self._close(self._socket.fileno())
        self._epl.close()
# Set to True by signal_handler; the main loop polls it to know when to stop.
Terminate = False


def signal_handler(signum, frame):
    """Record that a termination signal was received."""
    global Terminate
    Terminate = True
if __name__ == '__main__':
    # SIGINT is ctrl+c, SIGTSTP is ctrl+z, SIGHUP is sent when the terminal
    # goes away; all of them request an orderly shutdown.
    for signum in (signal.SIGTERM, signal.SIGINT,
                   signal.SIGHUP, signal.SIGTSTP):
        signal.signal(signum, signal_handler)
    parser = argparse.ArgumentParser(description='manual to this script')
    parser.add_argument('--chty', type=str, default='p')
    args = parser.parse_args()
    chan_type = consumer.CHAN_PIPE
    if args.chty == 'q':
        chan_type = consumer.CHAN_QUEUE
    cm = consumer.ChannelManager(chan_type)
    clients = 2
    consumers = []
    for i in range(clients):
        chans = []
        if chan_type == consumer.CHAN_QUEUE:
            chans.append(mp.Queue(10))
            chans.append(mp.Queue(10))
        elif chan_type == consumer.CHAN_PIPE:
            parent_conn, child_conn = mp.Pipe()
            chans.append(parent_conn)
            chans.append(child_conn)
        cm.register(i, chans)
        agent = consumer.Consumer(i, chans, chan_type)
        consumers.append(agent)
        agent.start()
        if chan_type == consumer.CHAN_PIPE:
            # The child owns this end; the parent only uses chans[0].
            chans[1].close()
    tcp_server = TcpServer(cm, "localhost", 5555)
    while True:
        tcp_server.loop_once()
        if Terminate:
            tcp_server.shutdown()
            break
    for agent in consumers:
        # A signal delivered only to this process never reaches the consumers,
        # and join() would then block forever; ask each one to stop first.
        if agent.is_alive():
            try:
                os.kill(agent.pid, signal.SIGTERM)
            except OSError:
                # The consumer exited between the liveness check and the kill.
                pass
        agent.join()
The data is sent by a C++ TCP client. Build it first: g++ -o client tcp_client.cc
tcp_client.cc
#include <stdio.h>
#include <assert.h>
#include <signal.h>
#include <iostream>
#include <string>
#include <memory.h>
#include <chrono>
#include <errno.h> // for errno and strerror_r
#include <unistd.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h> //for sockaddr_in
#include <arpa/inet.h> //inet_addr
#include <netinet/tcp.h> //TCP_NODELAY
#include <unistd.h>
// Returns the system_clock time since its epoch, in microseconds.
inline int64_t WallTimeNowInUsec(){
    using namespace std::chrono;
    const system_clock::duration since_epoch = system_clock::now().time_since_epoch();
    return duration_cast<microseconds>(since_epoch).count();
}
// Returns the system_clock time since its epoch, in whole milliseconds.
inline int64_t TimeMillis(){
    const int64_t usec = WallTimeNowInUsec();
    return usec / 1000;
}
// Host <-> network (big-endian) byte-order conversions.
// htons/htonl and their inverses are correct on both little- and big-endian
// hosts, whereas an unconditional byte swap is only correct on little-endian.
static uint16_t HostToNet16(uint16_t x) { return htons(x); }
static uint32_t HostToNet32(uint32_t x) { return htonl(x); }
// Writes the value most-significant byte first, independent of host order.
static uint64_t HostToNet64(uint64_t x) {
    unsigned char bytes[sizeof(x)];
    for (size_t i = 0; i < sizeof(x); ++i) {
        bytes[i] = static_cast<unsigned char>(x >> (8 * (sizeof(x) - 1 - i)));
    }
    uint64_t out = 0;
    memcpy(&out, bytes, sizeof(out));
    return out;
}
static uint16_t NetToHost16(uint16_t x) { return ntohs(x); }
static uint32_t NetToHost32(uint32_t x) { return ntohl(x); }
// The 64-bit conversion is its own inverse on either kind of host.
static uint64_t NetToHost64(uint64_t x) { return HostToNet64(x); }
// NOTE(review): kPaddingSize is not referenced by the code shown here.
const int kPaddingSize=100;
// Size in bytes of the stack buffers used by SendMessage() and WaitReply().
const int kBufferSize=1500;
// TCP client that sends a fixed number of small requests, reads the replies
// and accumulates timing statistics, which the destructor prints.
class TcpClient{
public:
// uuid is sent in every request; many is the number of requests to send.
TcpClient(int uuid,int many);
// Closes the socket and prints the reply count and average delays.
~TcpClient();
// Creates the socket and connects to ip:port; returns false on failure.
bool Init(const char *ip,uint16_t port);
// Sends one request and waits for data; returns true once the client is done.
bool Loop();
private:
// Writes one request to the socket; returns the result of write().
int SendMessage();
// Receives once, appends to rb_ and parses; returns the result of recv().
int WaitReply();
// Parses a reply from rb_ and updates the statistics.
void ParserBuffer();
// Closes the socket if one is open.
void CloseFd();
int uuid_=-1;
int many_=0;
// -1 while no socket is open.
int sockfd_=-1;
// Message id of the next request; incremented by SendMessage().
int mid_=0;
// Number of replies parsed so far.
int counter_=0;
// Timestamp carried by the previously parsed reply.
int32_t last_send_delay_=0;
// Sum of timestamp differences between consecutive replies.
int64_t inter_send_delay_=0;
// Sum of (local receive time - reply timestamp), in milliseconds.
int64_t trans_delay_=0;
// Receive buffer holding bytes that have not been parsed yet.
std::string rb_;
};
// Stores the client id and the number of requests to send; no socket is
// opened until Init() is called.
TcpClient::TcpClient(int uuid,int many)
    : uuid_(uuid),
      many_(many)
{
}
// Closes the socket and prints the number of parsed replies followed by the
// average inter-reply timestamp gap and the average transfer delay.
TcpClient::~TcpClient(){
    CloseFd();
    double average_inter_delay=0;
    double average_trans_delay=0;
    // The first reply has no predecessor, hence counter_-1 intervals.
    if(counter_>1){
        average_inter_delay=1.0*inter_send_delay_/(counter_-1);
    }
    if(counter_>0){
        average_trans_delay=1.0*trans_delay_/counter_;
    }
    std::cout<<"dtor "<<counter_<<" "<<average_inter_delay<<" "<<average_trans_delay<<std::endl;
}
// Creates a TCP socket with TCP_NODELAY set and connects it to ip:port.
// ip must be a dotted-decimal IPv4 address.
// Returns true on success; on any failure a message is printed, the socket
// (if created) is closed, and false is returned.
bool TcpClient::Init(const char *ip,uint16_t port){
    struct sockaddr_in servaddr;
    // Zero the whole structure so that padding (sin_zero) is well defined.
    memset(&servaddr,0,sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    // inet_pton reports malformed addresses explicitly, unlike inet_addr
    // whose error value is also a valid address (255.255.255.255).
    if (ip==NULL||inet_pton(AF_INET,ip,&servaddr.sin_addr)!=1){
        std::cout<<"Error : invalid IPv4 address"<<std::endl;
        return false;
    }
    if ((sockfd_= socket(AF_INET, SOCK_STREAM, 0)) < 0){
        std::cout<<"Error : Could not create socket"<<std::endl;
        return false;
    }
    int flag = 1;
    setsockopt (sockfd_, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
    if (connect(sockfd_,(struct sockaddr *)&servaddr,sizeof(servaddr)) != 0) {
        std::cout<<"connection with the server failed"<<std::endl;
        CloseFd();
        return false;
    }
    return true;
}
// Performs one request/reply exchange.
// Returns true when the client is done: all requests were sent, no socket is
// open, or sending/receiving did not succeed. Returns false to continue.
bool TcpClient::Loop(){
    if(mid_>=many_||sockfd_<=0){
        return true;
    }
    if(SendMessage()<=0){
        return true;
    }
    return WaitReply()<=0;
}
int TcpClient::SendMessage(){
char buffer[kBufferSize]={0};
int off=0;
uint16_t sz=sizeof(uint16_t)+2*sizeof(uint32_t);
uint16_t uuid=uuid_;
uint32_t mid=mid_;
uint32_t sent_ts=TimeMillis();
sz=HostToNet16(sz);
uuid=HostToNet16(uuid);
mid=HostToNet32(mid);
sent_ts=HostToNet32(sent_ts);
mid_++;
memcpy(buffer+off,(void*)&sz,sizeof(sz));
off+=sizeof(sz);
memcpy(buffer+off,(void*)&uuid,sizeof(uuid));
off+=sizeof(uuid);
memcpy(buffer+off,(void*)&mid,sizeof(mid));
off+=sizeof(mid);
memcpy(buffer+off,(void*)&sent_ts,sizeof(sent_ts));
off+=sizeof(sent_ts);
return write(sockfd_,buffer,off);
}
// Receives once from the socket; any bytes read are appended to rb_ and
// handed to ParserBuffer(). Returns the result of recv().
int TcpClient::WaitReply(){
    char buffer[kBufferSize]={0};
    const int ret=recv(sockfd_,buffer,sizeof(buffer),0);
    if(ret<=0){
        return ret;
    }
    rb_.append(buffer,ret);
    ParserBuffer();
    return ret;
}
// Parses every complete reply currently held in rb_ and updates the
// statistics. A reply is: uint16 sz, uint16 uuid, uint32 mid, uint32 send
// timestamp (all network byte order), followed by sz-10 padding bytes when sz
// exceeds the 10-byte header. Incomplete trailing data stays in rb_.
void TcpClient::ParserBuffer(){
    const size_t kSizeField=sizeof(uint16_t);
    const size_t kHeader=sizeof(uint16_t)+2*sizeof(uint32_t);
    // Loop: a single recv() may deliver more than one reply.
    while(rb_.size()>=kSizeField+kHeader){
        size_t off=0;
        uint16_t sz=0;
        uint32_t sent_ts=0;
        memcpy(&sz,&rb_[off],sizeof(sz));
        off+=sizeof(sz);
        // uuid and mid are not needed for the statistics; skip over them.
        off+=sizeof(uint16_t)+sizeof(uint32_t);
        memcpy(&sent_ts,&rb_[off],sizeof(sent_ts));
        off+=sizeof(sent_ts);
        sz=NetToHost16(sz);
        sent_ts=NetToHost32(sent_ts);
        if(sz>kHeader){
            off+=sz-kHeader;
        }
        if(rb_.size()<off){
            break;
        }
        // Timestamps on the wire are the low 32 bits of the millisecond
        // clock, so compare like with like using unsigned 32-bit values.
        const uint32_t now=static_cast<uint32_t>(TimeMillis());
        if(counter_>0){
            inter_send_delay_+=sent_ts-static_cast<uint32_t>(last_send_delay_);
        }
        if(now>sent_ts){
            trans_delay_+=now-sent_ts;
        }
        counter_++;
        last_send_delay_=sent_ts;
        rb_.erase(0,off);
    }
}
// Closes the socket if one is open and marks it as closed.
void TcpClient::CloseFd(){
    if(sockfd_<=0){
        return;
    }
    close(sockfd_);
    sockfd_=-1;
}
// Cleared by the signal handler to stop the main loop. sig_atomic_t is the
// integer type that may safely be written from a signal handler.
static volatile sig_atomic_t g_running=1;
// Signal handler: requests termination of the main loop.
void signal_exit_handler(int sig)
{
    (void)sig;
    g_running=0;
}
/*usage: ./client uuid <id>   e.g. ./client uuid 1
  The numeric id is read from the second argument.*/
int main(int argc, char *argv[]){
    signal(SIGTERM, signal_exit_handler);
    signal(SIGINT, signal_exit_handler);//ctrl+c
    signal(SIGHUP, signal_exit_handler);//terminal hang-up
    signal(SIGTSTP, signal_exit_handler);//ctrl+z
    std::string server_ip="127.0.0.1";
    uint16_t server_port=5555;
    if(argc!=3){
        std::cerr<<"usage: "<<(argc>0?argv[0]:"client")<<" uuid <id>"<<std::endl;
        return 1;
    }
    int uuid=0;
    try{
        uuid=std::stoi(argv[2]);
    }catch(const std::exception &e){
        // std::stoi throws on non-numeric or out-of-range input.
        std::cerr<<"invalid uuid: "<<argv[2]<<std::endl;
        return 1;
    }
    int many=1000;
    TcpClient client(uuid,many);
    // Keep the full 64-bit millisecond value; it does not fit in 32 bits.
    const int64_t start=TimeMillis();
    if(client.Init(server_ip.c_str(),server_port)){
        while(!client.Loop()&&g_running){}
    }
    const int64_t delta=TimeMillis()-start;
    std::cout<<"run time:"<<delta<<std::endl;
    return 0;
}
Result:
The client sends 1000 small messages to the Python server. For each message, the TCP server process records the receipt time t1 and forwards the message to a consumer process through a Pipe or a Queue. After processing it, the consumer sends a response message back to the TCP server, which then sends the response data to the client at time t2. The difference (t2 - t1) is recorded and is referred to as the IPC time.
Pipe
python tcp_server.py --chty p
./client uuid 1
Client run time: 521 milliseconds. The average IPC time for processing one message is 0.327 milliseconds.
Queue
python tcp_server.py --chty q
./client uuid 1
Client run time: 16015 milliseconds. The average IPC time for processing one message is 14.511 milliseconds.
网友评论