美文网首页
对grpc双向流的理解

对grpc双向流的理解

作者: 会理发的店小二 | 来源:发表于2020-07-15 23:56 被阅读0次

    [toc]

    对grpc双向流的理解

    一. 双向流的实现

    1.1 proto

    service RouteGuide {
      // 定义一个双向流方法
      rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    }
    

    1.2 服务端

    • 自动生成代码部分
    // 服务端接口RouteGuideServer
    type RouteGuideServer interface {
        RouteChat(RouteGuide_RouteChatServer) error
    }
    // 发送/接收RouteGuide_RouteChatServer接口
    type RouteGuide_RouteChatServer interface {
        Send(*RouteNote) error
        Recv() (*RouteNote, error)
        grpc.ServerStream
    }
    // RouteGuide_RouteChatServer接口的实现
    type routeGuideRouteChatServer struct {
        grpc.ServerStream
    }
    
    func (x *routeGuideRouteChatServer) Send(m *RouteNote) error {
        return x.ServerStream.SendMsg(m)
    }
    
    func (x *routeGuideRouteChatServer) Recv() (*RouteNote, error) {
        m := new(RouteNote)
        if err := x.ServerStream.RecvMsg(m); err != nil {
            return nil, err
        }
        return m, nil
    }
    // 注册服务
    func RegisterRouteGuideServer(s *grpc.Server, srv RouteGuideServer) {
        s.RegisterService(&_RouteGuide_serviceDesc, srv)
    }
    
    • 深层源码
    // 服务注册
    func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
        ht := reflect.TypeOf(sd.HandlerType).Elem()
        st := reflect.TypeOf(ss)
        if !st.Implements(ht) {
            grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
        }
        s.register(sd, ss)
    }
    // 核心逻辑
    func (s *Server) register(sd *ServiceDesc, ss interface{}) {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.printf("RegisterService(%q)", sd.ServiceName)
        if s.serve {
            grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
        }
        if _, ok := s.m[sd.ServiceName]; ok {
            grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
        }
        srv := &service{
            server: ss,
            md:     make(map[string]*MethodDesc),
            sd:     make(map[string]*StreamDesc),
            mdata:  sd.Metadata,
        }
      // 赋值非流式方法
        for i := range sd.Methods {
            d := &sd.Methods[i]
            srv.md[d.MethodName] = d
        }
      // 赋值流式方法
        for i := range sd.Streams {
            d := &sd.Streams[i]
            srv.sd[d.StreamName] = d
        }
      // 注册服务
        s.m[sd.ServiceName] = srv
    }
    
    • 调用部分
    func main() {
      // 监听,返回listener
        lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
        grpcServer := grpc.NewServer(opts...)
      // 注册服务
      pb.RegisterRouteGuideServer(grpcServer, &routeGuideServer{})
      // 使用服务  ,grpcServer.Serve(lis)内部调用 rawConn, err := lis.Accept() ->   s.handleRawConn(rawConn)
        grpcServer.Serve(lis)
    }
    // 实现RouteGuideServer接口
    type routeGuideServer struct {
        pb.UnimplementedRouteGuideServer
    }
    
    
    func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
      for {
          ...
          // 接收 
          in, err := stream.Recv()
          ...
          // 发送
          err := stream.Send(note)
      }
    }
    
    

    1.3 客户端

    • 自动生成代码部分
    // 客户端接口RouteGuideClient
    type RouteGuideClient interface {
        RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error)
    }
    
    // 发送/接收RouteGuide_RouteChatClient
    type RouteGuide_RouteChatClient interface {
        Send(*RouteNote) error
        Recv() (*RouteNote, error)
        grpc.ClientStream
    }
    
    // 接口RouteGuide_RouteChatClient的实现
    type routeGuideRouteChatClient struct {
        grpc.ClientStream
    }
    // 发送
    func (x *routeGuideRouteChatClient) Send(m *RouteNote) error {
        return x.ClientStream.SendMsg(m)
    }
    // 接收
    func (x *routeGuideRouteChatClient) Recv() (*RouteNote, error) {
        m := new(RouteNote)
        if err := x.ClientStream.RecvMsg(m); err != nil {
            return nil, err
        }
        return m, nil
    }
    
    // RouteGuideClient接口的实现
    type routeGuideClient struct {
        cc grpc.ClientConnInterface
    }
    
    // 实现RouteGuideClient接口的实现
    func (c *routeGuideClient) RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) {
        stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[2], "/routeguide.RouteGuide/RouteChat", opts...)
        if err != nil {
            return nil, err
        }
      // 将stream赋值给grpc.ClientStream
        x := &routeGuideRouteChatClient{stream}
        return x, nil
    }
    
    // 客户的构造函数
    // grpc.ClientConnInterface是一个grpc的连接
    func NewRouteGuideClient(cc grpc.ClientConnInterface) RouteGuideClient {
        return &routeGuideClient{cc}
    }
    
    
    • 客户端调用代码
    // grpc拨号并返回socket连接
    conn, err := grpc.Dial(*serverAddr, opts...)
    // 传入grpc连接,生成服务的客户端
    client := pb.NewRouteGuideClient(conn)
    // 调用RouteChat返回流
    stream, err := client.RouteChat返回流(ctx)
    go func() {
            for {
          // 连续读取数据
                in, err := stream.Recv()
            }
    }()
    for _, note := range notes {
      // 连续发送数据
        stream.Send(note);
    }
    // 使用完记得关闭
    stream.CloseSend()
    
    

    相关文章

      网友评论

          本文标题:对grpc双向流的理解

          本文链接:https://www.haomeiwen.com/subject/negshktx.html