美文网首页
从 protobuf 看 gRPC

从 protobuf 看 gRPC

作者: Robin92 | 来源:发表于2020-04-13 18:48 被阅读0次

从 protobuf 看 gRPC

protobuf 生成的 service.pb.go 文件

service.pb.go 中有三部分:

  • 服务 Client 部分:XxxServiceClient 接口、实例及方法
  • 服务 Server 部分:XxxServiceServer 接口、实例及方法
  • 一个 ServiceDesc 变量 var _XxxService_serviceDesc = grpc.ServiceDesc{...}
  • 一个 init() 函数 { proto.RegisterFile("member/service.proto", fileDescriptor17) }
  • 一个二进制字节数组变量 fileDescriptor17 = []byte{...}

fileDescriptor17

这个是每一个 proto 文件序列化后的二进制数据信息。17 是一个文件序号。

init()

init() 注册了 proto 文件

func init() { proto.RegisterFile("Xxx/service.proto", fileDescriptor5) }

在非 service 的 proto 中还注册了各个 Message

func init() {
    proto.RegisterType((*BenchmarkRequest)(nil), "xxxrpc.common.benchmark.BenchmarkRequest")
    proto.RegisterType((*BenchmarkStep)(nil), "xxxrpc.common.benchmark.BenchmarkStep")
    proto.RegisterType((*ServiceCall)(nil), "xxxrpc.common.benchmark.ServiceCall")
}

ServiceDesc 变量

ServiceDesc 关联了服务和对应的服务方法。

var _XxxService_serviceDesc = grpc.ServiceDesc{
    ServiceName: "xxxrpc.xxx.XxxService",
    HandlerType: (*XxxServiceServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "Benchmark",
            Handler:    _XxxService_Benchmark_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "xxx/service.proto",
}

客户端部分

  • 定义了此服务的 Client 接口,即接口上实现了些服务提供的各个方法。
  • 定义了 client 类型,类型中是一个 连接 的封装。
  • 提供服务 Client 的工厂函数。NewXxxServiceClient
  • client 实例实现接口的所有函数。
type XxxServiceClient interface {
    Benchmark(ctx context.Context, in *benchmark.BenchmarkRequest, opts ...grpc.CallOption) (*resp.EmptyResponse, error)
}
type xxxServiceClient struct {
    cc *grpc.ClientConn
}

func NewXxxServiceClient(cc *grpc.ClientConn) XxxServiceClient {
    return &xxxServiceClient{cc}
}

func (c *xxxServiceClient) Benchmark(ctx context.Context, in *benchmark.BenchmarkRequest, opts ...grpc.CallOption) (*resp.EmptyResponse, error) {
    out := new(resp.EmptyResponse)
    err := grpc.Invoke(ctx, "/xxxrpc.xxx.XxxService/Benchmark", in, out, c.cc, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

接下来我们看一下 invoke 函数的调用。

// Invoke sends the RPC request on the wire and returns after response is
// received.  This is typically called by generated code.
//
// DEPRECATED: Use ClientConn.Invoke instead.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    return cc.Invoke(ctx, method, args, reply, opts...)
}

可以看到, grpc.Invoke 函数调用了 ClientConn 的 Invoke 函数。下面看一下 ClientConn.Invoke()


// Invoke sends the RPC request on the wire and returns after response is
// received.  This is typically called by generated code.
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
    // allow interceptor to see all applicable call options, which means those
    // configured as defaults from dial option as well as per-call options
    opts = combine(cc.dopts.callOptions, opts)

    if cc.dopts.unaryInt != nil {
        return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
    }
    return invoke(ctx, method, args, reply, cc, opts...)
}

func combine(o1 []CallOption, o2 []CallOption) []CallOption {
    // we don't use append because o1 could have extra capacity whose
    // elements would be overwritten, which could cause inadvertent
    // sharing (and race conditions) between concurrent calls
    if len(o1) == 0 {
        return o2
    } else if len(o2) == 0 {
        return o1
    }
    ret := make([]CallOption, len(o1)+len(o2))
    copy(ret, o1)
    copy(ret[len(o1):], o2)
    return ret
}

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

我们现在就分析一下 invoke() 的几个参数

  • ctx 参数不用多说
  • method 参数是 "/xxxrpc.xxx.XxxService/Benchmark"
  • reqreply 不用多说,是输入参数和输出参数
  • cc 是一个连接本身
  • opts 参数是一个 CallOption 的数组,CallOption 是个接口,定义了两个方法 before()after(),很好理解,一个是调用前执行的函数,一个是调用结束后执行的函数。
type CallOption interface {
    // before is called before the call is sent to any server.  If before
    // returns a non-nil error, the RPC fails with that error.
    before(*callInfo) error

    // after is called after the call has completed.  after cannot return an
    // error, so any failures should be reported via output parameters.
    after(*callInfo)
}

如上,可看到 Invoke() 主要有两个方法,一个是 combine() 一个是 invoke()

  • combine() 只是将函数调用时的选项 CallOption 与默认的选项合并在一起,然后返回(默认信息在前)。
  • invoke() 函数 new 了一个客户端流,然后发送数据,再返回接收到的数据。

后面再看源码,可以看到在 newClientStream() 返回的对象实现的接口方法,下文中有两个调用,分别是 SendMsg()RecvMsg()

看接口处的注释:

  • SendMsg()
    • 如果客户端出错直接返回,否则在 stream 上追加 io.EOF 以备被 RecvMsg 接收到
    • 它会 block 直到
      • 有充足的流量来控制传输
      • 完成(done)
      • 跳出(break)
  • RecvMsg()
    • 它会 block 直到接收到消息,或 stream 完成
    • 如果 stream 成功完成会返回一个 io.EOF
    • 如果 stream 发生错误,它会取消并且返回错误信息(包含 RPC 状态)

服务端部分

相关文章

网友评论

      本文标题:从 protobuf 看 gRPC

      本文链接:https://www.haomeiwen.com/subject/prinmhtx.html