Go Thrift RPC

作者: carlSQ | 来源:发表于2018-02-08 15:46 被阅读89次

    引言

    年底了,最近也闲了,顺便给自己接近一年的后端生涯,做一些知识总结。

    Thrift Server 架构流程图

    image.png

    Thrift Server

    Go 里面开启一个TCP Server 服务很简单,这得益于go net 库对底层socket的封装。一个典型的Go Server端程序大致如下:

    func handleConn(conn net.Conn) {
        defer conn.Close()
        defer func() {
            if e := recover(); e != nil {
                fmt.Printf("panic in processor: %s: %s", e, debug.Stack())
            }
        }()
        for {
            // read request data from the connection do something
            // write response data to the connection
        }
    }
    
    func main() {
        l, err := net.Listen("tcp", ":16888")
        if err != nil {
            fmt.Println("listen error:", err)
            return
        }
    
        for {
            conn, err := l.Accept()
            if err != nil {
                fmt.Println("accept error:", err)
                break
            }
           go handleConn(conn)
        }
    }
    

    而用Thrift库 RPC Server端程序大致如下:

        handler := &Sharpshooter.Sharpshooter{}
        processor := player.NewPlayerServerProcessor(handler)
        serverTransport, err := thrift.NewTServerSocket("0.0.0.0:80")
        if err != nil {
            log.Fatalln("Error:", err)
        }
        server := thrift.NewTSimpleServer2(processor, serverTransport)
        err  = server.Serve()
        if err != nil {
            log.Fatalln("Error:", err)
        }
    

    上面的代码到底做了啥?下面我们具体分析。
    其中Sharpshooter 提供如下接口供远程调用,对应上面架构流程图的Handler

    type PlayerServer interface {
        Ping() (r bool, err error)
    
        UploadMap(gamemap [][]int32) (err error)
    
        UploadParamters(arguments *Args_) (err error)
        
        AssignTanks(tanks []int32) (err error)
        
        LatestState(state *GameState) (err error)
        
        GetNewOrders() (r []*Order, err error)
    }
    

    而在Thrift 源码中,定义了一个 TServer interface如下:

    type TServer interface {
        ProcessorFactory() TProcessorFactory
        ServerTransport() TServerTransport
        InputTransportFactory() TTransportFactory
        OutputTransportFactory() TTransportFactory
        InputProtocolFactory() TProtocolFactory
        OutputProtocolFactory() TProtocolFactory
    
        // Starts the server
        Serve() error
        // Stops the server. This is optional on a per-implementation basis. Not
        // all servers are required to be cleanly stoppable.
        Stop() error
    }
    
    

    满足以上的接口都可以认为是一个Server(前提是相应方法的实现要正确)。
    TProcessorFactory 对应上面架构流程图的Processor 一个连接处理器。
    TServerTransport 对应流程图Socket。
    TTransportFactory 对应流程图Transport。
    TProtocolFactory 对应流程图Protocol。

    Thrift 定义了一个TSimpleServer 实现了个简单的Server

    type TSimpleServer struct {
        quit chan struct{}
    
        processorFactory       TProcessorFactory
        serverTransport        TServerTransport
        inputTransportFactory  TTransportFactory
        outputTransportFactory TTransportFactory
        inputProtocolFactory   TProtocolFactory
        outputProtocolFactory  TProtocolFactory
        sync.WaitGroup
    }
    

    TSimpleServer 的Serve() error 函数实现

    func (p *TSimpleServer) Listen() error {
        return p.serverTransport.Listen()
    }
    
    func (p *TSimpleServer) AcceptLoop() error {
        for {
            client, err := p.serverTransport.Accept()
            if err != nil {
                select {
                case <-p.quit:
                    return nil
                default:
                }
                return err
            }
            if client != nil {
                p.Add(1)
                go func() {
                    if err := p.processRequests(client); err != nil {
                        log.Println("error processing request:", err)
                    }
                }()
            }
        }
    }
    
    func (p *TSimpleServer) Serve() error {
        err := p.Listen()
        if err != nil {
            return err
        }
        p.AcceptLoop()
        return nil
    }
    

    其实就是通过serverTransport监听端口号,接受连接, processRequests 方法处理连接。

    TServerSocket

    上面简单的Thrift Server代码实例 serverTransport 初始化如下:
    serverTransport, err := thrift.NewTServerSocket("0.0.0.0:80")

    type TServerSocket struct {
        listener      net.Listener
        addr          net.Addr
        clientTimeout time.Duration
    
        // Protects the interrupted value to make it thread safe.
        mu          sync.RWMutex
        interrupted bool
    }
    

    其实就是一个TServerSocket 实例,实现了 TServerTransport interface ,监听端口号,接受连接,具体实现代码就是调用net 库,监听端口号,接受连接。

    type TServerTransport interface {
        Listen() error
        Accept() (TTransport, error)
        Close() error
    
        // Optional method implementation. This signals to the server transport
        // that it should break out of any accept() or listen() that it is currently
        // blocked on. This method, if implemented, MUST be thread safe, as it may
        // be called from a different thread context than the other TServerTransport
        // methods.
        Interrupt() error
    }
    

    这里的TServerSocket 不是 Posix标准里的socket,而是Thrift 对Go的net库网络操作的抽象用于Server端。

    Processor

    processor 处理接受的每个连接,看看 processRequests()函数的具体实现:

    func (p *TSimpleServer) processRequests(client TTransport) error {
        defer p.Done()
    
        processor := p.processorFactory.GetProcessor(client)
        inputTransport := p.inputTransportFactory.GetTransport(client)
        outputTransport := p.outputTransportFactory.GetTransport(client)
        inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
        outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
        defer func() {
            if e := recover(); e != nil {
                log.Printf("panic in processor: %s: %s", e, debug.Stack())
            }
        }()
    
        if inputTransport != nil {
            defer inputTransport.Close()
        }
        if outputTransport != nil {
            defer outputTransport.Close()
        }
        for {
            select {
            case <-p.quit:
                return nil
            default:
            }
    
            ok, err := processor.Process(inputProtocol, outputProtocol)
            if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
                return nil
            } else if err != nil {
                return err
            }
            if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
                continue
            }
            if !ok {
                break
            }
        }
        return nil
    }
    

    方法主要获取处理器processor,获取输入io inputTransport, 输出io outputTransport, 输入数据协议inputProtocol,输出数据协议outputProtocol,最后由处理器 processor.Process(inputProtocol, outputProtocol)处理输入输出。
    上面的例子中构造了一个 processor := player.NewPlayerServerProcessor(handler) 处理器,看看代码具体实现:

    func NewPlayerServerProcessor(handler PlayerServer) *PlayerServerProcessor {
        self14 := &PlayerServerProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)}
        self14.processorMap["ping"] = &playerServerProcessorPing{handler: handler}
        self14.processorMap["uploadMap"] = &playerServerProcessorUploadMap{handler: handler}
        self14.processorMap["uploadParamters"] = &playerServerProcessorUploadParamters{handler: handler}
        self14.processorMap["assignTanks"] = &playerServerProcessorAssignTanks{handler: handler}
        self14.processorMap["latestState"] = &playerServerProcessorLatestState{handler: handler}
        self14.processorMap["getNewOrders"] = &playerServerProcessorGetNewOrders{handler: handler}
        return self14
    }
    

    构造processor时主要给RPC调用接口做了接口名到接口处理一个key-value 映射。
    接下来看看processor的Process(inputProtocol, outputProtocol)函数。

    func (p *PlayerServerProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
        name, _, seqId, err := iprot.ReadMessageBegin()
        if err != nil {
            return false, err
        }
        if processor, ok := p.GetProcessorFunction(name); ok {
            return processor.Process(seqId, iprot, oprot)
        }
        iprot.Skip(thrift.STRUCT)
        iprot.ReadMessageEnd()
        x15 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
        oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
        x15.Write(oprot)
        oprot.WriteMessageEnd()
        oprot.Flush()
        return false, x15
    
    }
    

    只要是从输入数据协议里面读取rpc 接口名字,和这次rpc 调用id,通过GetProcessorFunction获取对接接口的处理单元,处理这次请求调用。

    func (p *PlayerServerProcessor) GetProcessorFunction(key string) (processor thrift.TProcessorFunction, ok bool) {
        processor, ok = p.processorMap[key]
        return processor, ok
    }
    

    以getNewOrders 接口为例,

    type playerServerProcessorGetNewOrders struct {
        handler PlayerServer
    }
    
    func (p *playerServerProcessorGetNewOrders) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
        args := PlayerServerGetNewOrdersArgs{}
        if err = args.Read(iprot); err != nil {
            iprot.ReadMessageEnd()
            x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
            oprot.WriteMessageBegin("getNewOrders", thrift.EXCEPTION, seqId)
            x.Write(oprot)
            oprot.WriteMessageEnd()
            oprot.Flush()
            return false, err
        }
    
        iprot.ReadMessageEnd()
        result := PlayerServerGetNewOrdersResult{}
        var retval []*Order
        var err2 error
        if retval, err2 = p.handler.GetNewOrders(); err2 != nil {
            x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing getNewOrders: "+err2.Error())
            oprot.WriteMessageBegin("getNewOrders", thrift.EXCEPTION, seqId)
            x.Write(oprot)
            oprot.WriteMessageEnd()
            oprot.Flush()
            return true, err2
        } else {
            result.Success = retval
        }
        if err2 = oprot.WriteMessageBegin("getNewOrders", thrift.REPLY, seqId); err2 != nil {
            err = err2
        }
        if err2 = result.Write(oprot); err == nil && err2 != nil {
            err = err2
        }
        if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
            err = err2
        }
        if err2 = oprot.Flush(); err == nil && err2 != nil {
            err = err2
        }
        if err != nil {
            return
        }
        return true, err
    }
    

    从输入数据协议里面读取数据,反序列化构造getNewOrders 的接口参数,再调用用户业务层实现的p.handler.GetNewOrders()接口函数,将返回值通过输出数据协议序列化,通过TServerTransport 返回给客户端。

    Transport

    Transport 类似java里面的各种io,可以通过装饰器从一种io到另一种,到现在笔者还没搞懂java 的各种io 。

    // Encapsulates the I/O layer
    type TTransport interface {
        io.ReadWriteCloser
        Flusher
        ReadSizeProvider
    
        // Opens the transport for communication
        Open() error
    
        // Returns true if the transport is open
        IsOpen() bool
    }
    

    Thrift 里面提供了, TBufferedTransport,TFramedTransport,TMemoryBuffer,RichTransport,也可以自己扩展自己需要的Transport。

    Protocol

    Protocol 主要用来数据的序列化和反序列化。Thrift 里面提供了TBinaryProtocol,TCompactProtocol,TDebugProtocol,TJSONProtocol等,具体格式就不细说,笔者这篇博客简单描述了下TBinaryProtocol。

    
    type TProtocol interface {
        WriteMessageBegin(name string, typeId TMessageType, seqid int32) error
        WriteMessageEnd() error
        WriteStructBegin(name string) error
        WriteStructEnd() error
        WriteFieldBegin(name string, typeId TType, id int16) error
        WriteFieldEnd() error
        WriteFieldStop() error
        WriteMapBegin(keyType TType, valueType TType, size int) error
        WriteMapEnd() error
        WriteListBegin(elemType TType, size int) error
        WriteListEnd() error
        WriteSetBegin(elemType TType, size int) error
        WriteSetEnd() error
        WriteBool(value bool) error
        WriteByte(value int8) error
        WriteI16(value int16) error
        WriteI32(value int32) error
        WriteI64(value int64) error
        WriteDouble(value float64) error
        WriteString(value string) error
        WriteBinary(value []byte) error
    
        ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error)
        ReadMessageEnd() error
        ReadStructBegin() (name string, err error)
        ReadStructEnd() error
        ReadFieldBegin() (name string, typeId TType, id int16, err error)
        ReadFieldEnd() error
        ReadMapBegin() (keyType TType, valueType TType, size int, err error)
        ReadMapEnd() error
        ReadListBegin() (elemType TType, size int, err error)
        ReadListEnd() error
        ReadSetBegin() (elemType TType, size int, err error)
        ReadSetEnd() error
        ReadBool() (value bool, err error)
        ReadByte() (value int8, err error)
        ReadI16() (value int16, err error)
        ReadI32() (value int32, err error)
        ReadI64() (value int64, err error)
        ReadDouble() (value float64, err error)
        ReadString() (value string, err error)
        ReadBinary() (value []byte, err error)
    
        Skip(fieldType TType) (err error)
        Flush() (err error)
    
        Transport() TTransport
    }
    

    Thrift Client

    Client 端就简单多了,与服务端建立一个连接,再在连接上构建TTransport,选择与服务端对应的in Protocol 和 out Protocol。发送数据。以 getNewOrders为例。

    func (p *PlayerServerClient) GetNewOrders() (r []*Order, err error) {
        if err = p.sendGetNewOrders(); err != nil {
            return
        }
        return p.recvGetNewOrders()
    }
    
    func (p *PlayerServerClient) sendGetNewOrders() (err error) {
        oprot := p.OutputProtocol
        if oprot == nil {
            oprot = p.ProtocolFactory.GetProtocol(p.Transport)
            p.OutputProtocol = oprot
        }
        p.SeqId++
        if err = oprot.WriteMessageBegin("getNewOrders", thrift.CALL, p.SeqId); err != nil {
            return
        }
        args := PlayerServerGetNewOrdersArgs{}
        if err = args.Write(oprot); err != nil {
            return
        }
        if err = oprot.WriteMessageEnd(); err != nil {
            return
        }
        return oprot.Flush()
    }
    
    func (p *PlayerServerClient) recvGetNewOrders() (value []*Order, err error) {
        iprot := p.InputProtocol
        if iprot == nil {
            iprot = p.ProtocolFactory.GetProtocol(p.Transport)
            p.InputProtocol = iprot
        }
        method, mTypeId, seqId, err := iprot.ReadMessageBegin()
        if err != nil {
            return
        }
        if method != "getNewOrders" {
            err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "getNewOrders failed: wrong method name")
            return
        }
        if p.SeqId != seqId {
            err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "getNewOrders failed: out of sequence response")
            return
        }
        if mTypeId == thrift.EXCEPTION {
            error12 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
            var error13 error
            error13, err = error12.Read(iprot)
            if err != nil {
                return
            }
            if err = iprot.ReadMessageEnd(); err != nil {
                return
            }
            err = error13
            return
        }
        if mTypeId != thrift.REPLY {
            err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "getNewOrders failed: invalid message type")
            return
        }
        result := PlayerServerGetNewOrdersResult{}
        if err = result.Read(iprot); err != nil {
            return
        }
        if err = iprot.ReadMessageEnd(); err != nil {
            return
        }
        value = result.GetSuccess()
        return
    }
    

    sendGetNewOrders 会将远程接口名,参数序列化之后发到Server. recvGetNewOrders 接受Server返回的数据反序列化。调用完成。

    总结

    先不评价Thrift 在众多RPC框架中怎么样(笔者用过的RPC太少)。但Thrift整个设计面向接口,层次清楚,很易于扩展,维护,可以学习下。

    相关文章

      网友评论

      本文标题:Go Thrift RPC

      本文链接:https://www.haomeiwen.com/subject/kugazxtx.html