美文网首页
Go RPC(一)

Go RPC(一)

作者: 小东班吉 | 来源:发表于2023-05-14 18:28 被阅读0次

    RPC(Remote Procedure Call,远程过程调用)是一种计算机通信协议,用于使不同计算机间的程序能够相互调用,就像调用本地程序一样。它的基本原理是将远程调用封装成本地调用的方式,使得远程的调用过程对于调用方来说是透明的。

    在 RPC 中,客户端通过发送一个请求消息给服务端,服务端接收到请求后执行相应的操作,并返回一个响应消息给客户端。通常情况下,RPC 协议会对请求和响应消息进行序列化和反序列化,以便在网络上传输。

    net/rpc

    在go中,net/rpc是这么说的,服务端注册一个对象使其作为具有对象类型名称的服务可见。该对象的可导出的方法可以支持远程调用,在同一个服务端可以注册多个不同类型的对象,但不能注册多个同样的对象。一个对象的方法要支持远程访问,必须满足以下几个标准:

    • the method's type is exported. // 方法类型是可导出的
    • the method is exported. // 方法必须是可导出的
    • the method has two arguments, both exported (or builtin) types. // 必须有2个参数,且类型是可导出的或者内置的类型
    • the method's second argument is a pointer. // 第二个参数必须是指针,因为要接收返回值。
    • the method has return type error. // 返回一个error
      例如:
    // 第一个参数表示客户端调用传递的参数,第二个参数是返回值
    func (t *T) MethodName(argType T1, replyType *T2) error
    

    go中的rpc官方给了个例子:
    服务端

    type Args struct {  
       A, B int  
    }  
      
    type Quotient struct {  
       Quo, Rem int  
    }  
      
    type Arith int  
      
    func (t *Arith) Multiply(args *Args, reply *int) error {  
       *reply = args.A * args.B  
       return nil  
    }  
      
    func (t *Arith) Divide(args *Args, quo *Quotient) error {  
       if args.B == 0 {  
          return errors.New("divide by zero")  
       }  
       quo.Quo = args.A / args.B  
       quo.Rem = args.A % args.B  
       return nil  
    }  
    
    func server() {  
       arith := new(Arith)  
       s := rpc.NewServer() 
       // s.RegisterName 和 s.Register比较,多了个自定义的命名空间,比如这里是`chujiu.Arith`,那客户端调用就是`chujiu.Arith.xxxx`,如果是Register则调用时服务名称就是`Arith.xxxx`
       s.RegisterName("chujiu.Arith", arith)  
       l, e := net.Listen("tcp", ":1234")  
       if e != nil {  
          log.Fatal("listen error:", e)  
       }  
       go s.Accept(l)  
    }
    

    对象 Arith的两个方法都是可以导出的,且有2个参数,第一个作为客户端调用传递过来的参数,第二个是指针作为返回值,返回一个error。
    new一个新的server后,然后将对象Arith注册到该server上,然后监听1234端口,然后等待连接。
    客户端

    func client() {  
       c, err := rpc.Dial("tcp", ":1234")  
       if err != nil {  
          log.Fatal("dialing:", err)  
       }  
       args := &Args{7, 8}  
       var reply int  
       err = c.Call("chujiu.Arith.Multiply", args, &reply)  
       if err != nil {  
          log.Fatal("arith error:", err)  
       }  
       fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)  
    }  
    

    客户端先链接上服务端,然后通过call调用远程服务的方法,看起来就向调用本地函数一样,第一个参数是服务名称,第二个是参数,第三个是返回值,还有一个异步调用的方法,返回一个call对象,然后通过对call对象的done channel读取返回值

    quotient := new(Quotient)  
    divCall := client.Go("Arith.Divide", args, quotient, nil)  
    replyCall := <-divCall.Done    // will be equal to divCall
    

    原理

    服务端
    rpc的实现原理很简单,就是在服务端启动的时候,通过一个map将注册的服务保存起来,然后客户端调用时,通过名称从服务端保存的map中找到真正的对象,然后调用对象的方法。
    register保存了注册的对象,第一个参数是注册的对象,第二个是服务的名称,第三个参数 是否使用自定义的name。server对象中保存了请求的参数以及注册的服务的map
    go/src/net/rpc/server.go:239

    // Server represents an RPC Server.  
    type Server struct {  
       serviceMap sync.Map   // map[string]*service  
       reqLock    sync.Mutex // protects freeReq  
       freeReq    *Request  
       respLock   sync.Mutex // protects freeResp  
       freeResp   *Response  
    }
    
    func (server *Server) register(rcvr any, name string, useName bool) error {  
       s := new(service)  
       s.typ = reflect.TypeOf(rcvr)  
       s.rcvr = reflect.ValueOf(rcvr)  
       sname := name  
       // 这里可以看到如果不是自定义的,将直接用注册的对象名称作为服务名称
       if !useName {  
          sname = reflect.Indirect(s.rcvr).Type().Name()  
       }  
       if sname == "" {  
          s := "rpc.Register: no service name for type " + s.typ.String()  
          log.Print(s)  
          return errors.New(s)  
       }  
       // 这里检查了对象是否是导出的,在go中就看首字母是否大写
       if !useName && !token.IsExported(sname) {  
          s := "rpc.Register: type " + sname + " is not exported"  
          log.Print(s)  
          return errors.New(s)  
       }  
       s.name = sname  
      
       // Install the methods 
       // 这里对对象的方法以及参数等做了校验 
       s.method = suitableMethods(s.typ, logRegisterError)  
      
       if len(s.method) == 0 {  
          str := ""  
      
          // To help the user, see if a pointer receiver would work.  
          // 这里第二次调用只是为了确保安装的方法都符合上面几个规范,提高注册方法的安全性和可靠性。
          // 比如需要导出的方法因为参数不符合规范而被第一次调研过滤掉了,看起来有点多余
          // 但第二次是为了确保被注册的所有方法都符合规范,而不是为了重新获取可导出的方法,这也是第二次导出的并不需要赋值的原因
          method := suitableMethods(reflect.PointerTo(s.typ), false)  
          if len(method) != 0 {  
             str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"  
          } else {  
             str = "rpc.Register: type " + sname + " has no exported methods of suitable type"  
          }  
          log.Print(str)  
          return errors.New(str)  
       }  
      
       if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {  
          return errors.New("rpc: service already defined: " + sname)  
       }  
       return nil  
    }
    

    可以看到先是通过suitableMethods方法做了检查后,将注册的对象都存储到server.serviceMap中了,值为服务名称,值为service,而service里存储了对象的名称,类型,方法等。
    go/src/net/rpc/server.go:161

    type service struct {  
       name   string                 // name of service  
       rcvr   reflect.Value          // receiver of methods for the service  
       typ    reflect.Type           // type of the receiver  
       method map[string]*methodType // registered methods  
    }
    

    go/src/net/rpc/server.go:284

    func suitableMethods(typ reflect.Type, logErr bool) map[string]*methodType {  
       methods := make(map[string]*methodType)  
       for m := 0; m < typ.NumMethod(); m++ {  
          method := typ.Method(m)  
          mtype := method.Type  
          mname := method.Name  
          // Method must be exported.  
          if !method.IsExported() {  
             continue  
          }  
          // Method needs three ins: receiver, *args, *reply.  
          if mtype.NumIn() != 3 {  
             if logErr {  
                log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())  
             }  
             continue  
          }  
          // First arg need not be a pointer.  
          argType := mtype.In(1)  
          if !isExportedOrBuiltinType(argType) {  
             if logErr {  
                log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)  
             }  
             continue  
          }  
          // Second arg must be a pointer.  
          replyType := mtype.In(2)  
          if replyType.Kind() != reflect.Pointer {  
             if logErr {  
                log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)  
             }  
             continue  
          }  
          // Reply type must be exported.  
          if !isExportedOrBuiltinType(replyType) {  
             if logErr {  
                log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)  
             }  
             continue  
          }  
          // Method needs one out.  
          if mtype.NumOut() != 1 {  
             if logErr {  
                log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())  
             }  
             continue  
          }  
          // The return type of the method must be error.  
          if returnType := mtype.Out(0); returnType != typeOfError {  
             if logErr {  
                log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)  
             }  
             continue  
          }  
          methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}  
       }  
       return methods  
    }
    

    该方法里主要是做了对方法是否导出,以及参数是否否和标准做了检查,最后将对象的方法存储到service.methods这个map中返回key是方法名称,value是一个methed的结构体,包含了可远程调用的方法名称,参数以及返回值。
    注册对象之后就是监听端口,l, e := net.Listen("tcp", ":1234")等待客户端链接调用。
    go/src/net/rpc/server.go:628

    func (server *Server) Accept(lis net.Listener) {  
       for {  
          conn, err := lis.Accept()  
          if err != nil {  
             log.Print("rpc.Serve: accept:", err.Error())  
             return  
          }  
          go server.ServeConn(conn)  
       }  
    }
    

    这里可以看到如果有链接进来,则启一个新的go程去处理。ServeConn使用的是gob这种二进制的编码解码方式, gob是go内置的二进制编码方式,相比较binary,它存储了一些额外的信息,不需要知道长度,就能解码,缺点就是占用的字节数会很多,儿binary需要自己定义好对象的长度,在解码时才知道读到哪些长度的字节。

    func (server *Server) ServeConn(conn io.ReadWriteCloser) {  
       buf := bufio.NewWriter(conn)  
       srv := &gobServerCodec{  
          rwc:    conn,  
          dec:    gob.NewDecoder(conn),  
          enc:    gob.NewEncoder(buf),  
          encBuf: buf,  
       }  
       server.ServeCodec(srv)  
    }
    

    ServeCodec方法的参数是一个接口 ServerCodec
    ServerCodec为RPC会话的服务器端实现RPC请求的读取和RPC响应的写入。而gobServerCodec实现了它。还有json编码的比如go/src/net/rpc/jsonrpc/server.go:17::serverCodec,都是实现了ServerCodec接口。
    go/src/net/rpc/server.go:459

    // ServeCodec is like ServeConn but uses the specified codec to// decode requests and encode responses.  
    func (server *Server) ServeCodec(codec ServerCodec) {  
       sending := new(sync.Mutex)  
       wg := new(sync.WaitGroup)  
       for {  
          service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)  
          if err != nil {  
             if debugLog && err != io.EOF {  
                log.Println("rpc:", err)  
             }  
             if !keepReading {  
                break  
             }  
             // send a response if we actually managed to read a header.  
             if req != nil {  
                server.sendResponse(sending, req, invalidRequest, codec, err.Error())  
                server.freeRequest(req)  
             }  
             continue  
          }  
          wg.Add(1)  
          go service.call(server, sending, wg, mtype, req, argv, replyv, codec)  
       }  
       // We've seen that there are no more requests.  
       // Wait for responses to be sent before closing codec.   wg.Wait()  
       codec.Close()  
    }
    

    这里面其实就实现了客户端调用后,服务端处理的过程。server.readRequest获取了客户端请求的参数以及存储服务方法的service。紧接着在下面 server.call,调用了真实对象的方法。
    go/src/net/rpc/server.go:585

    func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {  
       service, mtype, req, keepReading, err = server.readRequestHeader(codec)  
       。。。。
       // argv guaranteed to be a pointer now.  
       if err = codec.ReadRequestBody(argv.Interface()); err != nil {  
          return  
       }  
       。。。
    }
    
    
    func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {  
       // Grab the request header.  
       req = server.getRequest()  
       err = codec.ReadRequestHeader(req)  
       if err != nil {  
          req = nil  
          if err == io.EOF || err == io.ErrUnexpectedEOF {  
             return  
          }  
          err = errors.New("rpc: server cannot decode request: " + err.Error())  
          return  
       }  
      
       // We read the header successfully. If we see an error now,  
       // we can still recover and move on to the next request.   
       keepReading = true  
      
       dot := strings.LastIndex(req.ServiceMethod, ".")  
       if dot < 0 {  
          err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)  
          return  
       }  
       serviceName := req.ServiceMethod[:dot]  
       methodName := req.ServiceMethod[dot+1:]  
      
       // Look up the request.  
       svci, ok := server.serviceMap.Load(serviceName)  
       if !ok {  
          err = errors.New("rpc: can't find service " + req.ServiceMethod)  
          return  
       }  
       svc = svci.(*service)  
       mtype = svc.method[methodName]  
       if mtype == nil {  
          err = errors.New("rpc: can't find method " + req.ServiceMethod)  
       }  
       return  
    }
    

    server.getRequest()获取到request后,通过codec.ReadRequestHeader(req)解码,这里因为例子中是gob编码,所以会调用gobServerCodec::ReadRequestHeader 解码,然后获取到客户端调用服务的对象和方法名。接下来获取客户端调用的参数,最后调用call方法实现对象方法的调用。
    /go/src/net/rpc/server.go:373

    func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {  
       if wg != nil {  
          defer wg.Done()  
       }  
       mtype.Lock()  
       mtype.numCalls++  
       mtype.Unlock()  
       function := mtype.method.Func  
       // Invoke the method, providing a new value for the reply.  
       returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})  
       // The return value for the method is an error.  
       errInter := returnValues[0].Interface()  
       errmsg := ""  
       if errInter != nil {  
          errmsg = errInter.(error).Error()  
       }  
       server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)  
       server.freeRequest(req)  
    }
    

    这里通过反射call方法调用了真正的方法,返回值如果非nil,则返回一个错误的字符串,然后写入response。
    客户端
    客户端总共有2个方法调用call和go,call方法底层也是调用的go方法,go方法会把结果返回到call结构体的done channel上
    go/src/net/rpc/client.go:298

    type Client struct {  
       codec ClientCodec  //客户端也有一个编码的接口描述,和服务端的serverCodec是一样的,请求用什么编码,解码就用什么解
      
       reqMutex sync.Mutex // protects following  
       request  Request  
      
       mutex    sync.Mutex // protects following  
       seq      uint64  
       pending  map[uint64]*Call  // 这是待处理请求的map,key是序号seq,值是call对象,call里保存了本次请求的参数等
       closing  bool // user has called Close  
       shutdown bool // server has told us to stop  
    }
    
    // request 规定了service的格式
    type Request struct {  
       ServiceMethod string   // format: "Service.Method"  
       Seq           uint64   // sequence number chosen by client  
       next          *Request // for free list in Server  
    }
    
    type Call struct {  
       ServiceMethod string     // The name of the service and method to call.  
       Args          any        // The argument to the function (*struct).  
       Reply         any        // The reply from the function (*struct).  
       Error         error      // After completion, the error status.  
       Done          chan *Call // Receives *Call when Go is complete.}
    
    func (client *Client) Go(serviceMethod string, args any, reply any, done chan *Call) *Call {  
       call := new(Call)  
       call.ServiceMethod = serviceMethod  
       call.Args = args  
       call.Reply = reply  
       if done == nil {  
          done = make(chan *Call, 10) // buffered.  
       } else {  
          // If caller passes done != nil, it must arrange that  
          // done has enough buffer for the number of simultaneous      
          // RPCs that will be using that channel. If the channel      
          // is totally unbuffered, it's best not to run at all.  
          if cap(done) == 0 {  
             log.Panic("rpc: done channel is unbuffered")  
          }  
       }  
       call.Done = done  
       client.send(call)  
       return call  
    }
    
    func (client *Client) Call(serviceMethod string, args any, reply any) error {  
       call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done  
       return call.Error  
    }
    

    这里done的容量在同步调用时done的长度设置为了1,这里不用无缓冲的channel。因为无缓冲要求发送值的时候,需要设置接收者,否则就会一直阻塞住,会有性能问题,而有缓冲的就可以缓存一个值,可以立即返回,且可以保证有序性,就是因为同一时间只能缓存一个值。另一个好处就是可以避免死锁的问题,假设没有接收者或者发送方和接收方都处于阻塞中,无缓冲的很可能立马就dead lock了。
    client.send 方法是真正的调用服务的方法,看下代码

    func (client *Client) send(call *Call) {  
       client.reqMutex.Lock()  
       defer client.reqMutex.Unlock()  
      
       // Register this call.  
       client.mutex.Lock()  
       if client.shutdown || client.closing {  
          client.mutex.Unlock()  
          call.Error = ErrShutdown  
          call.done()  
          return  
       }  
       // 序号自增,然后存入pending中
       seq := client.seq  
       client.seq++  
       client.pending[seq] = call  
       client.mutex.Unlock()  
      
       // Encode and send the request.  
       client.request.Seq = seq  
       client.request.ServiceMethod = call.ServiceMethod  
       err := client.codec.WriteRequest(&client.request, call.Args)  
       if err != nil {  
          client.mutex.Lock()  
          call = client.pending[seq]  
          delete(client.pending, seq)  
          client.mutex.Unlock()  
          if call != nil {  
             call.Error = err  
             call.done()  
          }  
       }  
    }
    
    func (c *gobClientCodec) WriteRequest(r *Request, body any) (err error) {  
       if err = c.enc.Encode(r); err != nil {  
          return  
       }  
       if err = c.enc.Encode(body); err != nil {  
          return  
       }  
       return c.encBuf.Flush()  
    }
    

    client通过codec调用WriteRequest方法,WriteRequest方法就是用gob编码然后写入到底层io.writer接口等待服务端读取。
    上面说到 codec 是实现了接口 ClientCodec,NewClient默认是通过gob编码的。

    func NewClient(conn io.ReadWriteCloser) *Client {  
       encBuf := bufio.NewWriter(conn)  
       client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}  
       return NewClientWithCodec(client)  
    }  
      
    func NewClientWithCodec(codec ClientCodec) *Client {  
       client := &Client{  
          codec:   codec,  
          pending: make(map[uint64]*Call),  
       }  
       go client.input()  
       return client  
    }
    

    这里在创建一个新的客户端的同时,启了一个新的go程接收客户端的响应。

    func (client *Client) input() {  
       var err error  
       var response Response  
       for err == nil {  
          response = Response{}  
          err = client.codec.ReadResponseHeader(&response)  
          if err != nil {  
             break  
          }  
          seq := response.Seq  
          client.mutex.Lock()  
          call := client.pending[seq]  
          delete(client.pending, seq)  
          client.mutex.Unlock()  
      
          switch {  
          case call == nil:        
             err = client.codec.ReadResponseBody(nil)  
             if err != nil {  
                err = errors.New("reading error body: " + err.Error())  
             }  
          case response.Error != "":  
            call.Error = ServerError(response.Error)  
             err = client.codec.ReadResponseBody(nil)  
             if err != nil {  
                err = errors.New("reading error body: " + err.Error())  
             }  
             call.done()  
          default:  
             err = client.codec.ReadResponseBody(call.Reply)  
             if err != nil {  
                call.Error = errors.New("reading body " + err.Error())  
             }  
             call.done()  
          }  
       }  
       // Terminate pending calls.  
       client.reqMutex.Lock()  
       client.mutex.Lock()  
       client.shutdown = true  
       closing := client.closing  
       if err == io.EOF {  
          if closing {  
             err = ErrShutdown  
          } else {  
             err = io.ErrUnexpectedEOF  
          }  
       }  
       for _, call := range client.pending {  
          call.Error = err  
          call.done()  
       }  
       client.mutex.Unlock()  
       client.reqMutex.Unlock()  
       if debugLog && err != io.EOF && !closing {  
          log.Println("rpc: client protocol error:", err)  
       }  
    }
    

    可以看到服务端处理后发送请求,ReadResponseBody客户端这边接收到后写入call.Reply中,然后从将call结构体写入call.done中,为了防止done阻塞,select 给了一个空的default。最后客户端处理返回的结果。

    相关文章

      网友评论

          本文标题:Go RPC(一)

          本文链接:https://www.haomeiwen.com/subject/aovpsdtx.html