[toc]
对grpc双向流的理解
一. 双向流的实现
1.1 proto
service RouteGuide {
// 定义一个双向流方法
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
1.2 服务端
- 自动生成代码部分
// 服务端接口RouteGuideServer
type RouteGuideServer interface {
RouteChat(RouteGuide_RouteChatServer) error
}
// 发送/接收RouteGuide_RouteChatServer接口
type RouteGuide_RouteChatServer interface {
Send(*RouteNote) error
Recv() (*RouteNote, error)
grpc.ServerStream
}
// RouteGuide_RouteChatServer接口的实现
type routeGuideRouteChatServer struct {
grpc.ServerStream
}
func (x *routeGuideRouteChatServer) Send(m *RouteNote) error {
return x.ServerStream.SendMsg(m)
}
func (x *routeGuideRouteChatServer) Recv() (*RouteNote, error) {
m := new(RouteNote)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// 注册服务
func RegisterRouteGuideServer(s *grpc.Server, srv RouteGuideServer) {
s.RegisterService(&_RouteGuide_serviceDesc, srv)
}
- 深层源码
// 服务注册
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
s.register(sd, ss)
}
// 核心逻辑
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {
grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.m[sd.ServiceName]; ok {
grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
srv := &service{
server: ss,
md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
// 赋值非流式方法
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
// 赋值流式方法
for i := range sd.Streams {
d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}
// 注册服务
s.m[sd.ServiceName] = srv
}
- 调用部分
func main() {
// 监听,返回listener
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
grpcServer := grpc.NewServer(opts...)
// 注册服务
pb.RegisterRouteGuideServer(grpcServer, &routeGuideServer{})
// 使用服务 ,grpcServer.Serve(lis)内部调用 rawConn, err := lis.Accept() -> s.handleRawConn(rawConn)
grpcServer.Serve(lis)
}
// 实现RouteGuideServer接口
type routeGuideServer struct {
pb.UnimplementedRouteGuideServer
}
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
...
// 接收
in, err := stream.Recv()
...
// 发送
err := stream.Send(note)
}
}
1.3 客户端
- 自动生成代码部分
// 客户端接口RouteGuideClient
type RouteGuideClient interface {
RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error)
}
// 发送/接收RouteGuide_RouteChatClient
type RouteGuide_RouteChatClient interface {
Send(*RouteNote) error
Recv() (*RouteNote, error)
grpc.ClientStream
}
// 接口RouteGuide_RouteChatClient的实现
type routeGuideRouteChatClient struct {
grpc.ClientStream
}
// 发送
func (x *routeGuideRouteChatClient) Send(m *RouteNote) error {
return x.ClientStream.SendMsg(m)
}
// 接收
func (x *routeGuideRouteChatClient) Recv() (*RouteNote, error) {
m := new(RouteNote)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// RouteGuideClient接口的实现
type routeGuideClient struct {
cc grpc.ClientConnInterface
}
// 实现RouteGuideClient接口的实现
func (c *routeGuideClient) RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) {
stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[2], "/routeguide.RouteGuide/RouteChat", opts...)
if err != nil {
return nil, err
}
// 将stream赋值给grpc.ClientStream
x := &routeGuideRouteChatClient{stream}
return x, nil
}
// 客户的构造函数
// grpc.ClientConnInterface是一个grpc的连接
func NewRouteGuideClient(cc grpc.ClientConnInterface) RouteGuideClient {
return &routeGuideClient{cc}
}
- 客户端调用代码
// grpc拨号并返回socket连接
conn, err := grpc.Dial(*serverAddr, opts...)
// 传入grpc连接,生成服务的客户端
client := pb.NewRouteGuideClient(conn)
// 调用RouteChat返回流
stream, err := client.RouteChat返回流(ctx)
go func() {
for {
// 连续读取数据
in, err := stream.Recv()
}
}()
for _, note := range notes {
// 连续发送数据
stream.Send(note);
}
// 使用完记得关闭
stream.CloseSend()
网友评论