net/rpc

作者: 第八共同体 | 来源:发表于2019-07-08 19:01 被阅读0次

rpc包提供了通过网络或者其他io连接,对一个对象导出方法的访问。服务器端注册一个对象,当做一个拥有类型名称的服务使其可见。注册后,导出方法可被远程访问,一个服务器可以注册多个不同类型的对象(或者服务),但是不能注册同一类型的多个对象。
满足一下条件的方法,可以用于远程访问,否则将被忽略:

  • 方法类型是可被导出的
  • 方法是可导出的
  • 方法有两个参数,都是可导出(或者内建)的类型
  • 方法的第二个参数需是指针类型
  • 方法返回错误类型

方法看起来如下样式:
func (t *T) MethodName(argType T1, replyType *T2) error
其中,T1和T2可以使用encoding/gob进行编码。其需求适用于其他编码要求。

方法的第一个参数代表调用端的参数,第二个参数代表返回给调用方的结果参数,方法的返回值,如果非空,就像一个字符串一样返回给客户端好像客户端调用errors.New创建的。如果返回了一个错误,返回值参数将不会被分返回给客户端。

调用方法等待远程调用结束,直到go方法异步加载调用并使用结构体Done ,除非指定特定编码,默认使用encoding/gob进行编码数据。

示例:
server.go

package server

import (
    "errors"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
    *reply = args.A * args.B
    return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
    if args.B == 0 {
        return errors.New("divide by zero")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    return nil
}

server端main.go

package main

import (
    "log"
    "net"
    "net/http"
    "net/rpc"
    "serverproject/server"
)

func main() {
    arith := new(server.Arith)
  //// Register publishes the receiver's methods in the DefaultServer.
    rpc.Register(arith)
    rpc.HandleHTTP()
    l, e := net.Listen("tcp", ":1234")
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go http.Serve(l, nil)
    <-make(chan struct{})
}

client端main.go

package main

import (
    "fmt"
    "log"
    "net/rpc"
    "serverproject/server"
)

func main() {
    client, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
    if err != nil {
        log.Fatal("dialing:", err)
    }
    args := &server.Args{7, 8}
    // sync
    var reply int
    err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)

    // async
    quotient := new(server.Quotient)
    divCall := client.Go("Arith.Divide", args, quotient, nil)
    replyCall := <-divCall.Done // will be equal to divCall
    // check errors, print, etc.
    fmt.Println(replyCall)
    fmt.Printf("Arith: %d/%d=%d, %d", args.A, args.B, quotient.Quo, quotient.Rem)
}


看一下这部分的源码,其实Call方法内部也是调用Go方法。Go方法属于异步调用。它返回Call结构体代表调用。done channel在调用完成后发出信号。如果done为nil,Go会创建一个新的channel.如果非空,done需缓存,否则会crash.

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
    call := new(Call)
    call.ServiceMethod = serviceMethod
    call.Args = args
    call.Reply = reply
    if done == nil {
        done = make(chan *Call, 10) // buffered.
    } else {
        // If caller passes done != nil, it must arrange that
        // done has enough buffer for the number of simultaneous
        // RPCs that will be using that channel. If the channel
        // is totally unbuffered, it's best not to run at all.
        if cap(done) == 0 {
            log.Panic("rpc: done channel is unbuffered")
        }
    }
    call.Done = done
    client.send(call)
    return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
    call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
    return call.Error
}

客户端的重点为:发送调用,接受响应

func (client *Client) send(call *Call) {
    client.reqMutex.Lock()
    defer client.reqMutex.Unlock()

    // Register this call.
    client.mutex.Lock()
    if client.shutdown || client.closing {
        client.mutex.Unlock()
        call.Error = ErrShutdown
        call.done()
        return
    }
    seq := client.seq
    client.seq++
    client.pending[seq] = call
    client.mutex.Unlock()

    // Encode and send the request.
    client.request.Seq = seq
    client.request.ServiceMethod = call.ServiceMethod
    err := client.codec.WriteRequest(&client.request, call.Args)
    if err != nil {
        client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()
        }
    }
}

func (client *Client) input() {
    var err error
    var response Response
    for err == nil {
        response = Response{}
        err = client.codec.ReadResponseHeader(&response)
        if err != nil {
            break
        }
        seq := response.Seq
        client.mutex.Lock()
        call := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()

        switch {
        case call == nil:
            // We've got no pending call. That usually means that
            // WriteRequest partially failed, and call was already
            // removed; response is a server telling us about an
            // error reading request body. We should still attempt
            // to read error body, but there's no one to give it to.
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
        case response.Error != "":
            // We've got an error response. Give this to the request;
            // any subsequent requests will get the ReadResponseBody
            // error if there is one.
            call.Error = ServerError(response.Error)
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
            call.done()
        default:
            err = client.codec.ReadResponseBody(call.Reply)
            if err != nil {
                call.Error = errors.New("reading body " + err.Error())
            }
            call.done()
        }
    }
    // Terminate pending calls.
    client.reqMutex.Lock()
    client.mutex.Lock()
    client.shutdown = true
    closing := client.closing
    if err == io.EOF {
        if closing {
            err = ErrShutdown
        } else {
            err = io.ErrUnexpectedEOF
        }
    }
    for _, call := range client.pending {
        call.Error = err
        call.done()
    }
    client.mutex.Unlock()
    client.reqMutex.Unlock()
    if debugLog && err != io.EOF && !closing {
        log.Println("rpc: client protocol error:", err)
    }
}

相关文章

  • net/rpc

    rpc包提供了通过网络或其他I/O连接对一个对象的导出方法的访问 应用 比如我们在服务器注册一个对象,这个对象可以...

  • net/rpc

    rpc包提供了通过网络或者其他io连接,对一个对象导出方法的访问。服务器端注册一个对象,当做一个拥有类型名称的服务...

  • 微服务

    微服务简介 RPC 官方net/rpc包 多参数 new struct client: rpc和protobuf结...

  • 一个入门rpc框架的分析学习

    参考 huangyong-rpc[http://git.oschina.net/huangyong/rpc] 轻量...

  • Go RPC

    Go RPC Go 原生的网络RPC需要关联Go的net框架和库内容(net,tpc,http等等) 理论 总体来...

  • Go net/rpc

    Golang官方提供的net/rpc库使用encoding/gob进行编解码,支持TCP或HTTP数据传输方式,由...

  • Hadoop9 RPC介绍

    RPC 转载:http://blog.csdn.net/mindfloating/article/details/...

  • net/rpc源码学习

    几个结构体类型 1、调用包的Register方法将接受者方法集发布到默认的服务端上2、ServerCodec实现读...

  • grpc

    Go的RPC标准库 简单使用 Go语言标准库(net/rpc)的RPC规则:方法只能有两个可序列化的参数,其中第二...

  • /jsonrpc

    package jsonrpc import "net/rpc/jsonrpc" jsonrpc包实现了JSON-...

网友评论

      本文标题:net/rpc

      本文链接:https://www.haomeiwen.com/subject/vfnahctx.html