const (
// 被用于HandleHTTP方法
DefaultRPCPath = "/_goRPC_"
DefaultDebugPath = "/debug/rpc"
// 验证error的reflect type.不能直接使用error
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type methodType struct {
sync.Mutex // protects counters
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
// 内部使用,每个rpc调用之前的header写入
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
type Response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *Response // for free list in Server
// 代表rpc服务端
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{}
// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func RegisterName(name string, rcvr interface{}) error {
return DefaultServer.RegisterName(name, rcvr)
// A ServerCodec implements reading of RPC requests and writing of
// RPC responses for the server side of an RPC session.
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back. The server calls Close when finished with the
// connection. ReadRequestBody may be called with a nil
// argument to force the body of the request to be read and discarded.
// See NewClient's comment for information about concurrent access.
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(interface{}) error
WriteResponse(*Response, interface{}) error
// Close can be called multiple times and must be idempotent.
Close() error
// ServeConn runs the DefaultServer on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
func ServeConn(conn io.ReadWriteCloser) {
// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func ServeCodec(codec ServerCodec) {
// ServeRequest is like ServeCodec but synchronously serves a single request.
// It does not close the codec upon completion.
func ServeRequest(codec ServerCodec) error {
return DefaultServer.ServeRequest(codec)
// Accept accepts connections on the listener and serves requests
// to DefaultServer for each incoming connection.
// Accept blocks; the caller typically invokes it in a go statement.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
// Can connect to RPC service using HTTP CONNECT to rpcPath.
var connected = "200 Connected to Go RPC"
// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
io.WriteString(w, "405 must CONNECT\n")
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
// and a debugging handler on debugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
http.Handle(rpcPath, server)
http.Handle(debugPath, debugHTTP{server})
// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func HandleHTTP() {
DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
net/rpc默认使用的是该编码方式,在实际使用中, 可以根据情况,使用第三方的组件,如protobuf proto
type gobServerCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
closed bool
func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
return c.dec.Decode(r)
func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
return c.dec.Decode(body)
func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
if err = c.enc.Encode(r); err != nil {
if c.encBuf.Flush() == nil {
// Gob couldn't encode the header. Should not happen, so if it does,
// shut down the connection to signal that the connection is broken.
log.Println("rpc: gob error encoding response:", err)
if err = c.enc.Encode(body); err != nil {
if c.encBuf.Flush() == nil {
// Was a gob problem encoding the body but the header has been written.
// Shut down the connection to signal that the connection is broken.
log.Println("rpc: gob error encoding body:", err)
return c.encBuf.Flush()
func (c *gobServerCodec) Close() error {
if c.closed {
// Only call c.rwc.Close once; otherwise the semantics are undefined.
return nil
c.closed = true
return c.rwc.Close()
// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
// - exported method of exported type
// - two arguments, both of exported type
// - the second argument is a pointer
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (server *Server) Register(rcvr interface{}) error {
return server.register(rcvr, "", false)
// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (server *Server) RegisterName(name string, rcvr interface{}) error {
return server.register(rcvr, name, true)
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if useName {
sname = name
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
return errors.New(s)
if !isExported(sname) && !useName {
s := "rpc.Register: type " + sname + " is not exported"
return errors.New(s)
s.name = sname
// Install the methods
s.method = suitableMethods(s.typ, true)
if len(s.method) == 0 {
str := ""
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(s.typ), false)
if len(method) != 0 {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
return errors.New(str)
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
return nil
// suitableMethods returns suitable Rpc methods of typ, it will report
// error using log if reportErr is true.
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 {
if reportErr {
log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
if reportErr {
log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
// Second arg must be a pointer.
replyType := mtype.In(2)
if replyType.Kind() != reflect.Ptr {
if reportErr {
log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
if reportErr {
log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
// Method needs one out.
if mtype.NumOut() != 1 {
if reportErr {
log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
if reportErr {
log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
return methods
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
// A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used.
var invalidRequest = struct{}{}
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod
if errmsg != "" {
resp.Error = errmsg
reply = invalidRequest
resp.Seq = req.Seq
err := codec.WriteResponse(resp, reply)
if debugLog && err != nil {
log.Println("rpc: writing response:", err)
func (m *methodType) NumCalls() (n uint) {
n = m.numCalls
return n
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
if wg != nil {
defer wg.Done()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
if !keepReading {
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
// We've seen that there are no more requests.
// Wait for responses to be sent before closing codec.
// ServeRequest is like ServeCodec but synchronously serves a single request.
// It does not close the codec upon completion.
func (server *Server) ServeRequest(codec ServerCodec) error {
sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if !keepReading {
return err
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
return err
service.call(server, sending, nil, mtype, req, argv, replyv, codec)
return nil
func (server *Server) getRequest() *Request {
req := server.freeReq
if req == nil {
req = new(Request)
} else {
server.freeReq = req.next
*req = Request{}
return req
func (server *Server) freeRequest(req *Request) {
req.next = server.freeReq
server.freeReq = req
func (server *Server) getResponse() *Response {
resp := server.freeResp
if resp == nil {
resp = new(Response)
} else {
server.freeResp = resp.next
*resp = Response{}
return resp
func (server *Server) freeResponse(resp *Response) {
resp.next = server.freeResp
server.freeResp = resp
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
if err != nil {
if !keepReading {
// discard body
// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(mtype.ArgType.Elem())
} else {
argv = reflect.New(mtype.ArgType)
argIsValue = true
// argv guaranteed to be a pointer now.
if err = codec.ReadRequestBody(argv.Interface()); err != nil {
if argIsValue {
argv = argv.Elem()
replyv = reflect.New(mtype.ReplyType.Elem())
switch mtype.ReplyType.Elem().Kind() {
case reflect.Map:
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
err = codec.ReadRequestHeader(req)
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
err = errors.New("rpc: server cannot decode request: " + err.Error())
// We read the header successfully. If we see an error now,
// we can still recover and move on to the next request.
keepReading = true
dot := strings.LastIndex(req.ServiceMethod, ".")
if dot < 0 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot+1:]
// Look up the request.
svci, ok := server.serviceMap.Load(serviceName)
if !ok {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
svc = svci.(*service)
mtype = svc.method[methodName]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)