// Invoke sends the RPC request on the wire and returns after response is received.
// Invoke is called by generated code. Also users can call Invoke directly when it
// is really needed in their use cases.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
//如果在dial的时候WithUnaryInterceptor 则会先调用拦截器(可以做一些传递数据,日志等等事情)
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
return invoke(ctx, method, args, reply, cc, opts...)
// invoke performs one unary RPC on cc. As listed here it copies
// defaultCallInfo, applies the per-method config (fail-fast and timeout), runs
// each CallOption's before hook, sets up optional tracing and stats reporting,
// and then loops: obtain a transport, send the request, receive the response.
// The final result is built from the stream's status code and description.
// The named result e is read by the deferred tracing and stats closures.
//
// NOTE(review): this listing contains no closing braces and appears to be
// missing some statement lines (flagged below) — confirm against the upstream
// source before relying on the control flow shown here.
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo // default callInfo configuration
if mc, ok := cc.getMethodConfig(method); ok {// look up the config for this method
c.failFast = !mc.WaitForReady
if mc.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
defer cancel()
for _, o := range opts {
if err := o.before(&c); err != nil { // pre-call processing
return toRPCErr(err)
defer func() {
for _, o := range opts { // post-call processing
// NOTE(review): the body of the loop above is not present in this listing —
// presumably the per-option post-call hook; confirm.
if EnableTracing {
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
defer c.traceInfo.tr.Finish()
c.traceInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
defer func() {
if e != nil {
c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
sh := cc.dopts.copts.StatsHandler// set via WithStatsHandler; lets the caller observe state before and after the call
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})// attach some information to ctx
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
FailFast: c.failFast,
sh.HandleRPC(ctx, begin)
defer func() {
if sh != nil {
end := &stats.End{
Client: true,
EndTime: time.Now(),
Error: e,
sh.HandleRPC(ctx, end)
topts := &transport.Options{
Last: true,
Delay: false,
for {// enter the loop
var (
err error
t transport.ClientTransport
stream *transport.Stream
// Record the put handler from Balancer.Get(...). It is called once the
// RPC has completed or failed.
put func()
// TODO(zhaoq): Need a formal spec of fail-fast.
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
if cc.dopts.cp != nil {// compression algorithm used when sending
callHdr.SendCompress = cc.dopts.cp.Type()
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,// failFast defaults to true, so Balancer.Get does not block when no valid address is available
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := err.(*rpcError); ok {
return err
if err == errConnClosing || err == errConnUnavailable {
if c.failFast {// true by default
return Errorf(codes.Unavailable, "%v", err)
// NOTE(review): nothing is visible for the non-fail-fast case of the check
// above — presumably the loop is retried; confirm.
// All the other errors are treated as Internal errors.
return Errorf(codes.Internal, "%v", err)
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, t, args, topts)
if err != nil {
if put != nil {
// NOTE(review): put is cleared here (and at the two later sites) but no call
// to it is visible, although the comment on its declaration says it is called
// once the RPC has completed or failed — confirm.
put = nil
// Retry a non-failfast RPC when
// i) there is a connection error; or
// ii) the server started to drain before this RPC was initiated.
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {// true by default
return toRPCErr(err)
// NOTE(review): no retry statement is visible after the fail-fast return,
// though the comment above describes retrying — confirm.
return toRPCErr(err)
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put = nil
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
return toRPCErr(err)
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
t.CloseStream(stream, nil)
if put != nil {
put = nil
// The RPC's outcome is reported using the status recorded on the stream.
return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
peer *peer.Peer
traceInfo traceInfo // in trace.go
// defaultCallInfo is the starting configuration for a call: invoke copies it
// before applying per-method and per-call settings. Fail-fast is enabled by
// default.
var defaultCallInfo = callInfo{failFast: true}
2. (cc *ClientConn) getTransport
// getTransport selects an addrConn and waits on it for a transport. Without a
// balancer it ranges over cc.conns; with one it asks the balancer's Get for an
// address and looks that address up in cc.conns. It returns the transport, the
// put func obtained from the balancer (nil when no balancer is set), and an
// error.
//
// NOTE(review): this listing contains no closing braces and appears to be
// missing some statement lines (flagged below) — confirm against the upstream
// source before relying on the control flow shown here.
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
var (
ac *addrConn
ok bool
put func()
if cc.dopts.balancer == nil { // no balancer configured: there is only one address, so use it directly
// If balancer is nil, there should be only one addrConn available.
if cc.conns == nil {
return nil, nil, toRPCErr(ErrClientConnClosing)
for _, ac = range cc.conns {
// Break after the first iteration to get the first addrConn.
ok = true
// NOTE(review): the break described by the comment above is not visible in
// this listing — confirm.
} else { // a balancer is configured (etcd, etc.): pick an address according to its policy (round-robin by default)
var (
addr Address
err error
addr, put, err = cc.dopts.balancer.Get(ctx, opts) // e.g. (rr *roundRobin) Get
if err != nil {
return nil, nil, toRPCErr(err)
if cc.conns == nil {
return nil, nil, toRPCErr(ErrClientConnClosing)
ac, ok = cc.conns[addr]
if !ok { // the address is not in cc.conns, meaning it has already been removed (e.g. it dropped out of etcd)
if put != nil {
// NOTE(review): nothing is visible inside the put != nil check above (nor in
// the one further down) — presumably put is invoked before returning; confirm.
return nil, nil, errConnClosing
// wait is told whether a balancer is configured and whether the call is
// fail-fast (the inverse of opts.BlockingWait).
t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
if err != nil {
if put != nil {
return nil, nil, err
return t, put, nil
3. (ac *addrConn) wait
// With the default settings, wait returns immediately when ac.state is Shutdown, Ready, or TransientFailure.
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
switch {
case ac.state == Shutdown:
if failfast || !hasBalancer {
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
err := ac.tearDownErr
return nil, err
return nil, errConnClosing
case ac.state == Ready:
ct := ac.transport
return ct, nil
case ac.state == TransientFailure:
if failfast || hasBalancer {
return nil, errConnUnavailable
ready := ac.ready
if ready == nil {
ready = make(chan struct{})
ac.ready = ready
select {
case <-ctx.Done(): //这个请求被cancel了(超时等等)
return nil, toRPCErr(ctx.Err())
// Wait until the new transport is ready or failed.
case <-ready: