架构图
业务层:根据业务逻辑,实现thrift文件中接口
接口层:根据thrift文件,生成框架代码
协议层:对数据流进行序列化(二进制、json)
传输层:负责网络传输
C/S模型
Client端
在Client端,用户首先需要依次指定transport类型、protocol类型和client对象。client对象负责将函数名以及参数发送给server端,并且解析server端返回的结果。
Server端
- 在Processor的初始化过程,绑定用户实现的handler。
class MyService:
def func(self, n1, n2):
pass
handler = MyService()
processor = MyService.Processor(handler)
- Server端建立transport,并设置protocol。
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
- 创建server对象并开始服务。
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
server.serve()
Thrift内部流程
序列化支持
- ttypes.py
thrift文件中定义的数据格式,编译后的存放在ttypes.py中。Thrift将用户自定义的struct、enum和exception都转换为python class。
class SharedStruct {
thrift_spec = (
None, #0
(1, TType.I32, 'key', None, None, ) #1
(2, TType.STRING, 'value', None, None, ) # 2
)
def __init__(self, key=None, value=None):
self.key = key
self.value = value
def read(self, iprot):
.....
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I32:
self.key = iprot.readI32()
else:
iprot.skip(ftype)
elif:
if ftype == TType.STRING:
self.value = iprot.readString()
else:
iport.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprort.readStructEnd()
def write(self, oprot):
oprot.writeStructBegin('SharedStruct')
if self.key is not None:
oprot.writeFieldBegin('key', TType.I32, 1)
oprot.wirteI32(self.key)
oprot.writeFieldEnd()
....
可以看出,Thrift为自定义类型生成了spec结构、read方法和write方法。在之后的分析中,spec结构对应于自定义类型的内部数据参数,
对数据结构的解析将起到指导性的作用。read方法和write方法是互逆的过程,write方法按照struct name、'key'、key_type、key_value、value_tag、value_type、value_value的顺序将SharedStruct的内容写入output protocol中,而read方法按照相应的顺序,从input protocol中读取各个字段。
注意:在TBinaryProtocol中,readStructBegin和writeStructBegin都是空操作,所以虽然传参不同,但是实际上是互斥空操作。
Client端
client.open()
连接指定的ip地址、端口。
sum_ = client.add(1, 1)
调用client对象的send_xxx和recv_xxx方法。在send_xxx中,使用xxx_args的write方法,将参数发送给Server端;在recv_xxx中,使用xxx_result的read方法来解析Server端返回的结果。Server端会在返回结果的末尾设置TType.Stop标识来表征消息的结束。
Server端
- 当设置了Server端的server对象后,会调用其serve方法,该方法会建立监听,等待Client端的连接。
self.serverTransport.listen()
- 当检测到Client端发起rpc调用后,会建立相应的数据对象,并调动Processor的process进行处理。
while True:
client = self.serverTransport.accept()
if not client:
continue
itrans = self.inputTransportFactory.getTransport(client)
otrans = self.outputTransportFactory.getTransport(client)
iprot = self.inputProtocolFactory.getProtocol(itrans)
oprot = self.outputProtocolFactory.getProtocol(otrans)
try:
while True:
self.processor.process(iprot, oprot)
itrans.close()
otrans.close()
itrans、otrans、iprot、oprot都是通过工厂方法实例化。itrans、otrans负责Server端与Client端的数据传输,iprot、oprot负责解码工作(应该相同)。
- process会依据Client端发送的函数标识(add)进行分发,交由process_add方法进行处理。process_add方法会解析参数并调用handler的add方法,并使用xxx_result的write方法来返回结果。
序列化的流程
- 先writeMessageBegin表示开始传输消息了,写消息头。Message里面定义了方法名,调用的类型,消息seqId
- 写消息体。如果参数是一个类,就writeStructBegin
- 接下来写字段,writeFieldBegin, 这个方法会写接下来的字段的数据类型和顺序号。这个顺序号是Thrfit对要传输的字段的一个编码,从1开始
- 如果是一个集合就writeListBegin/writeMapBegin,如果是一个基本数据类型,比如int, 就直接writeI32
- 每个复杂数据类型写完都调用writeXXXEnd,直到writeMessageEnd结束
- 读消息时根据数据类型读取相应的长度
Protocol
用于信息的序列化过程,分为write和read对称的两部分。
方法/字段 | 含义 | 内容 |
---|---|---|
Message | 消息传输的头部 | name(方法名)+ type + seqid |
FieldB/E/Stop | type + id(IDL中的索引) / None / STOP | |
Struct | name | |
Map | ktype + vtype + size | |
List | etype + size | |
Set | etype + size | |
Bool/Byte/I16/I32/I64/Double/String/Binary | bool_val | |
skip | ttype |
STRUCT
struct_begin
while True:
field_begin
if ttype == STOP:
break
skip(ttype)
field_end
struct_end
MAP
map_begin
for i in range(size):
skip(ktype)
skip(vtype)
map_end
SET / LIST
begin
for i in range(size):
skip(ttype)
end
HelloService.py
-
Iface
service的接口描述类; -
Client
客户端发送请求 + 接收回馈的方法类;
class Client(Iface):
def __init__(self, iprot, oprot=None):
self._iport = self._oprot = None
if oport is not None:
self._oprot = oprot
self._seqid = 0
def getStruct(self, key):
self.send_getStruct(key)
return self.recv_getStruct()
def send_getStruct(self, key):
self._oprot.writeMessageBegin('getStruct', TMessageType.CALL, self._seqid)
args = getStruct_args()
args.key = key
args.write(self._oprot)
self._oprot.writeMessageEnd()
self._oprot.trans.flush()
def recv_getStruct(self):
iprot = self._iprot
(fname, mtype, rseqid) = iprot.readMessageBegin()
if mtype == TMessageType.EXCEPTION:
...
result = getStruct_result()
result.read(iprot)
iprot.readMessageEnd()
if result.success is not None:
return result.success
raise TApplicationException(...)
Client实现的用户声明方法的方式为send_xxx然后recv_xxx,这个模式符合rpc的调用思想,先将请求发出去,然后等待接收远端的回应。
send_xxx && recv_xxx
在send_xxx中,首先会写入name、ttyp和seqid的信息。然后通过获取getStruct方法的参数信息,并写入到输出流中。
获取参数信息的方式如下:
class getStruct_args:
thrift_spec = (
None, # 0
(1, TType.I32, 'key', None, None, ), # 1
)
def __init__(self, key=None,):
self.key = key
def read(self, iprot):
....
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I32:
self.key = iprot.readI32()
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
....
oprot.writeStructBegin('getStruct_args')
if self.key is not None:
oprot.writeFieldBegin('key', TType.I32, 1)
oprot.writeI32(self.key)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
send_xxx通过调用getStruct_args的write方法来发送参数信息到远端,可以猜想到,server端会调用getStruct_args的read方法来解析参数信息。
当server端处理完成后,会发送结果到client端。对结果的处理类似于参数处理的逆过程。由getStruct_result负责:
class getStruct_result:
thrift_spec = (
(0, TType.STRUCT, 'success', (SharedStruct, SharedStruct.thrift_spec), None, ), # 0
)
def __init__(self, success=None,):
self.success = success
def read(self, iprot):
....
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 0:
if ftype == TType.STRUCT:
self.success = SharedStruct()
self.success.read(iprot)
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
....
oprot.writeStructBegin('getStruct_result')
if self.success is not None:
oprot.writeFieldBegin('success', TType.STRUCT, 0)
self.success.write(oprot)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
server端使用getStruct_result的write来发送结果,client端会调用read来响应的解析结果。从代码中可以看出,结果success为TType.STRUCT类型,也就是SharedStruct结构体,而对SharedStruct结构体的解释就位于SharedStruct类中的thrift_spec字段。
-
Processor
服务端接收请求 + 调用处理函数 + 返回结果的方法类;
在Processor中注册有函数名所对应的内部处理方法,当调用process时,会执行预先给定的handler并返回处理结果。
对于执行被调用函数的情况,会返回INTERNAL_ERROR的错误。- process function
name, type, seqid = readMessageBegin 校验name是否存在 call process_name
- process_xxx
调用Get_args来读取输入 调用处理函数并获取返回值 作为TMessageType.REPLY写回
-
Get_args
- write
writeStructBeigin('Get_args') writeField(参数名称,TType.STRUCT, 1(索引)) 自定义类型的write
网友评论