Go Thrift RPC

Author: carlSQ | Published: 2018-02-08 15:46 | Read 89 times

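Introduction

It's the end of the year and things have quieted down lately, so I'm taking the chance to summarize what I've learned in nearly a year of backend work.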

Thrift Server architecture flow diagram

[Figure: image.png]

Thrift Server

Starting a TCP server in Go is simple, thanks to the way Go's net package wraps the underlying sockets. A typical Go server program looks roughly like this:

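package main

import (
    "fmt"
    "net"
    "runtime/debug"
)

// handleConn serves a single client connection until it is closed.
func handleConn(conn net.Conn) {
    defer conn.Close()
    defer func() {
        // Recover so that a panic on one connection does not crash the server.
        if e := recover(); e != nil {
            fmt.Printf("panic in processor: %v: %s\n", e, debug.Stack())
        }
    }()
    for {
        // read request data from the connection and process it
        // write response data to the connection
    }
}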

func main() {
    l, err := net.Listen("tcp", ":16888")
    if err != nil {
        fmt.Println("listen error:", err)
        return
    }

    for {
        conn, err := l.Accept()
        if err != nil {
            fmt.Println("accept error:", err)
            break
        }
        go handleConn(conn)
    }
}
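The loop body in handleConn above is left as comments. As a minimal sketch of what it might contain, the following echoes newline-terminated requests back to the client. The names echoConn and roundTrip are hypothetical, and net.Pipe is used in place of a real TCP listener only so the example runs without opening a port.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// echoConn is a hypothetical loop body for handleConn: it reads one
// newline-terminated request at a time and writes it back unchanged.
func echoConn(conn net.Conn) {
	defer conn.Close()
	r := bufio.NewReader(conn)
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			return // the client hung up, or a read error occurred
		}
		if _, err := conn.Write([]byte(line)); err != nil {
			return
		}
	}
}

// roundTrip connects an in-memory client to echoConn, sends one line
// and returns the echoed reply.
func roundTrip(msg string) (string, error) {
	client, server := net.Pipe()
	defer client.Close()
	go echoConn(server)
	if _, err := fmt.Fprint(client, msg); err != nil {
		return "", err
	}
	return bufio.NewReader(client).ReadString('\n')
}

func main() {
	reply, err := roundTrip("ping\n")
	fmt.Printf("%q %v\n", reply, err)
}
```

With a real listener, echoConn would simply be started with go echoConn(conn) for every accepted connection, as in the main function above.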

With the Thrift library, an RPC server program looks roughly like this:

    handler := &Sharpshooter.Sharpshooter{}
    processor := player.NewPlayerServerProcessor(handler)
    serverTransport, err := thrift.NewTServerSocket("0.0.0.0:80")
    if err != nil {
        log.Fatalln("Error:", err)
    }
    server := thrift.NewTSimpleServer2(processor, serverTransport)
    err = server.Serve()
    if err != nil {
        log.Fatalln("Error:", err)
    }

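What does the code above actually do? Let's go through it in detail.
Sharpshooter exposes the following interface for remote calls; it corresponds to the Handler in the architecture flow diagram above.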

type PlayerServer interface {
    Ping() (r bool, err error)

    UploadMap(gamemap [][]int32) (err error)

    UploadParamters(arguments *Args_) (err error)
    
    AssignTanks(tanks []int32) (err error)
    
    LatestState(state *GameState) (err error)
    
    GetNewOrders() (r []*Order, err error)
}

The Thrift source code defines a TServer interface as follows:

type TServer interface {
    ProcessorFactory() TProcessorFactory
    ServerTransport() TServerTransport
    InputTransportFactory() TTransportFactory
    OutputTransportFactory() TTransportFactory
    InputProtocolFactory() TProtocolFactory
    OutputProtocolFactory() TProtocolFactory

    // Starts the server
    Serve() error
    // Stops the server. This is optional on a per-implementation basis. Not
    // all servers are required to be cleanly stoppable.
    Stop() error
}

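Any type that satisfies this interface can be regarded as a server (provided the methods are implemented correctly).
TProcessorFactory corresponds to the Processor in the architecture flow diagram above, the component that handles each connection.
TServerTransport corresponds to the Socket in the diagram.
TTransportFactory corresponds to the Transport in the diagram.
TProtocolFactory corresponds to the Protocol in the diagram.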

Thrift defines TSimpleServer, a simple implementation of this server interface:

type TSimpleServer struct {
    quit chan struct{}

    processorFactory       TProcessorFactory
    serverTransport        TServerTransport
    inputTransportFactory  TTransportFactory
    outputTransportFactory TTransportFactory
    inputProtocolFactory   TProtocolFactory
    outputProtocolFactory  TProtocolFactory
    sync.WaitGroup
}

TSimpleServer implements Serve() error as follows:

func (p *TSimpleServer) Listen() error {
    return p.serverTransport.Listen()
}

func (p *TSimpleServer) AcceptLoop() error {
    for {
        client, err := p.serverTransport.Accept()
        if err != nil {
            select {
            case <-p.quit:
                return nil
            default:
            }
            return err
        }
        if client != nil {
            p.Add(1)
            go func() {
                if err := p.processRequests(client); err != nil {
                    log.Println("error processing request:", err)
                }
            }()
        }
    }
}

func (p *TSimpleServer) Serve() error {
    err := p.Listen()
    if err != nil {
        return err
    }
    p.AcceptLoop()
    return nil
}

In short, it listens on the port through serverTransport, accepts connections, and hands each connection to the processRequests method.

TServerSocket

In the simple Thrift server example above, serverTransport is initialized like this:
serverTransport, err := thrift.NewTServerSocket("0.0.0.0:80")

type TServerSocket struct {
    listener      net.Listener
    addr          net.Addr
    clientTimeout time.Duration

    // Protects the interrupted value to make it thread safe.
    mu          sync.RWMutex
    interrupted bool
}

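It is simply a TServerSocket instance, which implements the TServerTransport interface: it listens on a port and accepts connections, and its implementation does so by calling into the net package.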

type TServerTransport interface {
    Listen() error
    Accept() (TTransport, error)
    Close() error

    // Optional method implementation. This signals to the server transport
    // that it should break out of any accept() or listen() that it is currently
    // blocked on. This method, if implemented, MUST be thread safe, as it may
    // be called from a different thread context than the other TServerTransport
    // methods.
    Interrupt() error
}

TServerSocket here is not a socket in the POSIX sense; it is Thrift's server-side abstraction over the network operations of Go's net package.
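To make the idea of a thin wrapper over the net package concrete, here is a rough sketch of a server-socket type with Listen, Accept and Interrupt. It is an illustration under assumptions, not the Thrift source: serverSocket and errInterrupted are hypothetical names, and Accept returns a plain net.Conn rather than a TTransport.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"sync"
)

var errInterrupted = errors.New("server socket interrupted")

// serverSocket is a simplified stand-in for TServerSocket.
type serverSocket struct {
	addr string

	// mu protects listener and interrupted, since Interrupt may be
	// called from a different goroutine than Listen and Accept.
	mu          sync.RWMutex
	listener    net.Listener
	interrupted bool
}

// Listen binds the address once; repeated calls are no-ops.
func (s *serverSocket) Listen() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.listener != nil {
		return nil
	}
	l, err := net.Listen("tcp", s.addr)
	if err != nil {
		return err
	}
	s.listener = l
	return nil
}

// Accept waits for the next connection unless the socket was interrupted.
func (s *serverSocket) Accept() (net.Conn, error) {
	s.mu.RLock()
	l, interrupted := s.listener, s.interrupted
	s.mu.RUnlock()
	if interrupted {
		return nil, errInterrupted
	}
	if l == nil {
		return nil, errors.New("Listen must be called before Accept")
	}
	return l.Accept()
}

// Interrupt marks the socket as interrupted and closes the listener,
// which also unblocks an Accept that is currently waiting.
func (s *serverSocket) Interrupt() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.interrupted = true
	if s.listener == nil {
		return nil
	}
	err := s.listener.Close()
	s.listener = nil
	return err
}

func main() {
	s := &serverSocket{addr: "127.0.0.1:0"}
	if err := s.Listen(); err != nil {
		fmt.Println("listen error:", err)
		return
	}
	s.Interrupt()
	_, err := s.Accept()
	fmt.Println(err)
}
```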

Processor

The processor handles every accepted connection. Here is the implementation of processRequests():

func (p *TSimpleServer) processRequests(client TTransport) error {
    defer p.Done()

    processor := p.processorFactory.GetProcessor(client)
    inputTransport := p.inputTransportFactory.GetTransport(client)
    outputTransport := p.outputTransportFactory.GetTransport(client)
    inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
    outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
    defer func() {
        if e := recover(); e != nil {
            log.Printf("panic in processor: %s: %s", e, debug.Stack())
        }
    }()

    if inputTransport != nil {
        defer inputTransport.Close()
    }
    if outputTransport != nil {
        defer outputTransport.Close()
    }
    for {
        select {
        case <-p.quit:
            return nil
        default:
        }

        ok, err := processor.Process(inputProtocol, outputProtocol)
        if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
            return nil
        } else if err != nil {
            return err
        }
        if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
            continue
        }
        if !ok {
            break
        }
    }
    return nil
}

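This method obtains the processor, the input I/O inputTransport, the output I/O outputTransport, the input data protocol inputProtocol and the output data protocol outputProtocol, and finally lets processor.Process(inputProtocol, outputProtocol) handle the input and output.
The example above constructs a processor with processor := player.NewPlayerServerProcessor(handler). Here is its implementation: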

func NewPlayerServerProcessor(handler PlayerServer) *PlayerServerProcessor {
    self14 := &PlayerServerProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)}
    self14.processorMap["ping"] = &playerServerProcessorPing{handler: handler}
    self14.processorMap["uploadMap"] = &playerServerProcessorUploadMap{handler: handler}
    self14.processorMap["uploadParamters"] = &playerServerProcessorUploadParamters{handler: handler}
    self14.processorMap["assignTanks"] = &playerServerProcessorAssignTanks{handler: handler}
    self14.processorMap["latestState"] = &playerServerProcessorLatestState{handler: handler}
    self14.processorMap["getNewOrders"] = &playerServerProcessorGetNewOrders{handler: handler}
    return self14
}

Constructing the processor mainly builds a key-value map from each RPC method name to the handler for that method.
Next, look at the processor's Process(inputProtocol, outputProtocol) function.

func (p *PlayerServerProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
    name, _, seqId, err := iprot.ReadMessageBegin()
    if err != nil {
        return false, err
    }
    if processor, ok := p.GetProcessorFunction(name); ok {
        return processor.Process(seqId, iprot, oprot)
    }
    iprot.Skip(thrift.STRUCT)
    iprot.ReadMessageEnd()
    x15 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
    oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
    x15.Write(oprot)
    oprot.WriteMessageEnd()
    oprot.Flush()
    return false, x15

}

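It mainly reads the RPC method name and the ID of this RPC call from the input data protocol, looks up the processing unit for that method through GetProcessorFunction, and lets it handle the call.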

func (p *PlayerServerProcessor) GetProcessorFunction(key string) (processor thrift.TProcessorFunction, ok bool) {
    processor, ok = p.processorMap[key]
    return processor, ok
}
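The name-to-handler lookup can be shown in isolation. The sketch below is a simplified analogue of the generated processor, with made-up types: processorFunc, dispatcher and the "ping" and "echo" entries are hypothetical, and arguments and results are plain strings instead of Thrift structs.

```go
package main

import "fmt"

// processorFunc is a hypothetical, simplified analogue of
// thrift.TProcessorFunction.
type processorFunc func(seqID int32, args string) (string, error)

// dispatcher maps RPC method names to the function that handles them.
type dispatcher struct {
	processorMap map[string]processorFunc
}

func newDispatcher() *dispatcher {
	d := &dispatcher{processorMap: make(map[string]processorFunc)}
	d.processorMap["ping"] = func(seqID int32, _ string) (string, error) {
		return fmt.Sprintf("reply %d: pong", seqID), nil
	}
	d.processorMap["echo"] = func(seqID int32, args string) (string, error) {
		return fmt.Sprintf("reply %d: %s", seqID, args), nil
	}
	return d
}

// process looks up the handler by name and reports unknown methods
// as an error instead of panicking.
func (d *dispatcher) process(name string, seqID int32, args string) (string, error) {
	if fn, ok := d.processorMap[name]; ok {
		return fn(seqID, args)
	}
	return "", fmt.Errorf("unknown function %s", name)
}

func main() {
	d := newDispatcher()
	fmt.Println(d.process("ping", 1, ""))
	fmt.Println(d.process("missing", 2, ""))
}
```

Because the map is filled once in the constructor and only read afterwards, concurrent connections can share one dispatcher without locking.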

Take the getNewOrders method as an example:

type playerServerProcessorGetNewOrders struct {
    handler PlayerServer
}

func (p *playerServerProcessorGetNewOrders) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
    args := PlayerServerGetNewOrdersArgs{}
    if err = args.Read(iprot); err != nil {
        iprot.ReadMessageEnd()
        x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
        oprot.WriteMessageBegin("getNewOrders", thrift.EXCEPTION, seqId)
        x.Write(oprot)
        oprot.WriteMessageEnd()
        oprot.Flush()
        return false, err
    }

    iprot.ReadMessageEnd()
    result := PlayerServerGetNewOrdersResult{}
    var retval []*Order
    var err2 error
    if retval, err2 = p.handler.GetNewOrders(); err2 != nil {
        x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing getNewOrders: "+err2.Error())
        oprot.WriteMessageBegin("getNewOrders", thrift.EXCEPTION, seqId)
        x.Write(oprot)
        oprot.WriteMessageEnd()
        oprot.Flush()
        return true, err2
    } else {
        result.Success = retval
    }
    if err2 = oprot.WriteMessageBegin("getNewOrders", thrift.REPLY, seqId); err2 != nil {
        err = err2
    }
    if err2 = result.Write(oprot); err == nil && err2 != nil {
        err = err2
    }
    if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
        err = err2
    }
    if err2 = oprot.Flush(); err == nil && err2 != nil {
        err = err2
    }
    if err != nil {
        return
    }
    return true, err
}

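It reads data from the input data protocol and deserializes it into the arguments of getNewOrders, calls the p.handler.GetNewOrders() function implemented by the user's business layer, serializes the return value through the output data protocol, and sends it back to the client over the connection's output transport.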

Transport

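Transport is similar to the various I/O classes in Java: with decorators you can wrap one kind of I/O in another. (To this day I still haven't figured out all of Java's I/O classes.)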

// Encapsulates the I/O layer
type TTransport interface {
    io.ReadWriteCloser
    Flusher
    ReadSizeProvider

    // Opens the transport for communication
    Open() error

    // Returns true if the transport is open
    IsOpen() bool
}

Thrift ships with TBufferedTransport, TFramedTransport, TMemoryBuffer and RichTransport, and you can also write your own Transport as needed.

Protocol

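Protocol is mainly responsible for serializing and deserializing data. Thrift provides TBinaryProtocol, TCompactProtocol, TDebugProtocol, TJSONProtocol and others. I won't go into the formats here; I briefly described TBinaryProtocol in another blog post.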


type TProtocol interface {
    WriteMessageBegin(name string, typeId TMessageType, seqid int32) error
    WriteMessageEnd() error
    WriteStructBegin(name string) error
    WriteStructEnd() error
    WriteFieldBegin(name string, typeId TType, id int16) error
    WriteFieldEnd() error
    WriteFieldStop() error
    WriteMapBegin(keyType TType, valueType TType, size int) error
    WriteMapEnd() error
    WriteListBegin(elemType TType, size int) error
    WriteListEnd() error
    WriteSetBegin(elemType TType, size int) error
    WriteSetEnd() error
    WriteBool(value bool) error
    WriteByte(value int8) error
    WriteI16(value int16) error
    WriteI32(value int32) error
    WriteI64(value int64) error
    WriteDouble(value float64) error
    WriteString(value string) error
    WriteBinary(value []byte) error

    ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error)
    ReadMessageEnd() error
    ReadStructBegin() (name string, err error)
    ReadStructEnd() error
    ReadFieldBegin() (name string, typeId TType, id int16, err error)
    ReadFieldEnd() error
    ReadMapBegin() (keyType TType, valueType TType, size int, err error)
    ReadMapEnd() error
    ReadListBegin() (elemType TType, size int, err error)
    ReadListEnd() error
    ReadSetBegin() (elemType TType, size int, err error)
    ReadSetEnd() error
    ReadBool() (value bool, err error)
    ReadByte() (value int8, err error)
    ReadI16() (value int16, err error)
    ReadI32() (value int32, err error)
    ReadI64() (value int64, err error)
    ReadDouble() (value float64, err error)
    ReadString() (value string, err error)
    ReadBinary() (value []byte, err error)

    Skip(fieldType TType) (err error)
    Flush() (err error)

    Transport() TTransport
}

Thrift Client

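The client side is much simpler: establish a connection to the server, build a TTransport on top of it, choose input and output Protocols that match the server's, and send the data. Take getNewOrders as an example.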

func (p *PlayerServerClient) GetNewOrders() (r []*Order, err error) {
    if err = p.sendGetNewOrders(); err != nil {
        return
    }
    return p.recvGetNewOrders()
}

func (p *PlayerServerClient) sendGetNewOrders() (err error) {
    oprot := p.OutputProtocol
    if oprot == nil {
        oprot = p.ProtocolFactory.GetProtocol(p.Transport)
        p.OutputProtocol = oprot
    }
    p.SeqId++
    if err = oprot.WriteMessageBegin("getNewOrders", thrift.CALL, p.SeqId); err != nil {
        return
    }
    args := PlayerServerGetNewOrdersArgs{}
    if err = args.Write(oprot); err != nil {
        return
    }
    if err = oprot.WriteMessageEnd(); err != nil {
        return
    }
    return oprot.Flush()
}

func (p *PlayerServerClient) recvGetNewOrders() (value []*Order, err error) {
    iprot := p.InputProtocol
    if iprot == nil {
        iprot = p.ProtocolFactory.GetProtocol(p.Transport)
        p.InputProtocol = iprot
    }
    method, mTypeId, seqId, err := iprot.ReadMessageBegin()
    if err != nil {
        return
    }
    if method != "getNewOrders" {
        err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "getNewOrders failed: wrong method name")
        return
    }
    if p.SeqId != seqId {
        err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "getNewOrders failed: out of sequence response")
        return
    }
    if mTypeId == thrift.EXCEPTION {
        error12 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
        var error13 error
        error13, err = error12.Read(iprot)
        if err != nil {
            return
        }
        if err = iprot.ReadMessageEnd(); err != nil {
            return
        }
        err = error13
        return
    }
    if mTypeId != thrift.REPLY {
        err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "getNewOrders failed: invalid message type")
        return
    }
    result := PlayerServerGetNewOrdersResult{}
    if err = result.Read(iprot); err != nil {
        return
    }
    if err = iprot.ReadMessageEnd(); err != nil {
        return
    }
    value = result.GetSuccess()
    return
}

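sendGetNewOrders serializes the remote method name and arguments and sends them to the server. recvGetNewOrders receives the data returned by the server and deserializes it. That completes the call.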
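The checks that recvGetNewOrders performs on the reply (method name, sequence ID, message type) can be isolated in a small in-memory sketch. Everything here is a stand-in: message, client and the roundTrip callback are hypothetical and replace the real transport and protocol, so only the validation order is meant to mirror the generated code.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	msgCall      = 1
	msgReply     = 2
	msgException = 3
)

// message is a hypothetical in-memory stand-in for a Thrift message envelope.
type message struct {
	Name  string
	Type  int
	SeqID int32
	Body  string
}

// client remembers the last sequence ID it sent; roundTrip stands in for
// the transport and protocol (send the request, wait for the response).
type client struct {
	seqID     int32
	roundTrip func(message) message
}

// call sends one request and validates the reply before returning its body.
func (c *client) call(name string) (string, error) {
	c.seqID++
	reply := c.roundTrip(message{Name: name, Type: msgCall, SeqID: c.seqID})
	switch {
	case reply.Name != name:
		return "", errors.New(name + " failed: wrong method name")
	case reply.SeqID != c.seqID:
		return "", errors.New(name + " failed: out of sequence response")
	case reply.Type == msgException:
		return "", errors.New(reply.Body)
	case reply.Type != msgReply:
		return "", errors.New(name + " failed: invalid message type")
	}
	return reply.Body, nil
}

func main() {
	good := &client{roundTrip: func(req message) message {
		return message{Name: req.Name, Type: msgReply, SeqID: req.SeqID, Body: "orders"}
	}}
	fmt.Println(good.call("getNewOrders"))

	stale := &client{roundTrip: func(req message) message {
		return message{Name: req.Name, Type: msgReply, SeqID: req.SeqID - 1, Body: "orders"}
	}}
	fmt.Println(stale.call("getNewOrders"))
}
```

The sequence ID check is what lets a client detect a response that belongs to an earlier request on the same connection.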

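Summary

I won't judge how Thrift compares with the many other RPC frameworks (I have used too few of them). But Thrift's overall design is interface-oriented and cleanly layered, which makes it easy to extend and maintain, and it is worth studying.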

Article link: https://www.haomeiwen.com/subject/kugazxtx.html