1. grpc.Invoke
// Invoke sends the RPC request on the wire and returns after the response is
// received. Generated code calls it; users may also call it directly when
// their use case really requires it.
//
// When the ClientConn carries a unary interceptor, the call is handed to that
// interceptor together with the internal invoke function, so the interceptor
// decides when (and whether) the RPC is actually performed. Without an
// interceptor the RPC is performed directly.
// NOTE(review): the interceptor is presumably installed at dial time via
// WithUnaryInterceptor — confirm against the dial options.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	interceptor := cc.dopts.unaryInt
	if interceptor == nil {
		// No interceptor configured: perform the RPC directly.
		return invoke(ctx, method, args, reply, cc, opts...)
	}
	return interceptor(ctx, method, args, reply, cc, invoke, opts...)
}
invoke
// invoke performs one unary RPC on cc for the given method: it builds the
// per-call settings, obtains a transport through cc.getTransport, sends args
// with sendRequest and then calls recvResponse to fill reply.
//
// An attempt is repeated only when the call is not fail-fast and the failure
// was errConnClosing/errConnUnavailable from getTransport, or a
// transport.ConnectionError / transport.ErrStreamDrain from sendRequest or
// recvResponse. Every other failure is returned immediately.
//
// The result is named (e) so that the deferred trace and stats callbacks can
// observe the final error. Deferred calls run in reverse order of
// registration.
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo // Start from a copy of the package-level default call settings.
if mc, ok := cc.getMethodConfig(method); ok {// Apply the per-method config when one exists for this method.
c.failFast = !mc.WaitForReady
if mc.Timeout > 0 {
// Bound the whole call (including retries) by the configured timeout.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
defer cancel()
}
}
for _, o := range opts {
if err := o.before(&c); err != nil { // Pre-call hook of each CallOption; a failure aborts the RPC.
return toRPCErr(err)
}
}
defer func() {
for _, o := range opts { // Post-call hook of each CallOption, run when invoke returns.
o.after(&c)
}
}()
if EnableTracing {
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
defer c.traceInfo.tr.Finish()
c.traceInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
// Record the time remaining until the deadline, not the absolute deadline.
c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
}
c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
defer func() {
// Log the final error (if any) to the trace and mark the trace as failed.
if e != nil {
c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
c.traceInfo.tr.SetError()
}
}()
}
sh := cc.dopts.copts.StatsHandler// Optional stats handler, notified at the start and end of the RPC. NOTE(review): presumably installed via WithStatsHandler — confirm.
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})// Continue with the context returned by the handler.
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
FailFast: c.failFast,
}
sh.HandleRPC(ctx, begin)
}
defer func() {
// Report the end of the RPC, including the final error, to the stats handler.
if sh != nil {
end := &stats.End{
Client: true,
EndTime: time.Now(),
Error: e,
}
sh.HandleRPC(ctx, end)
}
}()
topts := &transport.Options{
Last: true,
Delay: false,
}
for {// One iteration per attempt; the continue statements below are the retry points.
var (
err error
t transport.ClientTransport
stream *transport.Stream
// Record the put handler from Balancer.Get(...). It is called once the
// RPC has completed or failed.
put func()
)
// TODO(zhaoq): Need a formal spec of fail-fast.
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
}
if cc.dopts.cp != nil {// A compressor is configured: record its type in the call header.
callHdr.SendCompress = cc.dopts.cp.Type()
}
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,// failFast defaults to true (see defaultCallInfo), so BlockingWait defaults to false.
}
// Obtain a transport for this attempt.
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := err.(*rpcError); ok {
// Already an RPC error: return it unchanged.
return err
}
if err == errConnClosing || err == errConnUnavailable {
if c.failFast {// true by default
return Errorf(codes.Unavailable, "%v", err)
}
continue
}
// All the other errors are treated as Internal errors.
return Errorf(codes.Internal, "%v", err)
}
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
// Send the request.
stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, t, args, topts)
if err != nil {
// Release the balancer's put handler before returning or retrying.
if put != nil {
put()
put = nil
}
// Retry a non-failfast RPC when
// i) there is a connection error; or
// ii) the server started to drain before this RPC was initiated.
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {// true by default
return toRPCErr(err)
}
continue
}
return toRPCErr(err)
}
// Receive the response. NOTE(review): the original annotation says this call blocks until the response arrives — confirm in recvResponse.
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
if err != nil {
// Same release-then-retry handling as for sendRequest above.
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
return toRPCErr(err)
}
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
}
t.CloseStream(stream, nil)
if put != nil {
put()
put = nil
}
// The outcome of the RPC is built from the status code and description carried by the stream.
return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
}
}
默认的defaultCallInfo
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
// failFast is derived in invoke as !WaitForReady from the method config;
// when it is true invoke returns connection errors instead of retrying.
failFast bool
// NOTE(review): presumably the header metadata of the RPC — not used in
// this section; confirm against the CallOption implementations.
headerMD metadata.MD
// NOTE(review): presumably the trailer metadata of the RPC — not used in
// this section; confirm against the CallOption implementations.
trailerMD metadata.MD
// NOTE(review): presumably the remote peer of the RPC — not used in this
// section; confirm.
peer *peer.Peer
traceInfo traceInfo // in trace.go
}
// defaultCallInfo is the value every call in invoke starts from; fail-fast is
// enabled unless a method config or CallOption changes it.
var defaultCallInfo = callInfo{failFast: true}
2.(cc *ClientConn) getTransport
// getTransport selects the addrConn to use for an RPC and returns a transport
// obtained from it via ac.wait.
//
// Without a balancer the first entry of cc.conns is used. With a balancer the
// address comes from balancer.Get, and the put callback returned by Get is
// handed back to the caller, who must invoke it once the RPC has completed or
// failed. On every error path taken after Get succeeded, put is invoked here
// (when non-nil) and nil is returned in its place, so the caller never has to
// release it after a failure.
//
// Errors: an RPC error wrapping ErrClientConnClosing when cc.conns is nil, an
// RPC error wrapping the balancer's error when Get fails, errConnClosing when
// no addrConn exists for the chosen address, or whatever ac.wait returns.
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
	var (
		ac  *addrConn
		ok  bool
		put func()
	)
	if cc.dopts.balancer == nil {
		// If balancer is nil, there should be only one addrConn available.
		cc.mu.RLock()
		if cc.conns == nil {
			cc.mu.RUnlock()
			return nil, nil, toRPCErr(ErrClientConnClosing)
		}
		for _, ac = range cc.conns {
			// Break after the first iteration to get the first addrConn.
			ok = true
			break
		}
		cc.mu.RUnlock()
	} else {
		// A balancer is configured: let it choose the address.
		// NOTE(review): per the original annotation, with BlockingWait false
		// (fail-fast, the default) the returned address is not guaranteed to
		// be connected — confirm against the Balancer implementation.
		var (
			addr Address
			err  error
		)
		addr, put, err = cc.dopts.balancer.Get(ctx, opts)
		if err != nil {
			return nil, nil, toRPCErr(err)
		}
		cc.mu.RLock()
		if cc.conns == nil {
			cc.mu.RUnlock()
			// The balancer already handed out put for this RPC; release it
			// here like the other failure paths below, otherwise it is
			// dropped without ever being called.
			if put != nil {
				put()
			}
			return nil, nil, toRPCErr(ErrClientConnClosing)
		}
		ac, ok = cc.conns[addr]
		cc.mu.RUnlock()
	}
	if !ok {
		// No addrConn is registered for the chosen address (or cc.conns is
		// empty), e.g. because the address has been removed in the meantime.
		if put != nil {
			put()
		}
		return nil, nil, errConnClosing
	}
	// Obtain a transport from the addrConn; ac.wait is told to fail fast
	// exactly when the caller did not ask for a blocking wait.
	t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
	if err != nil {
		if put != nil {
			put()
		}
		return nil, nil, err
	}
	return t, put, nil
}
3.(ac *addrConn) wait
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there is a balancer/failfast is true.
//
// Outcome per connectivity state, evaluated under ac.mu on every pass:
//   - Shutdown: returns ac.tearDownErr when failfast is set or there is no
//     balancer, otherwise errConnClosing.
//   - Ready: returns the current transport.
//   - TransientFailure: returns errConnUnavailable when failfast is set or a
//     balancer exists; otherwise it waits like any other state.
//   - any other state: waits on ac.ready (created on demand) and re-checks the
//     state once it fires; a finished ctx ends the wait with toRPCErr(ctx.Err()).
//
// NOTE(review): per the original annotation, ac.ready is closed whether the
// connection attempt succeeds or fails — confirm where ac.ready is closed.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
	for {
		ac.mu.Lock()
		switch ac.state {
		case Shutdown:
			var closeErr error
			if failfast || !hasBalancer {
				// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
				closeErr = ac.tearDownErr
			} else {
				closeErr = errConnClosing
			}
			ac.mu.Unlock()
			return nil, closeErr
		case Ready:
			current := ac.transport
			ac.mu.Unlock()
			return current, nil
		case TransientFailure:
			if failfast || hasBalancer {
				ac.mu.Unlock()
				return nil, errConnUnavailable
			}
		}
		// Not decided yet: make sure there is a channel to wait on while the
		// lock is still held, then wait outside the lock.
		if ac.ready == nil {
			ac.ready = make(chan struct{})
		}
		notify := ac.ready
		ac.mu.Unlock()
		select {
		case <-notify:
			// State may have changed; loop and re-evaluate it.
		case <-ctx.Done():
			// The request was cancelled or its deadline expired.
			return nil, toRPCErr(ctx.Err())
		}
	}
}
网友评论