rpc包提供了通过网络或者其他io连接,对一个对象导出方法的访问。服务器端注册一个对象,当做一个拥有类型名称的服务使其可见。注册后,导出方法可被远程访问,一个服务器可以注册多个不同类型的对象(或者服务),但是不能注册同一类型的多个对象。
满足一下条件的方法,可以用于远程访问,否则将被忽略:
- 方法类型是可被导出的
- 方法是可导出的
- 方法有两个参数,都是可导出(或者内建)的类型
- 方法的第二个参数需是指针类型
- 方法返回错误类型
方法看起来如下样式:
func (t *T) MethodName(argType T1, replyType *T2) error
其中,T1和T2可以使用encoding/gob进行编码。其需求适用于其他编码要求。
方法的第一个参数代表调用端的参数,第二个参数代表返回给调用方的结果参数,方法的返回值,如果非空,就像一个字符串一样返回给客户端好像客户端调用errors.New创建的。如果返回了一个错误,返回值参数将不会被分返回给客户端。
调用方法等待远程调用结束,直到go方法异步加载调用并使用结构体Done ,除非指定特定编码,默认使用encoding/gob进行编码数据。
示例:
server.go
package server
import (
"errors"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
server端main.go
package main
import (
"log"
"net"
"net/http"
"net/rpc"
"serverproject/server"
)
func main() {
arith := new(server.Arith)
//// Register publishes the receiver's methods in the DefaultServer.
rpc.Register(arith)
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
<-make(chan struct{})
}
client端main.go
package main
import (
"fmt"
"log"
"net/rpc"
"serverproject/server"
)
func main() {
client, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
if err != nil {
log.Fatal("dialing:", err)
}
args := &server.Args{7, 8}
// sync
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
// async
quotient := new(server.Quotient)
divCall := client.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
// check errors, print, etc.
fmt.Println(replyCall)
fmt.Printf("Arith: %d/%d=%d, %d", args.A, args.B, quotient.Quo, quotient.Rem)
}
看一下这部分的源码,其实Call方法内部也是调用Go方法。Go方法属于异步调用。它返回Call结构体代表调用。done channel在调用完成后发出信号。如果done为nil,Go会创建一个新的channel.如果非空,done需缓存,否则会crash.
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
客户端的重点为:发送调用,接受响应
func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// Terminate pending calls.
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
网友评论