Thrift

作者: 暂且听风吟一曲 | 来源:发表于2017-11-06 23:05 被阅读0次

    架构图

    业务层:根据业务逻辑,实现thrift文件中接口
    接口层:根据thrift文件,生成框架代码
    协议层:对数据流进行序列化(二进制、json)
    传输层:负责网络传输

    C/S模型

    Client端

    在Client端,用户首先需要依次指定transport类型、protocol类型和client对象。client对象负责将函数名以及参数发送给server端,并且解析server端返回的结果。

    Server端
    1. 在Processor的初始化过程,绑定用户实现的handler。
    class MyService:
      def func(self, n1, n2):
            pass
    handler = MyService()
    processor = MyService.Processor(handler)
    
    1. Server端建立transport,并设置protocol。
    transport = TSocket.TServerSocket(port=9090)
    tfactory = TTransport.TBufferedTransportFactory()
    pfactory = TBinaryProtocol.TBinaryProtocolFactory()
    
    1. 创建server对象并开始服务。
    server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
    server.serve()
    

    Thrift内部流程

    序列化支持
    1. ttypes.py
      thrift文件中定义的数据格式,编译后的存放在ttypes.py中。Thrift将用户自定义的struct、enum和exception都转换为python class。
    class SharedStruct {
      thrift_spec = (
        None, #0
        (1, TType.I32, 'key', None, None, ) #1
        (2, TType.STRING, 'value', None, None, ) # 2
      )
      
      def __init__(self, key=None, value=None):
        self.key = key
        self.value = value
    
      def read(self, iprot):
        .....
        iprot.readStructBegin()
        while True:
          (fname, ftype, fid) = iprot.readFieldBegin()
          if ftype == TType.STOP:
            break
          if fid == 1:
            if ftype == TType.I32:
              self.key = iprot.readI32()
            else:
              iprot.skip(ftype)
          elif:
            if ftype == TType.STRING:
              self.value = iprot.readString()
            else:
              iport.skip(ftype)  
          else:
            iprot.skip(ftype)
          iprot.readFieldEnd()
        iprort.readStructEnd()
    
        def write(self, oprot):
          oprot.writeStructBegin('SharedStruct')
          if self.key is not  None:
            oprot.writeFieldBegin('key', TType.I32, 1)
            oprot.wirteI32(self.key)
            oprot.writeFieldEnd()
          ....
    

    可以看出,Thrift为自定义类型生成了spec结构、read方法和write方法。在之后的分析中,spec结构对应于自定义类型的内部数据参数,
    对数据结构的解析将起到指导性的作用。read方法和write方法是互逆的过程,write方法按照struct name、'key'、key_type、key_value、value_tag、value_type、value_value的顺序将SharedStruct的内容写入output protocol中,而read方法按照相应的顺序,从input protocol中读取各个字段。
    注意:在TBinaryProtocol中,readStructBegin和writeStructBegin都是空操作,所以虽然传参不同,但是实际上是互斥空操作。

    Client端
    client.open()
    

    连接指定的ip地址、端口。

    sum_ = client.add(1, 1)
    

    调用client对象的send_xxx和recv_xxx方法。在send_xxx中,使用xxx_args的write方法,将参数发送给Server端;在recv_xxx中,使用xxx_result的read方法来解析Server端返回的结果。Server端会在返回结果的末尾设置TType.Stop标识来表征消息的结束。

    Server端
    • 当设置了Server端的server对象后,会调用其serve方法,该方法会建立监听,等待Client端的连接。
        self.serverTransport.listen()
    
    • 当检测到Client端发起rpc调用后,会建立相应的数据对象,并调动Processor的process进行处理。
        while True:
          client = self.serverTransport.accept()
          if not client:
            continue
          itrans = self.inputTransportFactory.getTransport(client)
          otrans = self.outputTransportFactory.getTransport(client)
          iprot = self.inputProtocolFactory.getProtocol(itrans)
          oprot = self.outputProtocolFactory.getProtocol(otrans)
          try:
            while True:
              self.processor.process(iprot, oprot)
          itrans.close()
          otrans.close()
    

    itrans、otrans、iprot、oprot都是通过工厂方法实例化。itrans、otrans负责Server端与Client端的数据传输,iprot、oprot负责解码工作(应该相同)。

    • process会依据Client端发送的函数标识(add)进行分发,交由process_add方法进行处理。process_add方法会解析参数并调用handler的add方法,并使用xxx_result的write方法来返回结果。
    序列化的流程
    1. 先writeMessageBegin表示开始传输消息了,写消息头。Message里面定义了方法名,调用的类型,消息seqId
    2. 写消息体。如果参数是一个类,就writeStructBegin
    3. 接下来写字段,writeFieldBegin, 这个方法会写接下来的字段的数据类型和顺序号。这个顺序号是Thrfit对要传输的字段的一个编码,从1开始
    4. 如果是一个集合就writeListBegin/writeMapBegin,如果是一个基本数据类型,比如int, 就直接writeI32
    5. 每个复杂数据类型写完都调用writeXXXEnd,直到writeMessageEnd结束
    6. 读消息时根据数据类型读取相应的长度
    Protocol

    用于信息的序列化过程,分为write和read对称的两部分。

    方法/字段 含义 内容
    Message 消息传输的头部 name(方法名)+ type + seqid
    FieldB/E/Stop type + id(IDL中的索引) / None / STOP
    Struct name
    Map ktype + vtype + size
    List etype + size
    Set etype + size
    Bool/Byte/I16/I32/I64/Double/String/Binary bool_val
    skip ttype

    STRUCT

    struct_begin
    while True:  
          field_begin
          if ttype == STOP:
              break
         skip(ttype)
        field_end
    struct_end
    

    MAP

    map_begin
    for i in range(size):
      skip(ktype)
      skip(vtype)
    map_end
    

    SET / LIST

    begin
    for i in range(size):
      skip(ttype)
    end
    
    HelloService.py
    • Iface
      service的接口描述类;

    • Client
      客户端发送请求 + 接收回馈的方法类;

    class Client(Iface):
      def __init__(self, iprot, oprot=None):
        self._iport = self._oprot = None
        if oport is not None:
          self._oprot = oprot
        self._seqid = 0
      def getStruct(self, key):
        self.send_getStruct(key)
        return self.recv_getStruct()
    
      def send_getStruct(self, key):
        self._oprot.writeMessageBegin('getStruct', TMessageType.CALL, self._seqid)
        args = getStruct_args()
        args.key = key
        args.write(self._oprot)
        self._oprot.writeMessageEnd()
        self._oprot.trans.flush()
    
      def recv_getStruct(self):
        iprot = self._iprot
        (fname, mtype, rseqid) = iprot.readMessageBegin()
        if mtype == TMessageType.EXCEPTION:
          ...
        result = getStruct_result()
        result.read(iprot)
        iprot.readMessageEnd()
        if result.success is not None:
          return result.success
        raise TApplicationException(...)
    

    Client实现的用户声明方法的方式为send_xxx然后recv_xxx,这个模式符合rpc的调用思想,先将请求发出去,然后等待接收远端的回应。

    send_xxx && recv_xxx

    在send_xxx中,首先会写入name、ttyp和seqid的信息。然后通过获取getStruct方法的参数信息,并写入到输出流中。
    获取参数信息的方式如下:

    class getStruct_args:
      thrift_spec = (
        None, # 0
        (1, TType.I32, 'key', None, None, ), # 1
      )
    
      def __init__(self, key=None,):
        self.key = key
    
      def read(self, iprot):
        ....
        iprot.readStructBegin()
        while True:
          (fname, ftype, fid) = iprot.readFieldBegin()
          if ftype == TType.STOP:
            break
          if fid == 1:
            if ftype == TType.I32:
              self.key = iprot.readI32()
            else:
              iprot.skip(ftype)
          else:
            iprot.skip(ftype)
          iprot.readFieldEnd()
        iprot.readStructEnd()
    
      def write(self, oprot):
        ....
        oprot.writeStructBegin('getStruct_args')
        if self.key is not None:
          oprot.writeFieldBegin('key', TType.I32, 1)
          oprot.writeI32(self.key)
          oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()
    

    send_xxx通过调用getStruct_args的write方法来发送参数信息到远端,可以猜想到,server端会调用getStruct_args的read方法来解析参数信息。
    当server端处理完成后,会发送结果到client端。对结果的处理类似于参数处理的逆过程。由getStruct_result负责:

    class getStruct_result:
      thrift_spec = (
        (0, TType.STRUCT, 'success', (SharedStruct, SharedStruct.thrift_spec), None, ), # 0
      )
    
      def __init__(self, success=None,):
        self.success = success
    
      def read(self, iprot):
        ....
        iprot.readStructBegin()
        while True:
          (fname, ftype, fid) = iprot.readFieldBegin()
          if ftype == TType.STOP:
            break
          if fid == 0:
            if ftype == TType.STRUCT:
              self.success = SharedStruct()
              self.success.read(iprot)
            else:
              iprot.skip(ftype)
          else:
            iprot.skip(ftype)
          iprot.readFieldEnd()
        iprot.readStructEnd()
    
      def write(self, oprot):
        ....
        oprot.writeStructBegin('getStruct_result')
        if self.success is not None:
          oprot.writeFieldBegin('success', TType.STRUCT, 0)
          self.success.write(oprot)
          oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()
    

    server端使用getStruct_result的write来发送结果,client端会调用read来响应的解析结果。从代码中可以看出,结果success为TType.STRUCT类型,也就是SharedStruct结构体,而对SharedStruct结构体的解释就位于SharedStruct类中的thrift_spec字段。

    • Processor
      服务端接收请求 + 调用处理函数 + 返回结果的方法类;
      在Processor中注册有函数名所对应的内部处理方法,当调用process时,会执行预先给定的handler并返回处理结果。
      对于执行被调用函数的情况,会返回INTERNAL_ERROR的错误。

      1. process function
        name, type, seqid = readMessageBegin
        校验name是否存在
        call process_name
      
      1. process_xxx
        调用Get_args来读取输入
        调用处理函数并获取返回值
        作为TMessageType.REPLY写回
      
    • Get_args

      1. write
        writeStructBeigin('Get_args')
        writeField(参数名称,TType.STRUCT, 1(索引))
        自定义类型的write
      

    相关文章

      网友评论

          本文标题:Thrift

          本文链接:https://www.haomeiwen.com/subject/njbdvttx.html