美文网首页
net/rpc源码学习

net/rpc源码学习

作者: 第八共同体 | 来源:发表于2019-07-10 10:53 被阅读0次

几个结构体类型

const (
    // 被用于HandleHTTP方法
    DefaultRPCPath   = "/_goRPC_"
    DefaultDebugPath = "/debug/rpc"
)

// 验证error的reflect type.不能直接使用error
//因为TypeOf方法得到一个空指针值是不被允许的。
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()

type methodType struct {
    sync.Mutex // protects counters
    method     reflect.Method
    ArgType    reflect.Type
    ReplyType  reflect.Type
    numCalls   uint
}

type service struct {
    name   string                 // name of service
    rcvr   reflect.Value          // receiver of methods for the service
    typ    reflect.Type           // type of the receiver
    method map[string]*methodType // registered methods
}

// 内部使用,每个rpc调用之前的header写入
type Request struct {
    ServiceMethod string   // format: "Service.Method"
    Seq           uint64   // sequence number chosen by client
    next          *Request // for free list in Server
}

//内部使用,每个Rpc返回之前的header写入
type Response struct {
    ServiceMethod string    // echoes that of the Request
    Seq           uint64    // echoes that of the request
    Error         string    // error, if any.
    next          *Response // for free list in Server
}

// 代表rpc服务端
type Server struct {
    serviceMap sync.Map   // map[string]*service
    reqLock    sync.Mutex // protects freeReq
    freeReq    *Request
    respLock   sync.Mutex // protects freeResp
    freeResp   *Response
}

// NewServer returns a new Server.
func NewServer() *Server {
    return &Server{}
}

// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()

// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func RegisterName(name string, rcvr interface{}) error {
    return DefaultServer.RegisterName(name, rcvr)
}

// A ServerCodec implements reading of RPC requests and writing of
// RPC responses for the server side of an RPC session.
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back. The server calls Close when finished with the
// connection. ReadRequestBody may be called with a nil
// argument to force the body of the request to be read and discarded.
// See NewClient's comment for information about concurrent access.
type ServerCodec interface {
    ReadRequestHeader(*Request) error
    ReadRequestBody(interface{}) error
    WriteResponse(*Response, interface{}) error

    // Close can be called multiple times and must be idempotent.
    Close() error
}

// ServeConn runs the DefaultServer on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
func ServeConn(conn io.ReadWriteCloser) {
    DefaultServer.ServeConn(conn)
}

// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func ServeCodec(codec ServerCodec) {
    DefaultServer.ServeCodec(codec)
}

// ServeRequest is like ServeCodec but synchronously serves a single request.
// It does not close the codec upon completion.
func ServeRequest(codec ServerCodec) error {
    return DefaultServer.ServeRequest(codec)
}

// Accept accepts connections on the listener and serves requests
// to DefaultServer for each incoming connection.
// Accept blocks; the caller typically invokes it in a go statement.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }

// Can connect to RPC service using HTTP CONNECT to rpcPath.
var connected = "200 Connected to Go RPC"

// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    if req.Method != "CONNECT" {
        w.Header().Set("Content-Type", "text/plain; charset=utf-8")
        w.WriteHeader(http.StatusMethodNotAllowed)
        io.WriteString(w, "405 must CONNECT\n")
        return
    }
    conn, _, err := w.(http.Hijacker).Hijack()
    if err != nil {
        log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
        return
    }
    io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
    server.ServeConn(conn)
}

// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
// and a debugging handler on debugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
    http.Handle(rpcPath, server)
    http.Handle(debugPath, debugHTTP{server})
}

// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func HandleHTTP() {
    DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}

1、调用包的Register方法将接受者方法集发布到默认的服务端上
2、ServerCodec实现读取RPC请求和写入RPC响应在服务器端的会话中。服务端调用ReadRequestHeader和ReadRequestBody读取请求,调用WriteResponse写入响应,调用Close方法结束连接。调用带一个nil参数的ReadRequestBody方法强制读取或者丢弃请求体。查看NewClient的注释查看并发访问的详情。
3、

gob编码

net/rpc默认使用的是该编码方式,在实际使用中, 可以根据情况,使用第三方的组件,如protobuf proto

type gobServerCodec struct {
    rwc    io.ReadWriteCloser
    dec    *gob.Decoder
    enc    *gob.Encoder
    encBuf *bufio.Writer
    closed bool
}

func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
    return c.dec.Decode(r)
}

func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
    return c.dec.Decode(body)
}

func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
    if err = c.enc.Encode(r); err != nil {
        if c.encBuf.Flush() == nil {
            // Gob couldn't encode the header. Should not happen, so if it does,
            // shut down the connection to signal that the connection is broken.
            log.Println("rpc: gob error encoding response:", err)
            c.Close()
        }
        return
    }
    if err = c.enc.Encode(body); err != nil {
        if c.encBuf.Flush() == nil {
            // Was a gob problem encoding the body but the header has been written.
            // Shut down the connection to signal that the connection is broken.
            log.Println("rpc: gob error encoding body:", err)
            c.Close()
        }
        return
    }
    return c.encBuf.Flush()
}

func (c *gobServerCodec) Close() error {
    if c.closed {
        // Only call c.rwc.Close once; otherwise the semantics are undefined.
        return nil
    }
    c.closed = true
    return c.rwc.Close()
}

导出方法的注册发布

// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
//  - exported method of exported type
//  - two arguments, both of exported type
//  - the second argument is a pointer
//  - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (server *Server) Register(rcvr interface{}) error {
    return server.register(rcvr, "", false)
}

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (server *Server) RegisterName(name string, rcvr interface{}) error {
    return server.register(rcvr, name, true)
}

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
    s := new(service)
    s.typ = reflect.TypeOf(rcvr)
    s.rcvr = reflect.ValueOf(rcvr)
    sname := reflect.Indirect(s.rcvr).Type().Name()
    if useName {
        sname = name
    }
    if sname == "" {
        s := "rpc.Register: no service name for type " + s.typ.String()
        log.Print(s)
        return errors.New(s)
    }
    if !isExported(sname) && !useName {
        s := "rpc.Register: type " + sname + " is not exported"
        log.Print(s)
        return errors.New(s)
    }
    s.name = sname

    // Install the methods
    s.method = suitableMethods(s.typ, true)

    if len(s.method) == 0 {
        str := ""

        // To help the user, see if a pointer receiver would work.
        method := suitableMethods(reflect.PtrTo(s.typ), false)
        if len(method) != 0 {
            str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
        } else {
            str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
        }
        log.Print(str)
        return errors.New(str)
    }

    if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
        return errors.New("rpc: service already defined: " + sname)
    }
    return nil
}

// suitableMethods returns suitable Rpc methods of typ, it will report
// error using log if reportErr is true.
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
    methods := make(map[string]*methodType)
    for m := 0; m < typ.NumMethod(); m++ {
        method := typ.Method(m)
        mtype := method.Type
        mname := method.Name
        // Method must be exported.
        if method.PkgPath != "" {
            continue
        }
        // Method needs three ins: receiver, *args, *reply.
        if mtype.NumIn() != 3 {
            if reportErr {
                log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
            }
            continue
        }
        // First arg need not be a pointer.
        argType := mtype.In(1)
        if !isExportedOrBuiltinType(argType) {
            if reportErr {
                log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
            }
            continue
        }
        // Second arg must be a pointer.
        replyType := mtype.In(2)
        if replyType.Kind() != reflect.Ptr {
            if reportErr {
                log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
            }
            continue
        }
        // Reply type must be exported.
        if !isExportedOrBuiltinType(replyType) {
            if reportErr {
                log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
            }
            continue
        }
        // Method needs one out.
        if mtype.NumOut() != 1 {
            if reportErr {
                log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
            }
            continue
        }
        // The return type of the method must be error.
        if returnType := mtype.Out(0); returnType != typeOfError {
            if reportErr {
                log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
            }
            continue
        }
        methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
    }
    return methods
}

查看源码,我们可以得知:
1、注册服务的一组方法,接受参数需满足以下条件

  • 方法是可导出类型
  • 方法的两个参数都是可导出类型
  • 方法的第二个参数是一个指针类型
  • 方法有一个返回值,类型为error
    2、客户端访问每个方法通过Type.Method的形式。其中Type是接收方的具体类型,示例:
err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {
        log.Fatal("arith error:", err)
    }

3、RegisterName方法可以指定类型的形式,而不是使用默认的Type.Method形式调用方法,其他等同于Regitster
4、一个服务器可以注册多个不同类型的对象(或者服务),但是不能注册同一类型的多个对象。

if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
        return errors.New("rpc: service already defined: " + sname)
    }

方法调用

当服务端收到一个非法的请求时,invalidRequest作为服务端响应的一个占位符。它将不会被客户端解码,因为响应包含一个error.

// A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used.
var invalidRequest = struct{}{}

func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
    resp := server.getResponse()
    // Encode the response header
    resp.ServiceMethod = req.ServiceMethod
    if errmsg != "" {
        resp.Error = errmsg
        reply = invalidRequest
    }
    resp.Seq = req.Seq
    sending.Lock()
    err := codec.WriteResponse(resp, reply)
    if debugLog && err != nil {
        log.Println("rpc: writing response:", err)
    }
    sending.Unlock()
    server.freeResponse(resp)
}

func (m *methodType) NumCalls() (n uint) {
    m.Lock()
    n = m.numCalls
    m.Unlock()
    return n
}

func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
    if wg != nil {
        defer wg.Done()
    }
    mtype.Lock()
    mtype.numCalls++
    mtype.Unlock()
    function := mtype.method.Func
    // Invoke the method, providing a new value for the reply.
    returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
    // The return value for the method is an error.
    errInter := returnValues[0].Interface()
    errmsg := ""
    if errInter != nil {
        errmsg = errInter.(error).Error()
    }
    server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
    server.freeRequest(req)
}

调用方法的执行流程:

  • 方法类型的调用次数自增
  • 调用方法执行,获取调用结果
  • 发送调用结果作为响应

ServeConn

  • ServeConn 在一个单一连接中跑server. 调用者经常在一个go 语句中调用ServerConn,默认使用gob编码,如果想使用其他编码方式,可以使用ServeCodec
  • ServeCodec和ServeConn相同,但是使用特定的编码,来解码请求和编码响应。
  • ServeRequest与ServeCodec类似,但同步提供单个请求。
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
    buf := bufio.NewWriter(conn)
    srv := &gobServerCodec{
        rwc:    conn,
        dec:    gob.NewDecoder(conn),
        enc:    gob.NewEncoder(buf),
        encBuf: buf,
    }
    server.ServeCodec(srv)
}

// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
    sending := new(sync.Mutex)
    wg := new(sync.WaitGroup)
    for {
        service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
        if err != nil {
            if debugLog && err != io.EOF {
                log.Println("rpc:", err)
            }
            if !keepReading {
                break
            }
            // send a response if we actually managed to read a header.
            if req != nil {
                server.sendResponse(sending, req, invalidRequest, codec, err.Error())
                server.freeRequest(req)
            }
            continue
        }
        wg.Add(1)
        go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
    }
    // We've seen that there are no more requests.
    // Wait for responses to be sent before closing codec.
    wg.Wait()
    codec.Close()
}

// ServeRequest is like ServeCodec but synchronously serves a single request.
// It does not close the codec upon completion.
func (server *Server) ServeRequest(codec ServerCodec) error {
    sending := new(sync.Mutex)
    service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
    if err != nil {
        if !keepReading {
            return err
        }
        // send a response if we actually managed to read a header.
        if req != nil {
            server.sendResponse(sending, req, invalidRequest, codec, err.Error())
            server.freeRequest(req)
        }
        return err
    }
    service.call(server, sending, nil, mtype, req, argv, replyv, codec)
    return nil
}

func (server *Server) getRequest() *Request {
    server.reqLock.Lock()
    req := server.freeReq
    if req == nil {
        req = new(Request)
    } else {
        server.freeReq = req.next
        *req = Request{}
    }
    server.reqLock.Unlock()
    return req
}

func (server *Server) freeRequest(req *Request) {
    server.reqLock.Lock()
    req.next = server.freeReq
    server.freeReq = req
    server.reqLock.Unlock()
}

func (server *Server) getResponse() *Response {
    server.respLock.Lock()
    resp := server.freeResp
    if resp == nil {
        resp = new(Response)
    } else {
        server.freeResp = resp.next
        *resp = Response{}
    }
    server.respLock.Unlock()
    return resp
}

func (server *Server) freeResponse(resp *Response) {
    server.respLock.Lock()
    resp.next = server.freeResp
    server.freeResp = resp
    server.respLock.Unlock()
}

func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
    service, mtype, req, keepReading, err = server.readRequestHeader(codec)
    if err != nil {
        if !keepReading {
            return
        }
        // discard body
        codec.ReadRequestBody(nil)
        return
    }

    // Decode the argument value.
    argIsValue := false // if true, need to indirect before calling.
    if mtype.ArgType.Kind() == reflect.Ptr {
        argv = reflect.New(mtype.ArgType.Elem())
    } else {
        argv = reflect.New(mtype.ArgType)
        argIsValue = true
    }
    // argv guaranteed to be a pointer now.
    if err = codec.ReadRequestBody(argv.Interface()); err != nil {
        return
    }
    if argIsValue {
        argv = argv.Elem()
    }

    replyv = reflect.New(mtype.ReplyType.Elem())

    switch mtype.ReplyType.Elem().Kind() {
    case reflect.Map:
        replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
    case reflect.Slice:
        replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
    }
    return
}

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
    // Grab the request header.
    req = server.getRequest()
    err = codec.ReadRequestHeader(req)
    if err != nil {
        req = nil
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            return
        }
        err = errors.New("rpc: server cannot decode request: " + err.Error())
        return
    }

    // We read the header successfully. If we see an error now,
    // we can still recover and move on to the next request.
    keepReading = true

    dot := strings.LastIndex(req.ServiceMethod, ".")
    if dot < 0 {
        err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
        return
    }
    serviceName := req.ServiceMethod[:dot]
    methodName := req.ServiceMethod[dot+1:]

    // Look up the request.
    svci, ok := server.serviceMap.Load(serviceName)
    if !ok {
        err = errors.New("rpc: can't find service " + req.ServiceMethod)
        return
    }
    svc = svci.(*service)
    mtype = svc.method[methodName]
    if mtype == nil {
        err = errors.New("rpc: can't find method " + req.ServiceMethod)
    }
    return
}

相关文章

  • net/rpc源码学习

    几个结构体类型 1、调用包的Register方法将接受者方法集发布到默认的服务端上2、ServerCodec实现读...

  • golang rpc源码问题记录

    源码位置 net/rpc/server.go 数据结构 在结构体Server中维护了freeReq, freeRe...

  • net/rpc

    rpc包提供了通过网络或其他I/O连接对一个对象的导出方法的访问 应用 比如我们在服务器注册一个对象,这个对象可以...

  • net/rpc

    rpc包提供了通过网络或者其他io连接,对一个对象导出方法的访问。服务器端注册一个对象,当做一个拥有类型名称的服务...

  • 微服务

    微服务简介 RPC 官方net/rpc包 多参数 new struct client: rpc和protobuf结...

  • 一个入门rpc框架的分析学习

    参考 huangyong-rpc[http://git.oschina.net/huangyong/rpc] 轻量...

  • Go RPC

    Go RPC Go 原生的网络RPC需要关联Go的net框架和库内容(net,tpc,http等等) 理论 总体来...

  • Go net/rpc

    Golang官方提供的net/rpc库使用encoding/gob进行编解码,支持TCP或HTTP数据传输方式,由...

  • dubbo源码分析(一):dubbo整体源码结构分析

    @(dubbo源码分析)[#rpc] dubbo源码分析(一):dubbo整体源码结构分析 dubbo整体的结构分...

  • 《RPC实战与核心原理》学习笔记Day5

    06 | RPC实战:剖析gRPC源码,动手实现一个完整的RPC 我们通过动态代理技术,屏蔽RPC调用的细节,从而...

网友评论

      本文标题:net/rpc源码学习

      本文链接:https://www.haomeiwen.com/subject/uxsckctx.html