美文网首页
grpc-go源码阅读(2) grpc dial 建立连接过程

grpc-go源码阅读(2) grpc dial 建立连接过程

作者: one_zheng | 来源:发表于2018-06-12 15:06 被阅读986次

参考:https://github.com/liangzhiyang/annotate-grpc-go

正常启动一个grpcClient连接如下:

func main() {
    // Set up a connection to the server.
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    c := pb.NewGreeterClient(conn)

    // Contact the server and print out its response.
    name := defaultName
    if len(os.Args) > 1 {
        name = os.Args[1]
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
    if err != nil {
        log.Fatalf("could not greet: %v", err)
    }
    log.Printf("Greeting: %s", r.Message)
}

1.grpc.Dial

// Dial creates a client connection to the given target.

// targe->server地址  opts->DialOption

func Dial(target string, opts ...DialOption) (*ClientConn, error) {

return DialContext(context.Background(), target, opts...)

}

DialOption参数,grpcClient连接时传入

// dialOptions configure a Dial call. dialOptions are set by the DialOption

// values passed to Dial.

type dialOptions struct {

unaryInt  UnaryClientInterceptor  // UnaryClientInterceptor intercepts the execution of a unary RPC on the client. 

streamInt StreamClientInterceptor // StreamClientInterceptor intercepts the creation of ClientStream

codec    Codec  //编码方式,默认是protoCodec

cp        Compressor // Compressor defines the interface gRPC uses to compress a message.

dc        Decompressor // Decompressor defines the interface gRPC uses to decompress a message.

bs        backoffStrategy //backoff重试策略

balancer  Balancer //负载均衡,自带的RoundRobin就会返回一个轮训策略的对象

block    bool  // 设置为true则如果集群有多个address,grpc连接是阻塞的,一直等到所有连接成功

insecure  bool         //是否需要安全验证

timeout  time.Duration // 超时时间
copts    transport.ConnectOptions // // ConnectOptions covers all relevant options for communicating with the server.

scChan    <-chan  ServiceConfig //通过WithServiceConfig设置,可以异步设置 提供配置负债均衡的方式及service里的methods,如下:

type ServiceConfig struct {
    // LB is the load balancer the service providers recommends. The balancer specified
    // via grpc.WithBalancer will override this.
    LB Balancer
    // Methods contains a map for the methods in this service.
    Methods map[string]MethodConfig
}
}

2.DialContext

// DialContext creates a client connection to the given target. ctx can be used to
// cancel or expire the pending connecting. Once this function returns, the
// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
// to terminate all the pending operations after this function returns.
// This is the EXPERIMENTAL API.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target: target,
        conns:  make(map[Address]*addrConn),
    }
    //这里设置取消的context,可以调用cc.cancel 主动中断dial
    cc.ctx, cc.cancel = context.WithCancel(context.Background())
    for _, opt := range opts {
        opt(&cc.dopts)//初始化所有参数
    }
    if cc.dopts.timeout > 0 {//这个值可以通过 WithTimeout 设置;
        var cancel context.CancelFunc
        //这个方法里面会 在timeout 时间之后,close chan,这样Done() 就可以读了
        ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) // 设置定时的context
        defer cancel() //这一步是用来及时释放资源,里面调用了ctx.cancel,关闭了chan(Done()会返回的那个),
    }

    defer func() {
        select {
        case <-ctx.Done()://收到这个表示 前面WithTimeout设置了超时 并且ctx过期了
            conn, err = nil, ctx.Err()
        default:
        }

        if err != nil {
            cc.Close()
        }
    }()
    //通过WithServiceConfig 设置, 可以异步的 设置service config
    if cc.dopts.scChan != nil {
        // Wait for the initial service config.
        select {
        case sc, ok := <-cc.dopts.scChan:
            if ok {
                cc.sc = sc
            }
        case <-ctx.Done(): //同上
            return nil, ctx.Err()
        }
    }
    // Set defaults.
    if cc.dopts.codec == nil {
        cc.dopts.codec = protoCodec{}//默认编码
    }
    if cc.dopts.bs == nil {
        cc.dopts.bs = DefaultBackoffConfig //默认的backoff重试策略,
    }
    creds := cc.dopts.copts.TransportCredentials
    if creds != nil && creds.Info().ServerName != "" {
        cc.authority = creds.Info().ServerName
    } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
        cc.authority = cc.dopts.copts.Authority
    } else {
        colonPos := strings.LastIndex(target, ":")
        if colonPos == -1 {
            colonPos = len(target)
        }
        cc.authority = target[:colonPos]
    }
    var ok bool
    waitC := make(chan error, 1)
    //单独goroutine执行,里面有错误会写到waitC 里, 用来获取集群服务的所有地址,并建立连接
    //虽然go出去了,但是还是要等待这个goroutine执行结束,是阻塞的
    go func() {
        var addrs []Address
        //负载均衡的配置
        //cc.dopts.balancer和cc.sc.LB都是Balancer接口,分别通过WithBalancer 和 通过WithServiceConfig 设置,前者会覆盖后者
        if cc.dopts.balancer == nil && cc.sc.LB != nil {
            cc.dopts.balancer = cc.sc.LB
        }
        if cc.dopts.balancer == nil {
            // Connect to target directly if balancer is nil.
            //如果没有设置balancer,地址列表里面就只有target这一个地址
            addrs = append(addrs, Address{Addr: target})
        } else {
            var credsClone credentials.TransportCredentials
            if creds != nil {
                credsClone = creds.Clone()
            }
            config := BalancerConfig{
                DialCreds: credsClone,
            }
            //balancer(etcd等)的初始化
            if err := cc.dopts.balancer.Start(target, config); err != nil {
                waitC <- err
                return
            }
            //这里会 返回一个chan,元素是每一次地址更新后的所有地址(是全量,不是增量)
            ch := cc.dopts.balancer.Notify()
            if ch == nil {
                // There is no name resolver installed.
                addrs = append(addrs, Address{Addr: target})
            } else {
                addrs, ok = <-ch//ok 表示chan是否关闭,如果关闭了就不需要lbWatcher了(监控地址改动)
                if !ok || len(addrs) == 0 {
                    waitC <- errNoAddr //没有从balance找到有效的地址
                    return
                }
            }
        }
        //对于每一个地址 建立连接;
        // 如果调用了WithBlock,则这一步是阻塞的,一直等到所有连接成功(注:建议不要这样,除非你知道你在干什么)
        // 否则里面是通过goroutine异步处理的,不会等待所有的连接成功
        for _, a := range addrs {
            if err := cc.resetAddrConn(a, false, nil); err != nil {
                waitC <- err
                return
            }
        }
        close(waitC) //关闭waitC,这样会读取到err=nil
    }()
    select {
    case <-ctx.Done()://同上
        return nil, ctx.Err()
    case err := <-waitC://这一步会等待上一个goroutine结束
        if err != nil {
            return nil, err
        }
    }

    // If balancer is nil or balancer.Notify() is nil, ok will be false here.
    // The lbWatcher goroutine will not be created.
    if ok {//只有采用balancer(etcd等)的才会走到这一步,监控服务集群地址的变化
        go cc.lbWatcher()
    }

    if cc.dopts.scChan != nil {
        go cc.scWatcher()//监控ServiceConfig的变化,这样可以在dial之后动态的修改client的访问服务的配置ServiceConfig
    }
    return cc, nil
}

3.resetAddrConn

// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
    ac := &addrConn{
        cc:    cc,
        addr:  addr,
        dopts: cc.dopts,
    }
    ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
    ac.stateCV = sync.NewCond(&ac.mu)
    if EnableTracing {
        ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
    }
    if !ac.dopts.insecure {
        if ac.dopts.copts.TransportCredentials == nil {
            return errNoTransportSecurity
        }
    } else {
        if ac.dopts.copts.TransportCredentials != nil {
            return errCredentialsConflict
        }
        for _, cd := range ac.dopts.copts.PerRPCCredentials {
            if cd.RequireTransportSecurity() {
                return errTransportCredentialsMissing
            }
        }
    }
    // Track ac in cc. This needs to be done before any getTransport(...) is called.
    cc.mu.Lock()
    if cc.conns == nil {
        cc.mu.Unlock()
        return ErrClientConnClosing
    }
    stale := cc.conns[ac.addr] //获取旧的addrConn,可能没有,就是nil
    cc.conns[ac.addr] = ac //用新的addrConn 替换
    cc.mu.Unlock()
    if stale != nil {
        //已经存在一个旧的addrConn,需要关闭,有两种可能
        //1. balancer(etcd等) 存在bug,返回了重复的地址~~O(∩_∩)O哈哈~完美甩锅
        //2. 旧的ac收到http2 的goaway(表示服务端不接受新的请求了,但是已有的请求要继续处理完),这里又新建一个,?~在transportMonitor里
        // There is an addrConn alive on ac.addr already. This could be due to
        // 1) a buggy Balancer notifies duplicated Addresses;
        // 2) goaway was received, a new ac will replace the old ac.
        //    The old ac should be deleted from cc.conns, but the
        //    underlying transport should drain rather than close.
        if tearDownErr == nil {
            // tearDownErr is nil if resetAddrConn is called by
            // 1) Dial
            // 2) lbWatcher
            // In both cases, the stale ac should drain, not close.
            stale.tearDown(errConnDrain)
//errConnDrain不会马上close transport,则是会先stop accepting new RPCs and wait the completion of the pending RPCs
        } else {
            stale.tearDown(tearDownErr)
        }
    }
    //通过WithBlock设置为true后,这里会阻塞,直到所有连接成功;
    // skipWait may overwrite the decision in ac.dopts.block.
    if ac.dopts.block && !skipWait {
        if err := ac.resetTransport(false); err != nil {
            if err != errConnClosing { //如果有错 且不是errConnClosing(表示已经关闭)
                // Tear down ac and delete it from cc.conns.
                cc.mu.Lock()
                delete(cc.conns, ac.addr) //从cc.conns 删除,不在维护
                cc.mu.Unlock()
                ac.tearDown(err)// 关闭
            }
            if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
                return e.Origin()
            }
            return err
        }
        // Start to monitor the error status of transport.
        go ac.transportMonitor()
    } else {//这里不会阻塞,异步的建立连接
        // Start a goroutine connecting to the server asynchronously.
        go func() {
            if err := ac.resetTransport(false); err != nil {
                grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
                if err != errConnClosing {
                    // Keep this ac in cc.conns, to get the reason it's torn down.
                    ac.tearDown(err) //关闭,(和上面相比)但是不从cc.conns 删除,为了方便得到tearDownErr reason
                }
                return
            }
            ac.transportMonitor()
        }()
    }
    return nil
}

tearDown方法(处理返回的address已存在的情况)

// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
    ac.cancel() //执行这个之后ac.ctx.Done() 会收到消息

    ac.mu.Lock()
    defer ac.mu.Unlock()
    if ac.down != nil {//通知balancer 这个地址 down了
        ac.down(downErrorf(false, false, "%v", err)) //  // the handler called when a connection is down.
        ac.down = nil
    }
    if err == errConnDrain && ac.transport != nil {
        // GracefulClose(...) may be executed multiple times when
        // i) receiving multiple GoAway frames from the server; or
        // ii) there are concurrent name resolver/Balancer triggered
        // address removal and GoAway.
        ac.transport.GracefulClose()
    }
    if ac.state == Shutdown {
        return
    }
    ac.state = Shutdown
    ac.tearDownErr = err
    ac.stateCV.Broadcast()
    if ac.events != nil {
        ac.events.Finish()
        ac.events = nil
    }
    if ac.ready != nil {
        close(ac.ready)
        ac.ready = nil
    }
    if ac.transport != nil && err != errConnDrain {
        ac.transport.Close()
    }
    return
}


4.transportMonitor

// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
func (ac *addrConn) transportMonitor() {
    for {
        ac.mu.Lock()
        t := ac.transport
        ac.mu.Unlock()
        select {
        // This is needed to detect the teardown when
        // the addrConn is idle (i.e., no RPC in flight).
        case <-ac.ctx.Done(): //这一步 表示ac.teardown 了,不需要维护了,return掉
            select {
            case <-t.Error():
                t.Close()
            default:
            }
            return
        case <-t.GoAway():// 这一步,会resetAddrConn,里面会新建一个transportMonitor,这里就不需要维护了,retrrn掉
            // If GoAway happens without any network I/O error, ac is closed without shutting down the
            // underlying transport (the transport will be closed when all the pending RPCs finished or
            // failed.).
            // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
            // are closed.
            // In both cases, a new ac is created.
            select {
            case <-t.Error():
                ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
            default:
                ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
            }
            return
        case <-t.Error()://这里是transport 有错,
            select {
            case <-ac.ctx.Done()://这一步 表示ac.teardown(比如ctx 被cancel的情况)不需要维护了,return掉
                t.Close()
                return
            case <-t.GoAway()://同上
                ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
                return
            default: //如果有错,走到这里~~没有return, 下面会重连
            }
            ac.mu.Lock()
            if ac.state == Shutdown { //不是通过ac.teardown的(这种情况会走到上面),但是还Shutdown的了,什么情况呢???
                // ac has been shutdown.
                ac.mu.Unlock()
                return
            }
            ac.state = TransientFailure //置为临时失败,暂时不让用
            ac.stateCV.Broadcast()
            ac.mu.Unlock()
            if err := ac.resetTransport(true); err != nil {
                ac.mu.Lock()
                ac.printf("transport exiting: %v", err)
                ac.mu.Unlock()
                grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
                if err != errConnClosing {
                    // Keep this ac in cc.conns, to get the reason it's torn down.
                    ac.tearDown(err)
                }
                return
            }
        }
    }
}

5.resetTransport

func (ac *addrConn) resetTransport(closeTransport bool) error {
    for retries := 0; ; retries++ { //一直循重试建立连接直到成功,除非某些条件下返回
        ac.mu.Lock()
        ac.printf("connecting")
        if ac.state == Shutdown {
            //Shutdown状态表示这个连接已经关闭,不需要维护了,通常服务先挂了,然后balancer(etcd等)中这个地址又被移除了的情况会走到这,直接返回
            // ac.tearDown(...) has been invoked.
            ac.mu.Unlock()
            return errConnClosing
        }
        if ac.down != nil {
            //一般通过存balancer(etcd等)的UP 方法返回的,
            // 如果存在,这里就通知balancer 这个地址连接不ok了,不要用了,其实是因为接下里要reset,O(∩_∩)O哈哈~
            ac.down(downErrorf(false, true, "%v", errNetworkIO))
            ac.down = nil
        }
        ac.state = Connecting  //状态改为正在连接中~~
        ac.stateCV.Broadcast() //lzy todo
        t := ac.transport
        ac.mu.Unlock()
        if closeTransport && t != nil { //旧的要关闭
            t.Close()
        }
        //这里是根据重试的超时策略,返回两次重试的间隔时间;即如果这次重连还是失败,会等待sleepTime才会进入下一次循环
        //retries越大,sleepTime越大
        sleepTime := ac.dopts.bs.backoff(retries)
        timeout := minConnectTimeout
        if timeout < sleepTime {
            timeout = sleepTime
        }
        //设置超时时间,最小20s
        //注:为啥下面err==nil的时候cancel没有执行(官方推荐的是立马defer cancel()) bug?????
        //已经提了个issue https://github.com/grpc/grpc-go/issues/1099
        ctx, cancel := context.WithTimeout(ac.ctx, timeout)
        connectTime := time.Now()
        sinfo := transport.TargetInfo{
            Addr:     ac.addr.Addr,
            Metadata: ac.addr.Metadata,
        }
        newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
        if err != nil {
            cancel()
            //如果不是临时错误,立马返回;否则接下里会重试
            if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
                return err
            }
            grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
            ac.mu.Lock()
            if ac.state == Shutdown { //同上,即这个地址不需要了
                // ac.tearDown(...) has been invoked.
                ac.mu.Unlock()
                return errConnClosing
            }
            ac.errorf("transient failure: %v", err)
            ac.state = TransientFailure //状态改为短暂的失败
            ac.stateCV.Broadcast()
            if ac.ready != nil { //只有进入ac.wait()才会走入这个逻辑,表示有一个请求正在等待这个地址的连接是成功还是失败
                close(ac.ready) //建立连接失败了 关闭ready
                ac.ready = nil
            }
            ac.mu.Unlock()
            closeTransport = false
            select {
            case <-time.After(sleepTime - time.Since(connectTime)): //一直等待足够sleepTime长时间,再进入下一次循环
            case <-ac.ctx.Done(): //这个连接是被cancel掉(超时或者主动cancel)
                return ac.ctx.Err()
            }
            continue
        }
        ac.mu.Lock()
        ac.printf("ready")
        if ac.state == Shutdown { //同上,所以这个时候要把已经建立的连接close,手动从etcd中删除这个地址会走到这
            // ac.tearDown(...) has been invoked.
            ac.mu.Unlock()
            newTransport.Close()
            return errConnClosing
        }
        ac.state = Ready //状态ok了
        ac.stateCV.Broadcast()
        ac.transport = newTransport
        if ac.ready != nil { //只有进入ac.wait()才会走入这个逻辑,表示有一个请求正在等待这个地址的连接是成功还是失败
            close(ac.ready) //建立连接成功了关闭ready
            ac.ready = nil
        }
        //如果存在balancer(etcd等)就通知balancer 这个地址连接ok了,可以用了
        if ac.cc.dopts.balancer != nil {
            ac.down = ac.cc.dopts.balancer.Up(ac.addr)
        }
        ac.mu.Unlock()
        return nil
    }
}

以下基于默认配置情况下(还有其它没有提到的配置都取默认值):

设置了balancer(etcd等)
没有设置WithBlock,即dialOptions.block = false
没有设置FailOnNonTempDialError,即dialOptions.copts.FailOnNonTempDialError = false
grpc.Dial 正常的执行流程,第一次进入的时候的有些逻辑是走不到或者不太重要的都舍去不表

A. grpc.Dial() 返回一个*ClientConn

从balancer(etcd等)返回一批地址,但是这批地址暂时还是不能用的,需要等待A225

A2,对于每一个地址依次建立连接,循环调用cc.resetAddrConn, 即
A3,单独goroutine监控balancer(etcd等)的变化(cc.lbWatcher()),实时更新服务集群地址,即
单独goroutine监控监控ServiceConfig的变化(cc.scWatcher),可以在服务启动后动态更新调用服务的配置
A2,cc.resetAddrConn针对一个地址建立连接,创建一个addrConn加入到ClientConn.conns中去,主流程分为:

如果这个地址已经存在连接了,先关闭掉ac.teardown
A22,ac.resetTransport,建立一个底层连接(http2),这一步默认是goroutine出去的,不会阻塞,除非调用WithBlock;
A23,单独goroutine监控底层连接的状态变化(ac.transportMonitor),进行重连或者放弃

A22, ac.resetTransport里面是一个大循环,重试建立连接直到成功,除非某些条件下返回(默认情况下只有被ac.teardown了,即在balance中删除),每个循环里面:

如果ac.state == Shutdown ,直接返回
将状态改为正在连接中~~ ac.state == Connecting

计算sleepTime,这里是根据重试的超时策略,返回两次重试的间隔时间;即如果这次重连还是失败,会等待sleepTime才会进入下一次循环
A224,建立一个底层的http2连接(transport.NewClientTransport);如果是临时错误, 将状态改为短暂的失败,ac.state = TransientFailure,等待sleepTime;如果是非临时错误,直接返回,默认情况下可以认为都是临时错误;
将状态改为readyac.state = Ready ,通知balancer(etcd等)这个地址连接ok了(up);这样下次就能从balancer 中读取到这个地址了
A224, 建立一个底层的http2连接(newHTTP2Client)

dial一个tcp连接,失败的话,默认返回一个临时错误
单独goroutine,循环的读取所有的帧,并且分发到相应的流中去,如果有错误了,会有通知到A233
初始化http2 相关的操作(发送setting帧等)
A23,transportMonitor 是一个单独的goroutine,里面是一个循环,会监控这个连接以下几种情况:

如果这个连接被ac.teardown了,直接退出,不需要维护了
如果收到http2的goaway帧,再重新cc.resetAddrConn,即A2,然后当前直接退出;相当于用一个新的连接来替换
如果这个连接出错了,置为临时失败ac.state = TransientFailure ,暂时不让用,然后重试连接ac.resetTransport,即A22
A3,cc.lbWatcher监控balancer(etcd等)的变化 , 实时更新集群服务地址

balancer.Notify() 是一个channel,每当有更新的时候,从这里读取到所有的地址(全量而非增量)
判断有哪些地址是新增的,哪些地址是删除掉的

对于新增的地址执行cc.resetAddrConn,即A2

对于删掉的地址直接ac.tearDown,通知balancer(etcd等)这个地址down了, 这样可能会影响到A231,A221

对于ac.tearDown,里面会关闭底层的连接,修改状态为ac.state == Shutdown,然后通知balancer(etcd等)这个地址down了,在下次轮询的时候,就不会有这个地址了;

如果这个balancer(etcd等)收到这个地址的UP的通知,表示这个地址又OK了

相关文章

网友评论

      本文标题:grpc-go源码阅读(2) grpc dial 建立连接过程

      本文链接:https://www.haomeiwen.com/subject/bmlleftx.html