美文网首页
Dive into gRPC(6):metadata

Dive into gRPC(6):metadata

作者: 起名难倒英雄汉 | 来源:发表于2018-10-11 09:46 被阅读0次

    验证客户端中,客户端通过在metadata中加入客户端需要的数据来进行验证,服务器收到请求后,可以从metadata中获得客户端的数据,然后在interceptor中对客户端的数据进行验证。

    上面的这个过程涉及到了两个部分的内容:metadata和interceptor。在这篇文章中,我们介绍一下metadata的概念以及使用。

    1. 简介

    gRPC让我们可以像本地调用一样实现远程调用,对于每一次的RPC调用中,都可能会有一些有用的数据,而这些数据就可以通过metadata来传递。metadata是以key-value的形式存储数据的,其中key是string类型,而value是[]string,即一个字符串数组类型。metadata使得client和server能够为对方提供关于本次调用的一些信息,就像一次http请求的RequestHeader和ResponseHeader一样。http中header的生命周周期是一次http请求,那么metadata的生命周期就是一次RPC调用。

    2. 创建metadata

    为了使用metadata,我们需要在程序中导入相应的包:

    import "google.golang.org/grpc/metadata"
    

    这个包中实现了多种创建metadata的方法。首先我们来看看metadata的定义:

    type MD map[string][]string
    

    就像之前说的,metadata其实就是一个map。注意metadata的value是一个字符串数组,意味着我们可以对同一个key添加多个value。

    2.1 New一个metadata

    在metadata包中,我们可以通过New方法创建一个新的metadata:

    md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})
    

    我们也可以通过Pairs方法来创建一个新的metadata:

    md := metadata.Pairs(
        "key1", "value1",
        "key1", "value1.2", // "key1" will have map value []string{"value1", "value1.2"}
        "key2", "value2",
    )
    

    Pairs方法的参数需要是偶数个。同时,对于相同的key,value值会被合并到一个数组中。

    还有一个需要注意的地方就是,metadata中key是不区分大小写的,也就是说key1KEY1是同一个key,这对于NewPairs是一样的。

    2.2 存储二进制数据

    在metadata中,key永远是string类型,但是value可以是string也可以是二进制数据。为了在metadata中存储二进制数据,我们仅仅需要在key的后面加上一个-bin后缀。具有-bin后缀的key所对应的value在创建metadata时会被编码(base64),收到的时候会被解码:

    md := metadata.Pairs(
        "key", "string value",
        "key-bin", string([]byte{96, 102}),
    )
    

    知道了如何创建metadata后,我们来看看在client和server中如何发送以及接收metadata。

    3. 在client中发送以及接收metadata

    3.1 发送metadata

    在client中可以通过两个方法将metadata发送到server端。通过AppendToOutgoingContext方法可以将key-value对添加到已有的context中。如果对应的context没有metadata,那么就会创建一个;如果已有metadata了,那么就将数据添加到原来的metadata中:

    // create a new context with some metadata
    ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
    
    // later, add some more metadata to the context (e.g. in an interceptor)
    ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
    
    // make unary RPC
    response, err := client.SomeRPC(ctx, someRequest)
    
    // or make streaming RPC
    stream, err := client.SomeStreamingRPC(ctx)
    

    另一个方法是通过NewOutgoingContext方法将新创建的metadata添加到context中,这样会覆盖掉原来已有的metadata,所以使用的时候需要注意。同时,这个方法比AppendToOutgoingContext方法要慢,因此推荐使用AppendToOutgoingContext

    // create a new context with some metadata
    md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
    ctx := metadata.NewOutgoingContext(context.Background(), md)
    
    // later, add some more metadata to the context (e.g. in an interceptor)
    md, _ := metadata.FromOutgoingContext(ctx)
    newMD := metadata.Pairs("k3", "v3")
    ctx = metadata.NewContext(ctx, metadata.Join(metadata.New(send), newMD))
    
    // make unary RPC
    response, err := client.SomeRPC(ctx, someRequest)
    
    // or make streaming RPC
    stream, err := client.SomeStreamingRPC(ctx)
    

    3.2 接收metadata

    客户端可以接收的metadata只有header和trailer。由于RPC调用分为两种:普通的(unary)和流式的(Streaming),所以接收metadata的方式也不一样。

    Unary Call

    在普通的调用中,我们可以使用grpc.Header()grpc.Trailer()方法来接收:

    var header, trailer metadata.MD // variable to store header and trailer
    r, err := client.SomeRPC(
        ctx,
        someRequest,
        grpc.Header(&header),    // will retrieve header
        grpc.Trailer(&trailer),  // will retrieve trailer
    )
    
    // do something with header and trailer
    

    其实这两个方法是创建了两个CallOption

    Streaming Call

    在之前的文章中我们介绍过,Streaming方式的调用包括三种:

    • Server streaming RPC
    • Client streaming RPC
    • Bidirectional streaming RPC

    而相应的Header和Trailer可以通过调用返回的ClientStream接口的Header()Trailer()方法接收:

    stream, err := client.SomeStreamingRPC(ctx)
    
    // retrieve header
    header, err := stream.Header()
    
    // retrieve trailer
    trailer := stream.Trailer()
    

    下面我们使用一个具体的例子来演示一下。

    3.3 一个例子

    在我们的simplemath服务中,我们实现了四个方法,分别对应四种RPC调用:

    • GreatCommonDivisor: Unary call
    • GetFibonacci: Server streaming call
    • Statistics: Client streaming call
    • PrimeFactorization: Bidirectional streaming call

    在上面的介绍中,Streaming Call接收metadata的方式是一样的,因此这里仅仅在GreatCommonDivisorPrimeFactorization中演示。

    首先,我们定义一个常量:

    const (
        timestampFormat = time.StampNano
    )
    

    然后:

    func GreatCommonDivisor(first, second string) {
        conn, err := getGRPCConn()
        if err != nil {
            log.Fatalf("did not connect: %v", err)
        }
        defer conn.Close()
        a, _ := strconv.ParseInt(first, 10, 32)
        b, _ := strconv.ParseInt(second, 10, 32)
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        // sending metadata to server: create a new context with some metadata
        ctx = metadata.AppendToOutgoingContext(ctx, "timestamp", time.Now().Format(timestampFormat))
        defer cancel()
        rsp := pb.GCDResponse{}
    
        var header, trailer metadata.MD
        err = conn.Invoke(ctx, "/api.SimpleMath/GreatCommonDivisor", &pb.GCDRequest{First: int32(a), Second: int32(b)}, &rsp, grpc.Header(&header), grpc.Trailer(&trailer))
        if err != nil {
            log.Fatalf("could not compute: %v", err)
        }
    
        // receiving metadata from server: get the Header and Trailer metadata
        if t, ok := header["timestamp"]; ok {
            log.Printf("timestamp from header: ")
            for i, e := range t {
                log.Printf(" %d. %s", i, e)
            }
        }
        if t, ok := trailer["timestamp"]; ok {
            log.Printf("timestamp from trailer: ")
            for i, e := range t {
                log.Printf(" %d. %s", i, e)
            }
        }
    
        log.Printf("The Greatest Common Divisor of %d and %d is %d", a, b, rsp.Result)
    }
    

    在上面的代码中,我们使用AppendToOutgoingContext方法创建一个metadata,将当前的时间传过去,并通过grpc.Header()grpc.Trailer()方法接收server发送的metadata。

    由于metadata的value是一个string类型的数组,所以我们使用for循环来进行遍历。

    接下来是Streaming:

    func PrimeFactorization(count string) {
        conn, err := getGRPCConn()
        if err != nil {
            log.Fatalf("did not connect: %v", err)
        }
        defer conn.Close()
        client := pb.NewSimpleMathClient(conn)
        // sending metadata to server: create a new context with some metadata
        ctx := metadata.AppendToOutgoingContext(context.Background(), "timestamp", time.Now().Format(timestampFormat))
        stream, err := client.PrimeFactorization(ctx)
        if err != nil {
            log.Fatalf("failed to compute: %v", err)
        }
        waitc := make(chan struct{})
    
        go func() {
            for {
                in, err := stream.Recv()
                if err == io.EOF {
                    close(waitc)
                    break
                }
                if err != nil {
                    log.Fatalf("failed to recv: %v", err)
                }
                log.Printf(in.Result)
            }
            // receiving metadata from server: read trailer
            trailer := stream.Trailer()
            if t, ok := trailer["timestamp"]; ok {
                log.Printf("timestamp from trailer: ")
                for i, e := range t {
                    log.Printf(" %d. %s", i, e)
                }
            }
        }()
    
        num, _ := strconv.ParseInt(count, 10, 32)
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
        var nums []int
        for i := 0; i < int(num); i++ {
            nums = append(nums, r.Intn(1000))
        }
        // receiving metadata from server: read header
        header, err := stream.Header()
        if err != nil {
            log.Fatalf("failed to read header from stream: %v", err)
        }
        if t, ok := header["timestamp"]; ok {
            log.Printf("timestamp from header: ")
            for i, e := range t {
                log.Printf(" %d. %s", i, e)
            }
        }
        for _, n := range nums {
            if err := stream.Send(&pb.PrimeFactorizationRequest{Number: int32(n)}); err != nil {
                log.Fatalf("failed to send: %v", err)
            }
            log.Printf("send number: %d", n)
        }
        stream.CloseSend()
        <-waitc
    }
    

    这里,发送metadata是一样的,不同在于header和trailer的接收上。在Streaming中,header的接收应该在client发送数据之前,trailer的接收应该在接收数据之后。

    4. 在server中发送以及接收metadata

    4.1 接收metadata

    服务器需要在RPC调用中的context中获取客户端发送的metadata。如果是一个普通的RPC调用,那么就可以直接用context;如果是一个Streaming调用,服务器需要从相应的stream里获取context,然后获取metadata。

    Unary Call

    func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
        md, ok := metadata.FromIncomingContext(ctx)
        // do something with metadata
    }
    

    Streaming Call

    func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
        md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
        // do something with metadata
    }
    

    4.2 发送metadata

    前面我们说过,client可以接收的metadata只有header和trailer,因此server也只能发送header和trailer。

    Unary Call

    在普通的RPC调用中,服务器可以通过grpc模块的SendHeaderSetTrailer方法向client发送header和trailer。这两个方法的第一个参数都是context:

    func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
        // create and send header
        header := metadata.Pairs("header-key", "val")
        grpc.SendHeader(ctx, header)
        // create and set trailer
        trailer := metadata.Pairs("trailer-key", "val")
        grpc.SetTrailer(ctx, trailer)
    }
    

    Streaming Call

    对于Streaming调用来说,我们同样也可以使用SendHeaderSetTrailer方法,只不过这两个方法是接口ServerStream的方法:

    func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
        // create and send header
        header := metadata.Pairs("header-key", "val")
        stream.SendHeader(header)
        // create and set trailer
        trailer := metadata.Pairs("trailer-key", "val")
        stream.SetTrailer(trailer)
    }
    

    4.3 一个栗子

    在前面我们实现了client的一个例子,在这里我们实现对应的server端的代码。

    同样,我们先定一个常量:

    const (
        timestampFormat = time.StampNano
    )
    

    然后是GreatCommonDivisor对应的代码:

    func (sms *SimpleMathServer) GreatCommonDivisor(ctx context.Context, in *pb.GCDRequest) (*pb.GCDResponse, error) {
        // sending metadata to client: create trailer, using defer to record timestamp of function return
        defer func() {
            trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
            grpc.SetTrailer(ctx, trailer)
        }()
    
        // receiving metadata from client: get metadata from context
        md, _ := metadata.FromIncomingContext(ctx)
        if t, ok := md["timestamp"]; ok {
            log.Printf("timestamp from metadata: ")
            for i, e := range t {
                log.Printf(" %d. %s", i, e)
            }
        }
    
        first := in.First
        second := in.Second
        for second != 0 {
            first, second = second, first%second
        }
        // sending metadata to client: create and send header
        header := metadata.New(map[string]string{"timestamp": time.Now().Format(timestampFormat)})
        grpc.SendHeader(ctx, header)
        return &pb.GCDResponse{Result: first}, nil
    }
    

    在server端,我们使用了metadata.FromIncomingContext来获取client发送的metadata。

    server发送metadata包括两部分,header和trailer,其中grpc.SendHeader需要在server发送结果之前发送,而grpc.SetTrailer需要在发送结果之后发送,所以我们使用了defer来发送trailer。

    下面是关于Streaming调用的PrimeFactorization

    func (sms *SimpleMathServer) PrimeFactorization(stream pb.SimpleMath_PrimeFactorizationServer) error {
        // sending metadata to client: create trailer, using defer to record timestamp of function return
        defer func() {
            trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
            stream.SetTrailer(trailer)
        }()
    
        // receiving metadata from client: read metadata from client
        md, _ := metadata.FromIncomingContext(stream.Context())
        if t, ok := md["timestamp"]; ok {
            log.Printf("timestamp from metadata:")
            for i, e := range t {
                log.Printf(" %d. %s", i, e)
            }
        }
        // sending metadata to client: create and send header
        header := metadata.New(map[string]string{"timestamp": time.Now().Format(timestampFormat)})
        stream.SendHeader(header)
        for {
            in, err := stream.Recv()
            if err == io.EOF {
                return nil
            }
            if err != nil {
                log.Fatalf("failed to recv: %v", err)
                return err
            }
            stream.Send(&pb.PrimeFactorizationResponse{Result: primeFactorization(int(in.Number))})
        }
        return nil
    }
    

    这里其实和普通的RPC调用是一样的,所需要注意的问题也是一样的。

    5. Let them talk

    编写完代码,我们可以编译运行了,首先运行server,然后运行client:

    $ ./client gcd 12 15
    

    client端结果如下:

    2018/10/11 09:42:27 timestamp from header:
    2018/10/11 09:42:27  0. Oct 11 09:42:27.360323334
    2018/10/11 09:42:27 timestamp from trailer:
    2018/10/11 09:42:27  0. Oct 11 09:42:27.360337272
    2018/10/11 09:42:27 The Greatest Common Divisor of 12 and 15 is 3
    

    server端结果如下:

    2018/10/11 09:42:27 timestamp from metadata:
    2018/10/11 09:42:27  0. Oct 11 09:42:27.349315019
    

    然后我们尝试一下Streaming调用:

    $ ./client prime 3
    

    client端结果如下:

    2018/10/11 09:43:56 timestamp from header:
    2018/10/11 09:43:56  0. Oct 11 09:43:56.237487612
    2018/10/11 09:43:56 send number: 626
    2018/10/11 09:43:56 send number: 735
    2018/10/11 09:43:56 send number: 191
    2018/10/11 09:43:56 626 = 2 * 313
    2018/10/11 09:43:56 735 = 3 * 5 * 7 * 7
    2018/10/11 09:43:56 191 = 1 * 191
    2018/10/11 09:43:56 timestamp from trailer:
    2018/10/11 09:43:56  0. Oct 11 09:43:56.238427008
    

    server端结果如下:

    2018/10/11 09:43:56 timestamp from metadata:
    2018/10/11 09:43:56  0. Oct 11 09:43:56.231409679
    

    以上就是关于metadata的介绍。

    To Be Continued ~

    6. 系列文章

    相关文章

      网友评论

          本文标题:Dive into gRPC(6):metadata

          本文链接:https://www.haomeiwen.com/subject/bpivaftx.html