美文网首页
GRPC源码实例解析(二)——UnaryRPC Client 篇

GRPC源码实例解析(二)——UnaryRPC Client 篇

作者: 王司技术谈 | 来源:发表于2021-05-28 14:04 被阅读0次

    上一篇主要介绍了 Server 端的流程;本篇关注 Client 端的流程,同样只列出核心主流程的代码。

    // SayHello is the client-side entry point for the Greeter service's SayHello
    // method. It hands the request to the underlying connection's Invoke under
    // the full method name and returns the reply that Invoke filled in.
    func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
        reply := &HelloReply{}
        if err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, reply, opts...); err != nil {
            return nil, err
        }
        return reply, nil
    }
    
    // invoke carries out one unary call: it creates a ClientStream for the
    // method, sends the request on it and then receives the reply from it.
    // Per the article, the stream talks HTTP/2 through the transport layer, and
    // serialization/deserialization happens inside SendMsg/RecvMsg.
    func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
        stream, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
        if err != nil {
            return err
        }
        if sendErr := stream.SendMsg(req); sendErr != nil {
            return sendErr
        }
        return stream.RecvMsg(reply)
    }
    
    // newClientStream builds the core clientStream object by delegating to
    // newClientStreamWithParams with a no-op completion callback.
    // NOTE(review): mc and onCommit are not defined in this excerpt; they are
    // presumably derived from the resolved method config — confirm against the
    // full source.
    func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error){
        noop := func() {}
        create := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
            return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
        }
        return create(ctx, noop)
    }
    
    // newClientStreamWithParams assembles the clientStream for one RPC: it fills
    // in the transport call header, constructs the clientStream around it,
    // creates the first attempt and opens that attempt's stream through the
    // retry helper. If opening the stream fails, the clientStream is finished
    // with the error and (nil, err) is returned.
    //
    // NOTE(review): this is an abridged excerpt. c, sh and trInfo are not
    // defined here, the clientStream literal is elided with "...", and the
    // success-path return is omitted — consult the full source before reuse.
    func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
        // Call header handed to the transport: target authority, full method
        // name, content subtype and the completion callback.
        callHdr := &transport.CallHdr{
            Host:           cc.authority,
            Method:         method,
            ContentSubtype: c.contentSubtype,
            DoneFunc:       doneFunc,
        }
    
        cs := &clientStream{
            callHdr:      callHdr,
            ...
            ...
    
        }
        // Build the attempt. Per the article, the attempt uses a transport-layer
        // stream to perform the actual sendMsg on behalf of the clientStream and
        // is the unit the retry mechanism operates on.
        cs.newAttemptLocked(sh, trInfo)
        // op opens the transport stream for an attempt; it is run via withRetry,
        // whose second argument buffers op (with size 0) for a possible replay.
        op := func(a *csAttempt) error { return a.newStream() }
        if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
            cs.finish(err)
            return nil, err
        }
    }
    
    // SendMsg encodes m and sends it to the server through the current attempt,
    // using the retry helper. It returns the encoding error if the message
    // cannot be prepared, otherwise whatever withRetry reports.
    func (cs *clientStream) SendMsg(m interface{}) (err error) {
        // Pre-process the message: encode and compress it to obtain the header
        // and payload that will be framed for HTTP/2.
        hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
        if err != nil {
            // Nothing usable was produced; do not attempt to send.
            return err
        }
        // op performs the actual send on a given attempt.
        op := func(a *csAttempt) error {
            err := a.sendMsg(m, hdr, payload, data)
            // Drop the closure's references to m and data after the first run;
            // presumably so a buffered replay does not keep them alive — confirm
            // against upstream.
            m, data = nil, nil
            return err
        }
        // The second argument buffers op, sized by header+payload, for replay.
        return cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
    }
    
    // sendMsg writes the prepared header and payload to this attempt's transport
    // stream; per the article this amounts to sending the data to the server
    // as an HTTP/2 data frame. It returns the error reported by the transport.
    //
    // NOTE(review): upstream grpc-go post-processes a failed Write before
    // returning; this simplified excerpt propagates it as-is — confirm if the
    // exact error semantics matter.
    func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
        // The owning clientStream supplies the stream descriptor.
        cs := a.cs
        // Last is set unless the RPC is client-streaming, so a unary request
        // marks the end of the client's sends with its single message.
        return a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams})
    }
    
    // Write wraps the header and data bytes in a dataFrame for stream s and
    // queues that frame on the client's control buffer. The frame's endStream
    // flag is taken from opts.Last.
    func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
        frame := &dataFrame{streamID: s.id, endStream: opts.Last, h: hdr, d: data}
        return t.controlBuf.put(frame)
    }
    
    // RecvMsg receives one message into m by running the attempt's recvMsg
    // through the retry helper, and returns the resulting error (nil on
    // success).
    func (cs *clientStream) RecvMsg(m interface{}) error {
        // No payload details are collected in this simplified flow, so a nil
        // payloadInfo is passed down.
        var recvInfo *payloadInfo
        return cs.withRetry(func(a *csAttempt) error {
            return a.recvMsg(m, recvInfo)
        }, cs.commitAttemptLocked)
    }
    
    // recvMsg reads one message from this attempt's transport stream into m,
    // using the owning clientStream's codec and receive-size limit, and returns
    // the error reported by recv.
    func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
        // The owning clientStream supplies the codec and call settings.
        cs := a.cs
        return recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
    }
    
    // recv reads and decompresses one message from the stream, then uses the
    // codec to unmarshal it into m (in the article's example, a HelloReply).
    // It returns the first error encountered, or nil on success.
    func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
        d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
        if err != nil {
            // Do not try to decode when reading/decompressing failed.
            return err
        }
        return c.Unmarshal(d, m)
    }
    
    上一篇:GRPC源码实例解析(一)
    https://www.jianshu.com/p/8bbc6dc36859

    相关文章

      网友评论

          本文标题:GRPC源码实例解析(二)——UnaryRPC Client 篇

          本文链接:https://www.haomeiwen.com/subject/cffssltx.html