使用 Python 的 struct 模块,将浮点数据编码成网络字节序,便于在网络上进行数据交换。
byte_codec.py
import struct
# Data type tags: every encoded value starts with one of these single-byte
# markers so the decoder knows how to interpret the bytes that follow.
INT8_TYPE=0x01  # unsigned integer, 1-byte payload
INT16_TYPE=0x02  # unsigned integer, 2-byte network-order payload
INT32_TYPE=0x03  # unsigned integer, 4-byte network-order payload
FLOAT_TYPE=0x04  # 4-byte network-order float payload
DOUBLE_TYPE=0x05  # 8-byte network-order double payload
STRING_TYPE=0x06  # varint length followed by UTF-8 bytes
def varint_encode(number):
    """Encode a non-negative integer as a variable-length byte string.

    The value is emitted seven bits per byte, least-significant group first.
    The top bit of a byte is set when more bytes follow and clear on the
    final byte.

    Args:
        number: Integer to encode; must be >= 0.

    Returns:
        The encoded bytes (always at least one byte).

    Raises:
        ValueError: If ``number`` is negative. Floor division of a negative
            value never reaches zero, so without this check the loop would
            never terminate.
    """
    if number < 0:
        raise ValueError("varint_encode requires a non-negative integer")
    chunks = []
    while True:
        number, low_bits = divmod(number, 128)
        # If there are more digits to encode, set the top bit of this digit.
        if number > 0:
            low_bits |= 0x80
        chunks.append(struct.pack("!B", low_bits))
        if number == 0:
            # Join once at the end instead of repeated bytes concatenation.
            return b"".join(chunks)
def varint_decode(buffer):
    """Decode one variable-length integer from the front of ``buffer``.

    Bytes are consumed until one with a clear top bit is seen (or the buffer
    runs out); each byte contributes its low seven bits, least-significant
    group first.

    Args:
        buffer: Bytes-like object whose items are integers.

    Returns:
        A ``(number, remain)`` tuple: the decoded integer and the bytes that
        follow it (``b''`` when nothing is left).
    """
    value = 0
    shift = 0
    consumed = 0
    for octet in buffer:
        consumed += 1
        value += (octet & 0x7F) << shift
        shift += 7
        if not octet & 0x80:
            # Top bit clear: this was the final byte of the varint.
            break
    tail = b''
    if consumed < len(buffer):
        tail += buffer[consumed:]
    return value, tail
def peek_int_encode_len(data):
    """Return the encoded size in bytes of a non-negative integer.

    The size is one tag byte plus a 1-, 2- or 4-byte payload, chosen by the
    smallest range that holds ``data``.

    Args:
        data: Integer in the range 0..4294967295.

    Returns:
        2, 3 or 5.

    Raises:
        Exception: If ``data`` is negative or larger than 4294967295.
    """
    if data < 0:
        raise Exception("data less zero")
    # (inclusive upper bound, total encoded size) pairs, smallest first.
    for upper_bound, encoded_size in ((255, 2), (65535, 3), (4294967295, 5)):
        if data <= upper_bound:
            return encoded_size
    raise Exception("out of range")
def encode_int(data):
    """Encode a non-negative integer as a type tag plus the smallest payload.

    Values up to 255 use a 1-byte payload, up to 65535 a 2-byte payload and
    up to 4294967295 a 4-byte payload; multi-byte payloads are written in
    network byte order.

    Args:
        data: Integer in the range 0..4294967295.

    Returns:
        The tag byte followed by the payload bytes.

    Raises:
        Exception: If ``data`` is negative or larger than 4294967295.
    """
    if data < 0:
        raise Exception("data less zero")
    if data <= 255:
        return struct.pack("!BB", INT8_TYPE, data)
    if data <= 65535:
        return struct.pack("!BH", INT16_TYPE, data)
    if data <= 4294967295:
        return struct.pack("!BI", INT32_TYPE, data)
    raise Exception("out of range")
def encode_float(data):
    """Encode ``data`` as the FLOAT_TYPE tag plus a 4-byte network-order float.

    Args:
        data: Number to pack with the ``f`` struct format.

    Returns:
        Five bytes: the tag followed by the packed value.
    """
    # "!" selects network byte order with no padding between the two fields.
    return struct.pack("!Bf", FLOAT_TYPE, data)
def encode_double(data):
    """Encode ``data`` as the DOUBLE_TYPE tag plus an 8-byte network-order double.

    Args:
        data: Number to pack with the ``d`` struct format.

    Returns:
        Nine bytes: the tag followed by the packed value.
    """
    # "!" selects network byte order with no padding between the two fields.
    return struct.pack("!Bd", DOUBLE_TYPE, data)
def encode_string(content):
    """Encode a text string as tag, varint byte length, and UTF-8 bytes.

    The length prefix counts encoded bytes, not characters. Using the
    character count would understate the length of any non-ASCII string and
    truncate its payload, because the ``s`` struct format cuts the data to
    the stated size.

    Args:
        content: The ``str`` to encode.

    Returns:
        The STRING_TYPE tag, the varint-encoded byte length, then the UTF-8
        encoded text.
    """
    payload = bytes(content, 'utf-8')
    header = struct.pack("B", STRING_TYPE) + varint_encode(len(payload))
    return header + payload
def decode_one_element(buffer):
    """Decode the first tagged element in ``buffer``.

    The first byte is the type tag; the payload layout depends on the tag.
    String elements are returned as raw bytes (they are not decoded to
    ``str``).

    Args:
        buffer: Bytes holding one or more encoded elements.

    Returns:
        A ``(value, remain)`` tuple: the decoded value and the bytes that
        follow the element (``b''`` when nothing is left).

    Raises:
        ValueError: If the tag is not one of the known type tags, or if a
            string element declares more bytes than the buffer holds.
    """
    total = len(buffer)
    tag = buffer[0]
    if tag == INT8_TYPE:
        value = buffer[1]
        end = 2
    elif tag == INT16_TYPE:
        (value,) = struct.unpack("!H", buffer[1:3])
        end = 3
    elif tag == INT32_TYPE:
        (value,) = struct.unpack("!I", buffer[1:5])
        end = 5
    elif tag == FLOAT_TYPE:
        (value,) = struct.unpack("!f", buffer[1:5])
        end = 5
    elif tag == DOUBLE_TYPE:
        (value,) = struct.unpack("!d", buffer[1:9])
        end = 9
    elif tag == STRING_TYPE:
        declared, payload = varint_decode(buffer[1:])
        if declared > len(payload):
            # Fail loudly instead of silently returning a shortened string.
            raise ValueError(
                "truncated string: expected {} bytes, got {}".format(
                    declared, len(payload)))
        rest = payload[declared:] if len(payload) > declared else b''
        return payload[:declared], rest
    else:
        # Previously fell through and returned None, which made callers fail
        # later with an unrelated unpacking error.
        raise ValueError("unknown type tag: 0x{:02x}".format(tag))
    rest = buffer[end:] if total > end else b''
    return value, rest
py_codec.py
import struct
import byte_codec as bc
def test_data_deserialization(buffer):
    """Decode and print every element in ``buffer``, one per line.

    Args:
        buffer: Bytes holding zero or more consecutively encoded elements.
    """
    pending = buffer
    while pending:
        value, pending = bc.decode_one_element(pending)
        print(value)
def test_data_serialization():
    """Build a sample frame: a varint length prefix followed by four elements.

    The body holds an int (1234), a double (456.123456), a string
    ("1234567890") and another int (12), in that order.

    Returns:
        The varint-encoded body length followed by the body bytes.
    """
    fields = (
        bc.encode_int(1234),
        bc.encode_double(456.123456),
        bc.encode_string("1234567890"),
        bc.encode_int(12),
    )
    body = b''.join(fields)
    return bc.varint_encode(len(body)) + body
if __name__ == '__main__':
    # Hex dumps of a big-endian float and a big-endian double.
    print(struct.pack(">f", 1234.45).hex())
    print(struct.pack(">d", 4321.391).hex())

    # A 12-byte packet: unpack the first 4 bytes as a float and keep the
    # remaining bytes raw, then decode those as a double in two ways.
    packet = b'\x44\x9a\x4e\x66\x40\xb0\xe1\x64\x18\x93\x74\xbc'
    layout = ">f{}s".format(len(packet) - 4)
    head, tail = struct.unpack(layout, packet)
    print(type(head))
    print(struct.unpack(">d", tail)[0])
    print(struct.unpack(">d", packet[4:])[0])

    # Round trip through the codec: build a frame, strip its length prefix,
    # then decode and print each element.
    frame = test_data_serialization()
    print(frame.hex())
    _, body = bc.varint_decode(frame)
    test_data_deserialization(body)
我也实现了 C++ 版本,便于交叉验证。
pack.cpp
//https://www.cnblogs.com/luxiaoxun/archive/2012/09/05/2671697.html
#include <iostream>
#include <stdio.h>
#include <memory.h>
#include <stdint.h>
// Byte-order helpers. Network byte order is big-endian, so the bytes are
// swapped only when the host is not big-endian; on a big-endian host the
// value is already in network order and swapping it would corrupt it.
// __BYTE_ORDER__ / __ORDER_BIG_ENDIAN__ are predefined by GCC and Clang, the
// same compilers that provide the __builtin_bswap* intrinsics used here.
static uint16_t HostToNet16(uint16_t x) {
#if defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
    return x;
#else
    return __builtin_bswap16(x);
#endif
}
static uint32_t HostToNet32(uint32_t x) {
#if defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
    return x;
#else
    return __builtin_bswap32(x);
#endif
}
static uint64_t HostToNet64(uint64_t x) {
#if defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
    return x;
#else
    return __builtin_bswap64(x);
#endif
}
// The conversion is its own inverse, so network-to-host reuses host-to-network.
static uint16_t NetToHost16(uint16_t x) { return HostToNet16(x); }
static uint32_t NetToHost32(uint32_t x) { return HostToNet32(x); }
static uint64_t NetToHost64(uint64_t x) { return HostToNet64(x); }
using namespace std;
// Write the float pointed to by v into buffer as 4 bytes in network byte order.
// buffer must have room for at least 4 bytes.
void pack_float(float *v,char *buffer){
    // Use a fixed-width unsigned integer (as pack_double does) rather than
    // int, so the bit pattern never passes through a signed type and the
    // copy size does not depend on sizeof(int).
    uint32_t bits=0;
    memcpy(&bits,v,sizeof(bits));
    bits=HostToNet32(bits);
    memcpy(buffer,&bits,sizeof(bits));
}
// Read 4 bytes in network byte order from buffer and return them as a float.
float unpack_float(char *buffer){
    // Use a fixed-width unsigned integer (as unpack_double does) rather than
    // int, so the bit pattern never passes through a signed type and the
    // copy size does not depend on sizeof(int).
    uint32_t wire=0;
    memcpy(&wire,buffer,sizeof(wire));
    uint32_t host=NetToHost32(wire);
    float v=0;
    memcpy(&v,&host,sizeof(host));
    return v;
}
// Write the double pointed to by v into buffer as 8 bytes in network byte order.
// buffer must have room for at least 8 bytes.
void pack_double(double *v,char*buffer){
    uint64_t bits=0;
    memcpy(&bits,v,sizeof(bits));
    const uint64_t wire=HostToNet64(bits);
    memcpy(buffer,&wire,sizeof(wire));
}
// Read 8 bytes in network byte order from buffer and return them as a double.
double unpack_double(char *buffer){
    uint64_t wire=0;
    memcpy(&wire,buffer,sizeof(wire));
    const uint64_t host=NetToHost64(wire);
    double value=0;
    memcpy(&value,&host,sizeof(host));
    return value;
}
// Print the four bytes of 0x12345678 after conversion to network byte order.
void test_int(){
    int a=0x12345678;
    int b=HostToNet32(a);
    char *p =(char*)(&b);
    for(int i=0;i<4;i++){
        // Zero-pad to two hex digits so every byte takes a fixed width;
        // without padding a byte such as 0x05 prints as "5" and the dump
        // becomes ambiguous.
        printf("%02hhx",p[i]);
    }
    printf("\n");
}
// Round-trip a float through pack_float/unpack_float, print the decoded
// value, then print the packed bytes as hex.
void test_float_codec(){
    float v=1234.45;
    char buffer[4];
    pack_float(&v,buffer);
    float decode=unpack_float(buffer);
    std::cout<<decode<<std::endl;
    for(int i=0;i<4;i++){
        // Zero-pad to two hex digits so every byte takes a fixed width;
        // without padding a byte such as 0x05 prints as "5" and the dump
        // becomes ambiguous.
        printf("%02hhx",buffer[i]);
    }
    printf("\n");
}
// Round-trip a double through pack_double/unpack_double, print the decoded
// value, then print the packed bytes as hex.
void test_double_codec(){
    double v=1234.45;
    char buffer[8];
    pack_double(&v,buffer);
    double decode=unpack_double(buffer);
    std::cout<<decode<<std::endl;
    for(int i=0;i<8;i++){
        // Zero-pad to two hex digits so every byte takes a fixed width;
        // without padding a byte such as 0x05 prints as "5" and the dump
        // becomes ambiguous.
        printf("%02hhx",buffer[i]);
    }
    printf("\n");
}
// Entry point: run the float and double round-trip demos.
// test_int() is defined in this file but is not called here.
int main(){
    test_float_codec();
    test_double_codec();
    return 0;
}
网友评论