美文网首页
GRPC metadata的使用

GRPC metadata的使用

作者: 申_9a33 | 来源:发表于2022-06-18 18:14 被阅读0次

    metadata 使用

    1.修改 helloworld.proto

    syntax = "proto3";
    
    option go_package = "./helloworld";
    
    package helloworld;
    
    service Greeter {
      // 普通调用
      rpc UnaryEcho (HelloRequest) returns (HelloReply) {}
      // 服务流调用
      rpc ServerStreamingEcho(HelloRequest) returns (stream HelloReply) {}
      // 客户端流调用
      rpc ClientStreamingEcho(stream HelloRequest) returns (HelloReply) {}
      // 双向流调用
      rpc BidirectionalStreamingEcho(stream HelloRequest) returns (stream HelloReply) {}
    }
    
    message HelloRequest {
      string name = 1;
    }
    
    message HelloReply {
      string message = 1;
    }
    

    2.普通调用metadata数据使用

    2.1服务端代码

    func (s *server) UnaryEcho(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
        fmt.Println("---UnaryEcho---")
    
        defer func() {
            trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
            grpc.SetTrailer(ctx, trailer)
        }()
    
        md, ok := metadata.FromIncomingContext(ctx)
    
        if !ok {
            return nil, status.Errorf(codes.DataLoss, "无法获取元数据")
        }
    
        if t, ok := md["timestamp"]; ok {
            fmt.Println("timestamp from metadata:")
            for i, e := range t {
                fmt.Printf("%d.%s\n", i, e)
            }
        }
    
        header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
        grpc.SendHeader(ctx, header)
    
        fmt.Printf("已接受到的请求:%v,发送响应\n", in)
    
        return &pb.HelloReply{Message: "Hello again " + in.GetName()}, nil
    }
    
    • 1.在defer中调用SetTrailer;会在grpc关闭时在发送metadata数据,可以调用多次,多次调用会合并metadata数据

    • 2.调用FromIncomingContext;从context中解析出client请求时携带的metadata数据

    • 3.SendHeader发送metadata到客户端,只能调用一次

    • 4.返回数据到客户端

    2.2客户端代码

    func unaryCallWithMetadata(c pb.GreeterClient) {
        fmt.Println("--- unaryCall ---")
    
        // 创建metadata到context中.
        md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        ctx := metadata.NewOutgoingContext(context.Background(), md)
    
        // 使用metadata的上下文创建RPC
        var header, trailer metadata.MD
        r, err := c.UnaryEcho(ctx, &pb.HelloRequest{Name: "unaryCall"}, grpc.Header(&header), grpc.Trailer(&trailer))
        if err != nil {
            log.Fatalf("调用UnaryEcho失败:%v", err)
        }
    
        if t, ok := header["timestamp"]; ok {
            fmt.Printf("timestamp from header:\n")
            for i, e := range t {
                fmt.Printf("%d.%s\n", i, e)
            }
        } else {
            log.Fatal("需要timestamp,但header中不存在timestamp")
        }
    
        if l, ok := header["location"]; ok {
            fmt.Printf("location from header:\n")
            for i, e := range l {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("需要location,但是header中不存在location")
        }
        fmt.Println("response:")
        fmt.Printf(" - %s\n", r.Message)
    
        if t, ok := trailer["timestamp"]; ok {
            fmt.Printf("timestamp from trailer:\n")
            for i, e := range t {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("需要timestamp,但header中不存在timestamp")
        }
    }
    
    • 1.NewOutgoingContext 将创建的metadata数据放入context中,在rpc调用时通过context将携带的metadata发送给服务端

    • 2.创建headertrailer用于rpc调用完成后,得到服务端返回的metadata数据,header是服务端刚开始调用时填充的数据,trailer是服务端调用完成后填充的数据

    整个流程

      1. 客户端调用metadata.NewOutgoingContextmetadata填充到context中;然后调用服务端方法c.UnaryEcho,传入context
      1. 服务端方法被调用时,调用metadata.FromIncomingContextcontext中拿到metadata数据
      1. 服务端调用grpc.SendHeader,发送的Header携带metadata数据
      1. 服务端处理完毕返回时,执行defer,调用grpc.SetTrailer,设置最后的metadata数据
      1. 客户端收到返回的数据时,会同时拿到HeaderTrailer,以及内部携带的metadata

    3.服务端Streammetadata数据使用

    3.1 服务端代码

    func (s *server) ServerStreamingEcho(in *pb.HelloRequest, stream pb.Greeter_ServerStreamingEchoServer) error {
        fmt.Printf("--- ServerStreamingEcho ---\n")
    
        defer func() {
            trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
            stream.SetTrailer(trailer)
        }()
    
        md, ok := metadata.FromIncomingContext(stream.Context())
        if !ok {
            return status.Errorf(codes.DataLoss, "ServerStreamingEcho: 无法获取metadata")
        }
        if t, ok := md["timestamp"]; ok {
            fmt.Printf("timestamp from metadata:\n")
            for i, e := range t {
                fmt.Printf("%d.%s\n", i, e)
            }
        }
    
        header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
        stream.SendHeader(header)
    
        fmt.Printf("收到的请求: %v\n", in)
    
        for i := 0; i < 10; i++ {
            fmt.Printf("echo message %v\n", in.Name)
    
            err := stream.Send(&pb.HelloReply{Message: "Hello again " + in.GetName()})
            if err != nil {
                return err
            }
        }
    
        return nil
    }
    
    • 与普通函数调用流程一致,只是将grpc.全部换成stream.

    3.2 客户端代码

        fmt.Printf("--- server streaming ---\n")
    
        // 创建metadata到context中.
        md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        ctx := metadata.NewOutgoingContext(context.Background(), md)
    
        stream, err := c.ServerStreamingEcho(ctx, &pb.HelloRequest{Name: "serverStreamingWithMetadata"})
        if err != nil {
            log.Fatalf("调用ServerStreamingEcho失败: %v", err)
        }
    
        header, err := stream.Header()
        if err != nil {
            log.Fatalf("无法从stream中获取header: %v", err)
        }
    
        if t, ok := header["timestamp"]; ok {
            fmt.Printf("timestamp from header:\n")
            for i, e := range t {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("需要timestamp,但header中不存在timestamp")
        }
    
        if l, ok := header["location"]; ok {
            fmt.Printf("location from header:\n")
            for i, e := range l {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("需要location,但是header中不存在location")
        }
    
        // 读取所有的responses
        fmt.Printf("response:\n")
        var rpcStatus error
        for {
            r, err := stream.Recv()
            if err != nil {
                rpcStatus = err
                break
            }
    
            fmt.Printf(" - %s\n", r.Message)
        }
    
        if rpcStatus != io.EOF {
            log.Fatalf("failed to finish server streaming: %v", rpcStatus)
        }
    
        trailer := stream.Trailer()
    
        if t, ok := trailer["timestamp"]; ok {
            fmt.Printf("timestamp from trailer:\n")
            for i, e := range t {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("需要timestamp,但header中不存在timestamp")
        }
    

    整个流程

      1. 客户端客户端调用metadata.NewOutgoingContextmetadata放入context中;调用服务端方法c.ServerStreamingEcho,传入context
      1. 服务端方法被调用,使用metadata.FromIncomingContext,取出context中的metadata
      1. 服务端调用stream.SendHeader,发送Header中携带metadata,这个方法只能调用一次
      1. 客户端调用stream.Header(),然后从header中拿到metadata
      1. 服务端所有stream处理完毕,执行defer调用stream.SetTrailer,设置Trailer中的metadata
      1. 客户端接受完服务端所有的stream后,调用stream.Trailer(),从中拿到最后的metadata

    4.客户端Streammetadata数据使用

    4.1 服务端代码

    func (s *server) ClientStreamingEcho(stream pb.Greeter_ClientStreamingEchoServer) error {
        fmt.Printf("--- ClientStreamingEcho ---\n")
    
        defer func() {
            trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
            stream.SetTrailer(trailer)
        }()
    
        md, ok := metadata.FromIncomingContext(stream.Context())
        if !ok {
            return status.Errorf(codes.DataLoss, "ClientStreamingEcho: failed to get metadata")
        }
        if t, ok := md["timestamp"]; ok {
            fmt.Printf("timestamp from metadata:\n")
            for i, e := range t {
                fmt.Printf(" %d. %s\n", i, e)
            }
        }
    
        header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
        stream.SendHeader(header)
    
        // Read requests and send responses.
        for {
            in, err := stream.Recv()
            if err == io.EOF {
                fmt.Printf("echo last received message\n")
                return stream.SendAndClose(&pb.HelloReply{Message: "Hello again " + in.GetName()})
            }
    
            fmt.Printf("request received: %v, building echo\n", in)
            if err != nil {
                return err
            }
        }
    }
    

    4.2 客户端代码

    func clientStreamWithMetadata(c pb.GreeterClient) {
        fmt.Printf("--- client streaming ---\n")
        // Create metadata and context.
        md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        ctx := metadata.NewOutgoingContext(context.Background(), md)
    
        // Make RPC using the context with the metadata.
        stream, err := c.ClientStreamingEcho(ctx)
        if err != nil {
            log.Fatalf("ClientStreamingEcho 调用失败: %v\n", err)
        }
    
        // Read the header when the header arrives.
        header, err := stream.Header()
        if err != nil {
            log.Fatalf("failed to get header from stream: %v", err)
        }
        // Read metadata from server's header.
        if t, ok := header["timestamp"]; ok {
            fmt.Printf("timestamp from header:\n")
            for i, e := range t {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("timestamp expected but doesn't exist in header")
        }
    
        if l, ok := header["location"]; ok {
            fmt.Printf("location from header:\n")
            for i, e := range l {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("location expected but doesn't exist in header")
        }
    
        // Send all requests to the server.
        for i := 0; i < 10; i++ {
            if err := stream.Send(&pb.HelloRequest{Name: "clientStreamWithMetadata"}); err != nil {
                log.Fatalf("failed to send streaming: %v\n", err)
            }
        }
    
        // Read the response.
        r, err := stream.CloseAndRecv()
        if err != nil {
            log.Fatalf("failed to CloseAndRecv: %v\n", err)
        }
        fmt.Printf("response:\n")
        fmt.Printf(" - %s\n\n", r.Message)
    
        // Read the trailer after the RPC is finished.
        trailer := stream.Trailer()
        // Read metadata from server's trailer.
        if t, ok := trailer["timestamp"]; ok {
            fmt.Printf("timestamp from trailer:\n")
            for i, e := range t {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("timestamp expected but doesn't exist in trailer")
        }
    }
    

    整个流程

      1. 客户端客户端调用metadata.NewOutgoingContextmetadata放入context中;接着调用服务端方法c.ClientStreamingEcho(ctx),并且传入context
      1. 服务端方法被调用,使用metadata.FromIncomingContext(stream.Context()),从context取到metadata
      1. 服务端调用stream.SendHeader,发送带有metadataHeader给客户端
      1. 客户端调用stream.Header(),解析Header中的metadata
      1. 服务端开始接受处理客户端发送的stream信息,处理完成后,运行defer调用stream.SetTrailer(trailer),设置Trailermetadata
      1. 客户端收到服务端处理完stream的返回后,调用stream.Trailer(),拿到最后放在Trailermetadata

    5.双向Streammetadata数据使用

    5.1 服务端代码

    func (s *server) BidirectionalStreamingEcho(stream pb.Greeter_BidirectionalStreamingEchoServer) error {
        fmt.Printf("--- BidirectionalStreamingEcho ---\n")
    
        defer func() {
            trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
            stream.SetTrailer(trailer)
        }()
    
        // Read metadata from client.
        md, ok := metadata.FromIncomingContext(stream.Context())
        if !ok {
            return status.Errorf(codes.DataLoss, "BidirectionalStreamingEcho: failed to get metadata")
        }
    
        if t, ok := md["timestamp"]; ok {
            fmt.Printf("timestamp from metadata:\n")
            for i, e := range t {
                fmt.Printf(" %d. %s\n", i, e)
            }
        }
    
        // Create and send header.
        header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
        stream.SendHeader(header)
    
        // Read requests and send responses.
        for {
            in, err := stream.Recv()
            if err == io.EOF {
                return nil
            }
            if err != nil {
                return err
            }
            fmt.Printf("request received %v, sending echo\n", in)
            if err := stream.Send(&pb.HelloReply{Message: "Hello again " + in.GetName()}); err != nil {
                return err
            }
        }
    }
    

    5.2 客户端代码

    func bidirectionalWithMetadata(c pb.GreeterClient) {
        fmt.Printf("--- bidirectional ---\n")
        // Create metadata and context.
        md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        ctx := metadata.NewOutgoingContext(context.Background(), md)
    
        // Make RPC using the context with the metadata.
        stream, err := c.BidirectionalStreamingEcho(ctx)
        if err != nil {
            log.Fatalf("failed to call BidirectionalStreamingEcho: %v\n", err)
        }
    
        go func() {
            // Read the header when the header arrives.
            header, err := stream.Header()
            if err != nil {
                log.Fatalf("failed to get header from stream: %v", err)
            }
            // Read metadata from server's header.
            if t, ok := header["timestamp"]; ok {
                fmt.Printf("timestamp from header:\n")
                for i, e := range t {
                    fmt.Printf(" %d. %s\n", i, e)
                }
            } else {
                log.Fatal("timestamp expected but doesn't exist in header")
            }
            if l, ok := header["location"]; ok {
                fmt.Printf("location from header:\n")
                for i, e := range l {
                    fmt.Printf(" %d. %s\n", i, e)
                }
            } else {
                log.Fatal("location expected but doesn't exist in header")
            }
    
            // Send all requests to the server.
            for i := 0; i < 10; i++ {
                if err := stream.Send(&pb.HelloRequest{Name: "clientStreamWithMetadata"}); err != nil {
                    log.Fatalf("failed to send streaming: %v\n", err)
                }
            }
            stream.CloseSend()
        }()
    
        // Read all the responses.
        var rpcStatus error
        fmt.Printf("response:\n")
        for {
            r, err := stream.Recv()
            if err != nil {
                rpcStatus = err
                break
            }
            fmt.Printf(" - %s\n", r.Message)
        }
        if rpcStatus != io.EOF {
            log.Fatalf("failed to finish server streaming: %v", rpcStatus)
        }
    
        // Read the trailer after the RPC is finished.
        trailer := stream.Trailer()
        // Read metadata from server's trailer.
        if t, ok := trailer["timestamp"]; ok {
            fmt.Printf("timestamp from trailer:\n")
            for i, e := range t {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("timestamp expected but doesn't exist in trailer")
        }
    
    }
    

    整个流程

      1. 客户端客户端调用metadata.NewOutgoingContextmetadata放入context中;接着调用服务端方法c.BidirectionalStreamingEcho(ctx),并且传入context
      1. 服务端方法被调用,使用metadata.FromIncomingContext(stream.Context()),从context取到metadata
      1. 服务端调用stream.SendHeader,发送带有metadataHeader给客户端
      1. 客户端开启一个协程,调用stream.Header(),解析Header中的metadata,并且开始发送stream到服务端
      1. 服务端接收到客户端发送的stream数据,并通过stream响应数据到客户端,接收到客户端的stream.CloseSend()后,运行defer调用stream.SetTrailer(trailer),设置Trailermetadata
      1. 客户端处理服务端发送的stream的返回后,调用stream.Trailer(),拿到最后放在Trailermetadata

    源码

    相关文章

      网友评论

          本文标题:GRPC metadata的使用

          本文链接:https://www.haomeiwen.com/subject/tpflvrtx.html