从 protobuf 看 gRPC
protobuf
生成的 service.pb.go
文件
service.pb.go
中有三部分:
- 服务
Client
部分:XxxServiceClient
接口、实例及方法 - 服务
Server
部分:XxxServiceServer
接口、实例及方法 - 一个
ServiceDesc
变量var _XxxService_serviceDesc = grpc.ServiceDesc{...}
- 一个
init()
函数{ proto.RegisterFile("member/service.proto", fileDescriptor17) }
- 一个二进制字节数组变量
fileDescriptor17 = []byte{...}
fileDescriptor17
这个是每一个 proto 文件序列化后的二进制数据信息。17 是一个文件序号。
init()
init()
注册了 proto 文件
func init() { proto.RegisterFile("Xxx/service.proto", fileDescriptor5) }
在非 service 的 proto
中还注册了各个 Message
func init() {
proto.RegisterType((*BenchmarkRequest)(nil), "xxxrpc.common.benchmark.BenchmarkRequest")
proto.RegisterType((*BenchmarkStep)(nil), "xxxrpc.common.benchmark.BenchmarkStep")
proto.RegisterType((*ServiceCall)(nil), "xxxrpc.common.benchmark.ServiceCall")
}
ServiceDesc 变量
ServiceDesc 关联了服务和对应的服务方法。
var _XxxService_serviceDesc = grpc.ServiceDesc{
ServiceName: "xxxrpc.xxx.XxxService",
HandlerType: (*XxxServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Benchmark",
Handler: _XxxService_Benchmark_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "xxx/service.proto",
}
客户端部分
- 定义了此服务的
Client
接口,即接口上实现了些服务提供的各个方法。 - 定义了 client 类型,类型中是一个 连接 的封装。
- 提供服务 Client 的工厂函数。
NewXxxServiceClient
。 - client 实例实现接口的所有函数。
type XxxServiceClient interface {
Benchmark(ctx context.Context, in *benchmark.BenchmarkRequest, opts ...grpc.CallOption) (*resp.EmptyResponse, error)
}
type xxxServiceClient struct {
cc *grpc.ClientConn
}
func NewXxxServiceClient(cc *grpc.ClientConn) XxxServiceClient {
return &xxxServiceClient{cc}
}
func (c *xxxServiceClient) Benchmark(ctx context.Context, in *benchmark.BenchmarkRequest, opts ...grpc.CallOption) (*resp.EmptyResponse, error) {
out := new(resp.EmptyResponse)
err := grpc.Invoke(ctx, "/xxxrpc.xxx.XxxService/Benchmark", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
接下来我们看一下 invoke
函数的调用。
// Invoke sends the RPC request on the wire and returns after response is
// received. This is typically called by generated code.
//
// DEPRECATED: Use ClientConn.Invoke instead.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
return cc.Invoke(ctx, method, args, reply, opts...)
}
可以看到, grpc.Invoke
函数调用了 ClientConn 的 Invoke 函数。下面看一下 ClientConn.Invoke()
// Invoke sends the RPC request on the wire and returns after response is
// received. This is typically called by generated code.
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
func combine(o1 []CallOption, o2 []CallOption) []CallOption {
// we don't use append because o1 could have extra capacity whose
// elements would be overwritten, which could cause inadvertent
// sharing (and race conditions) between concurrent calls
if len(o1) == 0 {
return o2
} else if len(o2) == 0 {
return o1
}
ret := make([]CallOption, len(o1)+len(o2))
copy(ret, o1)
copy(ret[len(o1):], o2)
return ret
}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
我们现在就分析一下 invoke()
的几个参数
-
ctx
参数不用多说 -
method
参数是"/xxxrpc.xxx.XxxService/Benchmark"
-
req
和reply
不用多说,是输入参数和输出参数 -
cc
是一个连接本身 -
opts
参数是一个CallOption
的数组,CallOption
是个接口,定义了两个方法before()
和after()
,很好理解,一个是调用前执行的函数,一个是调用结束后执行的函数。
type CallOption interface {
// before is called before the call is sent to any server. If before
// returns a non-nil error, the RPC fails with that error.
before(*callInfo) error
// after is called after the call has completed. after cannot return an
// error, so any failures should be reported via output parameters.
after(*callInfo)
}
如上,可看到 Invoke()
主要有两个方法,一个是 combine()
一个是 invoke()
-
combine()
只是将函数调用时的选项CallOption
与默认的选项合并在一起,然后返回(默认信息在前)。 -
invoke()
函数 new 了一个客户端流,然后发送数据,再返回接收到的数据。
后面再看源码,可以看到在 newClientStream()
返回的对象实现的接口方法,下文中有两个调用,分别是 SendMsg()
和 RecvMsg()
看接口处的注释:
-
SendMsg()
- 如果客户端出错直接返回,否则在 stream 上追加
io.EOF
以备被RecvMsg
接收到 - 它会 block 直到
- 有充足的流量来控制传输
- 完成(done)
- 跳出(break)
- 如果客户端出错直接返回,否则在 stream 上追加
-
RecvMsg()
- 它会 block 直到接收到消息,或 stream 完成
- 如果 stream 成功完成会返回一个
io.EOF
- 如果 stream 发生错误,它会取消并且返回错误信息(包含 RPC 状态)
网友评论