美文网首页
grpc-go源码阅读(3)grpc invoke一次请求的过程

grpc-go源码阅读(3)grpc invoke一次请求的过程

作者: one_zheng | 来源:发表于2018-06-12 15:40 被阅读45次

1.grpc.invoke

// Invoke sends the RPC request on the wire and returns after response is received.
// Invoke is called by generated code. Also users can call Invoke directly when it
// is really needed in their use cases.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    //如果在dial的时候WithUnaryInterceptor  则会先调用拦截器(可以做一些传递数据,日志等等事情)
    if cc.dopts.unaryInt != nil {
        return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
    }
    return invoke(ctx, method, args, reply, cc, opts...)
}

invoke

func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
    c := defaultCallInfo //默认的callinfo配置
    if mc, ok := cc.getMethodConfig(method); ok {//获取methodConfig的配置
        c.failFast = !mc.WaitForReady
        if mc.Timeout > 0 {
            var cancel context.CancelFunc
            ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
            defer cancel()
        }
    }
    for _, o := range opts {
        if err := o.before(&c); err != nil { //调用之前的处理
            return toRPCErr(err)
        }
    }
    defer func() {
        for _, o := range opts { //调用之后的处理
            o.after(&c)
        }
    }()
    if EnableTracing {
        c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
        defer c.traceInfo.tr.Finish()
        c.traceInfo.firstLine.client = true
        if deadline, ok := ctx.Deadline(); ok {
            c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
        }
        c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
        // TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
        defer func() {
            if e != nil {
                c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
                c.traceInfo.tr.SetError()
            }
        }()
    }
    sh := cc.dopts.copts.StatsHandler//通过WithStatsHandler来设置的, 对调用前后的状态进行掌控
    if sh != nil {
        ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})//附加一些信息到ctx
        begin := &stats.Begin{
            Client:    true,
            BeginTime: time.Now(),
            FailFast:  c.failFast,
        }
        sh.HandleRPC(ctx, begin)
    }
    defer func() {
        if sh != nil {
            end := &stats.End{
                Client:  true,
                EndTime: time.Now(),
                Error:   e,
            }
            sh.HandleRPC(ctx, end)
        }
    }()
    topts := &transport.Options{
        Last:  true,
        Delay: false,
    }
    for {//进入循环
        var (
            err    error
            t      transport.ClientTransport
            stream *transport.Stream
            // Record the put handler from Balancer.Get(...). It is called once the
            // RPC has completed or failed.
            put func()
        )
        // TODO(zhaoq): Need a formal spec of fail-fast.
        callHdr := &transport.CallHdr{
            Host:   cc.authority,
            Method: method,
        }
        if cc.dopts.cp != nil {//发送的时候的数据压缩算法
            callHdr.SendCompress = cc.dopts.cp.Type()
        }

        gopts := BalancerGetOptions{
            BlockingWait: !c.failFast,// 默认failFast=true, 所以Balancer Get不到有效的地址的时候不会block
        }
        //获取一个连接
        t, put, err = cc.getTransport(ctx, gopts)
        if err != nil {
            // TODO(zhaoq): Probably revisit the error handling.
            if _, ok := err.(*rpcError); ok {
                return err
            }
            if err == errConnClosing || err == errConnUnavailable {
                if c.failFast {//默认是true
                    return Errorf(codes.Unavailable, "%v", err)
                }
                continue
            }
            // All the other errors are treated as Internal errors.
            return Errorf(codes.Internal, "%v", err)
        }
        if c.traceInfo.tr != nil {
            c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
        }
        //发送请求
        stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, t, args, topts)
        if err != nil {
            if put != nil {
                put()
                put = nil
            }
            // Retry a non-failfast RPC when
            // i) there is a connection error; or
            // ii) the server started to drain before this RPC was initiated.
            if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
                if c.failFast {//默认是true
                    return toRPCErr(err)
                }
                continue
            }
            return toRPCErr(err)
        }
        //接受响应数据,里面是阻塞等待的
        err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
        if err != nil {
            if put != nil {
                put()
                put = nil
            }
            if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
                if c.failFast {
                    return toRPCErr(err)
                }
                continue
            }
            return toRPCErr(err)
        }
        if c.traceInfo.tr != nil {
            c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
        }
        t.CloseStream(stream, nil)
        if put != nil {
            put()
            put = nil
        }
        return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
    }
}

默认的defaultCallInfo
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
    failFast  bool
    headerMD  metadata.MD
    trailerMD metadata.MD
    peer      *peer.Peer
    traceInfo traceInfo // in trace.go
}

var defaultCallInfo = callInfo{failFast: true}

2.(cc *ClientConn) getTransport

func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
    var (
        ac  *addrConn
        ok  bool
        put func()
    )
    if cc.dopts.balancer == nil { //如果没有设置balancer,只会有一个地址,直接返回
        // If balancer is nil, there should be only one addrConn available.
        cc.mu.RLock()
        if cc.conns == nil {
            cc.mu.RUnlock()
            return nil, nil, toRPCErr(ErrClientConnClosing)
        }
        for _, ac = range cc.conns {
            // Break after the first iteration to get the first addrConn.
            ok = true
            break
        }
        cc.mu.RUnlock()
    } else { //如果有设置balancer(etcd等),根据策略选一个(默认是轮询)
        var (
            addr Address
            err  error
        )
        //得到一个地址,如果!BlockingWait即failfast(默认),不保证这个地址一定是有效的;反之,则能保证
        addr, put, err = cc.dopts.balancer.Get(ctx, opts) //(rr *roundRobin) Get
        if err != nil {
            return nil, nil, toRPCErr(err)
        }
        cc.mu.RLock()
        if cc.conns == nil {
            cc.mu.RUnlock()
            return nil, nil, toRPCErr(ErrClientConnClosing)
        }
        ac, ok = cc.conns[addr]
        cc.mu.RUnlock()
    }
    if !ok { //如果这个地址不在cc.conns里,说明这个地址已经被删了(比如etcd中掉了)
        if put != nil {
            put()
        }
        return nil, nil, errConnClosing
    }
    //等待一个连接,默认情况下,这里是保证连接ok,如果不ok会返回错误
    t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
    if err != nil {
        if put != nil {
            put()
        }
        return nil, nil, err
    }
    return t, put, nil
}

3.(ac *addrConn) wait

//等待failfast默认是true)
//默认情况下 ac.state=Shutdown,ready,TransientFailure,直接返回
//否则,ac.state=Connecting等,则等待ready(成功或者失败),直到ctx超时
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
    for {
        ac.mu.Lock()
        switch {
        case ac.state == Shutdown:
            if failfast || !hasBalancer {
                // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
                err := ac.tearDownErr
                ac.mu.Unlock()
                return nil, err
            }
            ac.mu.Unlock()
            return nil, errConnClosing
        case ac.state == Ready:
            ct := ac.transport
            ac.mu.Unlock()
            return ct, nil
        case ac.state == TransientFailure:
            if failfast || hasBalancer {
                ac.mu.Unlock()
                return nil, errConnUnavailable
            }
        }
        //ac.state=Connecting,默认情况下,走到这里,表示正在连接~~,所以等一等~~
        ready := ac.ready
        if ready == nil {
            ready = make(chan struct{})
            ac.ready = ready
        }
        ac.mu.Unlock()
        select {
        case <-ctx.Done(): //这个请求被cancel了(超时等等)
            return nil, toRPCErr(ctx.Err())
        // Wait until the new transport is ready or failed.
        //不管成功还是失败,都会close(ac.ready)
        case <-ready:
        }
    }
}

相关文章

网友评论

      本文标题:grpc-go源码阅读(3)grpc invoke一次请求的过程

      本文链接:https://www.haomeiwen.com/subject/zazleftx.html