grpc-go Source Code Reading (2): How grpc.Dial Establishes a Connection

Author: one_zheng | Published 2018-06-12 15:06

    Reference: https://github.com/liangzhiyang/annotate-grpc-go

    A typical gRPC client connection is set up as follows:

    package main

    import (
        "context"
        "log"
        "os"
        "time"

        "google.golang.org/grpc"
        pb "google.golang.org/grpc/examples/helloworld/helloworld"
    )

    const (
        address     = "localhost:50051"
        defaultName = "world"
    )

    func main() {
        // Set up a connection to the server.
        conn, err := grpc.Dial(address, grpc.WithInsecure())
        if err != nil {
            log.Fatalf("did not connect: %v", err)
        }
        defer conn.Close()
        c := pb.NewGreeterClient(conn)

        // Contact the server and print out its response.
        name := defaultName
        if len(os.Args) > 1 {
            name = os.Args[1]
        }
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
        if err != nil {
            log.Fatalf("could not greet: %v", err)
        }
        log.Printf("Greeting: %s", r.Message)
    }
    

    1. grpc.Dial

    // Dial creates a client connection to the given target.
    // target -> the server address; opts -> dial options
    func Dial(target string, opts ...DialOption) (*ClientConn, error) {
        return DialContext(context.Background(), target, opts...)
    }
    

    The dialOptions struct collects the parameters passed in when the gRPC client dials:

    // dialOptions configure a Dial call. dialOptions are set by the DialOption
    // values passed to Dial.
    type dialOptions struct {
        unaryInt  UnaryClientInterceptor   // intercepts the execution of a unary RPC on the client
        streamInt StreamClientInterceptor  // intercepts the creation of a ClientStream
        codec     Codec                    // codec; defaults to protoCodec
        cp        Compressor               // the interface gRPC uses to compress a message
        dc        Decompressor             // the interface gRPC uses to decompress a message
        bs        backoffStrategy          // backoff retry strategy
        balancer  Balancer                 // load balancer; the built-in RoundRobin yields a round-robin policy object
        block     bool                     // if true and the cluster has multiple addresses, Dial blocks until every connection succeeds
        insecure  bool                     // whether to disable transport security (set via WithInsecure)
        timeout   time.Duration            // dial timeout
        copts     transport.ConnectOptions // covers all relevant options for communicating with the server
        scChan    <-chan ServiceConfig     // set via WithServiceConfig; asynchronously supplies the load-balancing policy and per-method config (see ServiceConfig below)
    }

    type ServiceConfig struct {
        // LB is the load balancer the service provider recommends. The balancer
        // specified via grpc.WithBalancer will override this.
        LB Balancer
        // Methods contains a map for the methods in this service.
        Methods map[string]MethodConfig
    }
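
    To make the mapping concrete, here is a minimal sketch of how these fields are populated at dial time. The option constructors shown (WithInsecure, WithTimeout, WithBlock) are grpc-go 1.x-era APIs consistent with this walkthrough; grpc.WithBalancer(grpc.RoundRobin(resolver)) would fill dialOptions.balancer the same way, given a naming.Resolver such as an etcd-backed one:

    package main

    import (
        "log"
        "time"

        "google.golang.org/grpc"
    )

    func main() {
        // Each grpc.With... helper returns a DialOption closure that sets
        // one field of dialOptions before the connection is attempted.
        conn, err := grpc.Dial(
            "localhost:50051",
            grpc.WithInsecure(),             // dialOptions.insecure = true
            grpc.WithTimeout(5*time.Second), // dialOptions.timeout = 5s
            grpc.WithBlock(),                // dialOptions.block = true
        )
        if err != nil {
            log.Fatalf("dial failed: %v", err)
        }
        defer conn.Close()
    }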
    

    2. DialContext

    // DialContext creates a client connection to the given target. ctx can be used to
    // cancel or expire the pending connection. Once this function returns, the
    // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
    // to terminate all the pending operations after this function returns.
    // This is the EXPERIMENTAL API.
    func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
        cc := &ClientConn{
            target: target,
            conns:  make(map[Address]*addrConn),
        }
        // Set up a cancelable context; calling cc.cancel aborts the dial.
        cc.ctx, cc.cancel = context.WithCancel(context.Background())
        for _, opt := range opts {
            opt(&cc.dopts) // apply each DialOption
        }
        if cc.dopts.timeout > 0 { // set via WithTimeout
            var cancel context.CancelFunc
            // After timeout elapses, the context's done channel is closed,
            // so Done() becomes readable.
            ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
            defer cancel() // release resources promptly; this closes the channel Done() returns
        }

        defer func() {
            select {
            case <-ctx.Done(): // WithTimeout was set above and ctx has expired
                conn, err = nil, ctx.Err()
            default:
            }

            if err != nil {
                cc.Close()
            }
        }()
        // Set via WithServiceConfig; the service config can arrive asynchronously.
        if cc.dopts.scChan != nil {
            // Wait for the initial service config.
            select {
            case sc, ok := <-cc.dopts.scChan:
                if ok {
                    cc.sc = sc
                }
            case <-ctx.Done(): // same as above
                return nil, ctx.Err()
            }
        }
        // Set defaults.
        if cc.dopts.codec == nil {
            cc.dopts.codec = protoCodec{} // default codec
        }
        if cc.dopts.bs == nil {
            cc.dopts.bs = DefaultBackoffConfig // default backoff retry strategy
        }
        creds := cc.dopts.copts.TransportCredentials
        if creds != nil && creds.Info().ServerName != "" {
            cc.authority = creds.Info().ServerName
        } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
            cc.authority = cc.dopts.copts.Authority
        } else {
            colonPos := strings.LastIndex(target, ":")
            if colonPos == -1 {
                colonPos = len(target)
            }
            cc.authority = target[:colonPos]
        }
        var ok bool
        waitC := make(chan error, 1)
        // Runs in its own goroutine; errors are written to waitC. It fetches all
        // addresses of the service cluster and establishes connections.
        // Although the work is spawned in a goroutine, the select below waits
        // for it to finish, so this part is blocking.
        go func() {
            var addrs []Address
            // Load-balancer configuration: cc.dopts.balancer and cc.sc.LB are both
            // Balancer interfaces, set via WithBalancer and WithServiceConfig
            // respectively; the former overrides the latter.
            if cc.dopts.balancer == nil && cc.sc.LB != nil {
                cc.dopts.balancer = cc.sc.LB
            }
            if cc.dopts.balancer == nil {
                // Connect to target directly if balancer is nil.
                // Without a balancer, the address list contains only target itself.
                addrs = append(addrs, Address{Addr: target})
            } else {
                var credsClone credentials.TransportCredentials
                if creds != nil {
                    credsClone = creds.Clone()
                }
                config := BalancerConfig{
                    DialCreds: credsClone,
                }
                // Initialize the balancer (etcd, etc.).
                if err := cc.dopts.balancer.Start(target, config); err != nil {
                    waitC <- err
                    return
                }
                // Notify returns a channel whose elements are the full (not
                // incremental) address set after each update.
                ch := cc.dopts.balancer.Notify()
                if ch == nil {
                    // There is no name resolver installed.
                    addrs = append(addrs, Address{Addr: target})
                } else {
                    addrs, ok = <-ch // ok reports whether the channel is still open; if closed, no lbWatcher (address-change monitor) is needed
                    if !ok || len(addrs) == 0 {
                        waitC <- errNoAddr // the balancer produced no valid address
                        return
                    }
                }
            }
            // Establish a connection for every address.
            // With WithBlock this blocks until every connection succeeds
            // (note: not recommended unless you know what you are doing);
            // otherwise the connections are made asynchronously in goroutines.
            for _, a := range addrs {
                if err := cc.resetAddrConn(a, false, nil); err != nil {
                    waitC <- err
                    return
                }
            }
            close(waitC) // closing waitC makes the receive below yield err == nil
        }()
        select {
        case <-ctx.Done(): // same as above
            return nil, ctx.Err()
        case err := <-waitC: // waits for the goroutine above to finish
            if err != nil {
                return nil, err
            }
        }

        // If balancer is nil or balancer.Notify() is nil, ok will be false here.
        // The lbWatcher goroutine will not be created.
        if ok { // only reached with a balancer (etcd, etc.); watches for cluster address changes
            go cc.lbWatcher()
        }

        if cc.dopts.scChan != nil {
            go cc.scWatcher() // watches ServiceConfig changes, so the client's service config can be updated dynamically after dial
        }
        return cc, nil
    }
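
    Since Dial just wraps DialContext with context.Background(), a caller that wants to bound or abort the whole dial can supply its own context. A minimal sketch; note that without WithBlock, DialContext returns quickly and the connection keeps being established in the background, so the deadline would mostly bound the setup steps:

    package main

    import (
        "context"
        "log"
        "time"

        "google.golang.org/grpc"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()
        // WithBlock makes DialContext wait until the connection is up,
        // so the 3s context deadline actually bounds the dial.
        conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithInsecure(), grpc.WithBlock())
        if err != nil {
            log.Fatalf("dial failed: %v", err)
        }
        defer conn.Close()
    }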
    
    

    3. resetAddrConn

    // resetAddrConn creates an addrConn for addr and adds it to cc.conns.
    // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
    // If tearDownErr is nil, errConnDrain will be used instead.
    func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
        ac := &addrConn{
            cc:    cc,
            addr:  addr,
            dopts: cc.dopts,
        }
        ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
        ac.stateCV = sync.NewCond(&ac.mu)
        if EnableTracing {
            ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
        }
        if !ac.dopts.insecure {
            if ac.dopts.copts.TransportCredentials == nil {
                return errNoTransportSecurity
            }
        } else {
            if ac.dopts.copts.TransportCredentials != nil {
                return errCredentialsConflict
            }
            for _, cd := range ac.dopts.copts.PerRPCCredentials {
                if cd.RequireTransportSecurity() {
                    return errTransportCredentialsMissing
                }
            }
        }
        // Track ac in cc. This needs to be done before any getTransport(...) is called.
        cc.mu.Lock()
        if cc.conns == nil {
            cc.mu.Unlock()
            return ErrClientConnClosing
        }
        stale := cc.conns[ac.addr] // fetch the old addrConn for this address, nil if none
        cc.conns[ac.addr] = ac     // replace it with the new addrConn
        cc.mu.Unlock()
        if stale != nil {
            // An old addrConn already exists on ac.addr and must be shut down.
            // This could be due to
            // 1) a buggy Balancer (etcd, etc.) notifying duplicated Addresses;
            // 2) the old ac received an HTTP/2 GOAWAY (the server accepts no new
            //    requests but finishes in-flight ones), and a new ac replaces it,
            //    triggered from transportMonitor. The old ac should be deleted
            //    from cc.conns, but the underlying transport should drain rather
            //    than close.
            if tearDownErr == nil {
                // tearDownErr is nil if resetAddrConn is called by
                // 1) Dial
                // 2) lbWatcher
                // In both cases, the stale ac should drain, not close.
                stale.tearDown(errConnDrain)
                // errConnDrain does not close the transport immediately; it first
                // stops accepting new RPCs and waits for pending RPCs to complete.
            } else {
                stale.tearDown(tearDownErr)
            }
        }
        // With WithBlock set to true, this blocks until the connection succeeds.
        // skipWait may overwrite the decision in ac.dopts.block.
        if ac.dopts.block && !skipWait {
            if err := ac.resetTransport(false); err != nil {
                if err != errConnClosing { // an error other than errConnClosing (which means already closed)
                    // Tear down ac and delete it from cc.conns.
                    cc.mu.Lock()
                    delete(cc.conns, ac.addr) // remove from cc.conns; no longer maintained
                    cc.mu.Unlock()
                    ac.tearDown(err) // shut it down
                }
                if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
                    return e.Origin()
                }
                return err
            }
            // Start to monitor the error status of transport.
            go ac.transportMonitor()
        } else { // non-blocking: establish the connection asynchronously
            // Start a goroutine connecting to the server asynchronously.
            go func() {
                if err := ac.resetTransport(false); err != nil {
                    grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
                    if err != errConnClosing {
                        // Keep this ac in cc.conns, to get the reason it's torn down.
                        // (Unlike above, it is not deleted from cc.conns, so the
                        // tearDownErr reason stays retrievable.)
                        ac.tearDown(err)
                    }
                    return
                }
                ac.transportMonitor()
            }()
        }
        return nil
    }
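
    The stale-connection handling above follows a common Go pattern: swap the map entry while holding the lock, then drain the old connection outside the critical section. A schematic of just that pattern; pool and conn are made-up illustration types, not grpc-go identifiers:

    package connpool

    import "sync"

    // conn stands in for a real transport connection.
    type conn struct{}

    type pool struct {
        mu    sync.Mutex
        conns map[string]*conn
    }

    // replace swaps in a new conn for addr under the lock and returns the
    // stale one (if any) so the caller can drain it outside the lock,
    // mirroring how resetAddrConn updates cc.conns and then tears down
    // the stale addrConn.
    func (p *pool) replace(addr string, nc *conn) (stale *conn) {
        p.mu.Lock()
        defer p.mu.Unlock()
        stale = p.conns[addr]
        p.conns[addr] = nc
        return stale
    }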
    
    

    The tearDown method (handles the case where an addrConn for the address already exists):

    // tearDown starts to tear down the addrConn.
    // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
    // some edge cases (e.g., the caller opens and closes many addrConn's in a
    // tight loop).
    // tearDown doesn't remove ac from ac.cc.conns.
    func (ac *addrConn) tearDown(err error) {
        ac.cancel() // after this, ac.ctx.Done() fires

        ac.mu.Lock()
        defer ac.mu.Unlock()
        if ac.down != nil { // notify the balancer that this address is down
            ac.down(downErrorf(false, false, "%v", err)) // the handler called when a connection goes down
            ac.down = nil
        }
        if err == errConnDrain && ac.transport != nil {
            // GracefulClose(...) may be executed multiple times when
            // i) receiving multiple GoAway frames from the server; or
            // ii) there are concurrent name resolver/Balancer triggered
            // address removal and GoAway.
            ac.transport.GracefulClose()
        }
        if ac.state == Shutdown {
            return
        }
        ac.state = Shutdown
        ac.tearDownErr = err
        ac.stateCV.Broadcast()
        if ac.events != nil {
            ac.events.Finish()
            ac.events = nil
        }
        if ac.ready != nil {
            close(ac.ready)
            ac.ready = nil
        }
        if ac.transport != nil && err != errConnDrain {
            ac.transport.Close()
        }
        return
    }
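
    The stateCV.Broadcast calls here and in resetTransport wake goroutines blocked waiting for a state change (for example the library's internal wait helper used by in-flight RPCs). The underlying sync.Cond pattern looks roughly like this runnable sketch; connState and its methods are illustrative, not grpc-go source:

    package main

    import (
        "fmt"
        "sync"
    )

    type connState struct {
        mu    sync.Mutex
        cv    *sync.Cond
        state string
    }

    func newConnState() *connState {
        cs := &connState{state: "Idle"}
        cs.cv = sync.NewCond(&cs.mu)
        return cs
    }

    // set updates the state and wakes every waiter, like stateCV.Broadcast.
    func (cs *connState) set(s string) {
        cs.mu.Lock()
        cs.state = s
        cs.cv.Broadcast()
        cs.mu.Unlock()
    }

    // waitUntil blocks until pred(state) holds.
    func (cs *connState) waitUntil(pred func(string) bool) string {
        cs.mu.Lock()
        defer cs.mu.Unlock()
        for !pred(cs.state) {
            cs.cv.Wait()
        }
        return cs.state
    }

    func main() {
        cs := newConnState()
        go cs.set("Shutdown")
        fmt.Println(cs.waitUntil(func(s string) bool { return s == "Shutdown" }))
    }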
    
    
    

    4. transportMonitor

    // Run in a goroutine to track the error in transport and create the
    // new transport if an error happens. It returns when the channel is closing.
    func (ac *addrConn) transportMonitor() {
        for {
            ac.mu.Lock()
            t := ac.transport
            ac.mu.Unlock()
            select {
            // This is needed to detect the teardown when
            // the addrConn is idle (i.e., no RPC in flight).
            case <-ac.ctx.Done(): // ac was torn down; nothing left to monitor, so return
                select {
                case <-t.Error():
                    t.Close()
                default:
                }
                return
            case <-t.GoAway(): // resetAddrConn below starts a new transportMonitor, so this one returns
                // If GoAway happens without any network I/O error, ac is closed without shutting down the
                // underlying transport (the transport will be closed when all the pending RPCs finished or
                // failed).
                // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
                // are closed.
                // In both cases, a new ac is created.
                select {
                case <-t.Error():
                    ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
                default:
                    ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
                }
                return
            case <-t.Error(): // the transport reported an error
                select {
                case <-ac.ctx.Done(): // ac was torn down (e.g. its ctx was canceled); stop monitoring and return
                    t.Close()
                    return
                case <-t.GoAway(): // same as above
                    ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
                    return
                default: // a plain transport error: fall through (no return) and reconnect below
                }
                ac.mu.Lock()
                if ac.state == Shutdown { // shut down without going through ac.ctx.Done() above; the author notes it is unclear when this can happen
                    // ac has been shutdown.
                    ac.mu.Unlock()
                    return
                }
                ac.state = TransientFailure // mark as a transient failure; unusable for now
                ac.stateCV.Broadcast()
                ac.mu.Unlock()
                if err := ac.resetTransport(true); err != nil {
                    ac.mu.Lock()
                    ac.printf("transport exiting: %v", err)
                    ac.mu.Unlock()
                    grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
                    if err != errConnClosing {
                        // Keep this ac in cc.conns, to get the reason it's torn down.
                        ac.tearDown(err)
                    }
                    return
                }
            }
        }
    }
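
    The monitor's shape, one goroutine selecting over several notification channels and then exiting, being replaced, or reconnecting, is a reusable Go pattern in its own right. A stripped-down runnable sketch; errCh, goAwayCh, and reconnect are illustrative names, not grpc-go identifiers:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // monitor mimics transportMonitor's structure: exit when ctx is done,
    // return on goAway (a replacement monitor would be started elsewhere),
    // and reconnect on a transport error.
    func monitor(ctx context.Context, errCh, goAwayCh <-chan struct{}, reconnect func() error) {
        for {
            select {
            case <-ctx.Done():
                return // torn down; stop maintaining the connection
            case <-goAwayCh:
                return // a new connection (and monitor) replaces this one
            case <-errCh:
                if err := reconnect(); err != nil {
                    fmt.Println("giving up:", err)
                    return
                }
            }
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        errCh := make(chan struct{}, 1)
        errCh <- struct{}{} // simulate one transport error
        go monitor(ctx, errCh, make(chan struct{}), func() error {
            fmt.Println("reconnecting...")
            return nil
        })
        time.Sleep(100 * time.Millisecond)
        cancel()
    }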
    
    

    5. resetTransport

    func (ac *addrConn) resetTransport(closeTransport bool) error {
        for retries := 0; ; retries++ { // keep retrying until the connection succeeds, unless an exit condition is hit
            ac.mu.Lock()
            ac.printf("connecting")
            if ac.state == Shutdown {
                // Shutdown means this connection is closed and no longer maintained;
                // typically reached when the server died and the address was then
                // removed from the balancer (etcd, etc.). Return immediately.
                // ac.tearDown(...) has been invoked.
                ac.mu.Unlock()
                return errConnClosing
            }
            if ac.down != nil {
                // ac.down is usually the function returned by the balancer's (etcd, etc.) Up method.
                // If set, notify the balancer that this address is no longer usable,
                // since the transport is about to be reset.
                ac.down(downErrorf(false, true, "%v", errNetworkIO))
                ac.down = nil
            }
            ac.state = Connecting // state: connecting
            ac.stateCV.Broadcast()
            t := ac.transport
            ac.mu.Unlock()
            if closeTransport && t != nil { // close the old transport
                t.Close()
            }
            // The backoff strategy returns the interval between two retries: if this
            // attempt fails too, wait sleepTime before entering the next iteration.
            // The larger retries is, the longer sleepTime gets.
            sleepTime := ac.dopts.bs.backoff(retries)
            timeout := minConnectTimeout
            if timeout < sleepTime {
                timeout = sleepTime
            }
            // Set the connect timeout, at least minConnectTimeout (20s).
            // Note: why is cancel not executed when err == nil below (the official
            // recommendation is an immediate defer cancel())? Possibly a bug;
            // issue filed at https://github.com/grpc/grpc-go/issues/1099
            ctx, cancel := context.WithTimeout(ac.ctx, timeout)
            connectTime := time.Now()
            sinfo := transport.TargetInfo{
                Addr:     ac.addr.Addr,
                Metadata: ac.addr.Metadata,
            }
            newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
            if err != nil {
                cancel()
                // Return immediately on a non-temporary error; otherwise retry below.
                if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
                    return err
                }
                grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
                ac.mu.Lock()
                if ac.state == Shutdown { // same as above: this address is no longer needed
                    // ac.tearDown(...) has been invoked.
                    ac.mu.Unlock()
                    return errConnClosing
                }
                ac.errorf("transient failure: %v", err)
                ac.state = TransientFailure // state: transient failure
                ac.stateCV.Broadcast()
                if ac.ready != nil { // only set inside ac.wait(): an RPC is waiting to learn whether this address connects or fails
                    close(ac.ready) // the connection failed; close ready to wake the waiter
                    ac.ready = nil
                }
                ac.mu.Unlock()
                closeTransport = false
                select {
                case <-time.After(sleepTime - time.Since(connectTime)): // wait out the remainder of sleepTime before the next iteration
                case <-ac.ctx.Done(): // the connection was canceled (timeout or explicit cancel)
                    return ac.ctx.Err()
                }
                continue
            }
            ac.mu.Lock()
            ac.printf("ready")
            if ac.state == Shutdown { // same as above, so close the transport just created; reached e.g. when the address is manually removed from etcd
                // ac.tearDown(...) has been invoked.
                ac.mu.Unlock()
                newTransport.Close()
                return errConnClosing
            }
            ac.state = Ready // state: ready
            ac.stateCV.Broadcast()
            ac.transport = newTransport
            if ac.ready != nil { // only set inside ac.wait(): an RPC is waiting on this address
                close(ac.ready) // the connection succeeded; close ready to wake the waiter
                ac.ready = nil
            }
            // If a balancer (etcd, etc.) exists, notify it that this address is up and usable.
            if ac.cc.dopts.balancer != nil {
                ac.down = ac.cc.dopts.balancer.Up(ac.addr)
            }
            ac.mu.Unlock()
            return nil
        }
    }
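
    The sleepTime above comes from the backoff strategy; DefaultBackoffConfig implements exponential backoff with jitter. Below is a self-contained sketch of the same idea using the parameters from the gRPC connection-backoff doc (1s base delay, 1.6x factor, 0.2 jitter, 120s cap); treat the constants and the function shape as illustrative rather than the library's exact code:

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    // backoff mirrors the shape of grpc-go's exponential backoff:
    // delay = baseDelay * factor^retries, capped at maxDelay, then jittered.
    func backoff(retries int) time.Duration {
        const (
            baseDelay = 1 * time.Second
            factor    = 1.6
            jitter    = 0.2
            maxDelay  = 120 * time.Second
        )
        if retries == 0 {
            return baseDelay
        }
        d, upper := float64(baseDelay), float64(maxDelay)
        for ; d < upper && retries > 0; retries-- {
            d *= factor
        }
        if d > upper {
            d = upper
        }
        d *= 1 + jitter*(rand.Float64()*2-1) // randomize by ±20% to avoid thundering herds
        if d < 0 {
            return 0
        }
        return time.Duration(d)
    }

    func main() {
        for i := 0; i < 8; i++ {
            fmt.Printf("retry %d: wait ~%v\n", i, backoff(i).Round(10*time.Millisecond))
        }
    }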
    

    The following walkthrough assumes the default configuration (any option not mentioned takes its default value):

    a balancer (etcd, etc.) is set
    WithBlock is not used, i.e. dialOptions.block = false
    FailOnNonTempDialError is not used, i.e. dialOptions.copts.FailOnNonTempDialError = false

    This is the normal execution flow of grpc.Dial; logic that is unreachable on first entry, or unimportant, is omitted.

    A. grpc.Dial() returns a *ClientConn

    A batch of addresses is returned from the balancer (etcd, etc.), but these addresses are not usable yet; that has to wait for A225 (step 5 of A22 below, where a connection becomes Ready)

    A2: for each address, establish a connection by calling cc.resetAddrConn in a loop (detailed below)
    A3: a separate goroutine watches the balancer (etcd, etc.) for changes (cc.lbWatcher()) and updates the cluster addresses in real time (detailed below)
    a separate goroutine watches for ServiceConfig changes (cc.scWatcher), so the client's service-call configuration can be updated dynamically after startup

    A2: cc.resetAddrConn establishes a connection for a single address, creating an addrConn and adding it to ClientConn.conns. The main flow:

    if a connection already exists for this address, shut it down first via ac.tearDown
    A22: ac.resetTransport establishes an underlying (HTTP/2) connection; by default this is spawned as a goroutine and does not block, unless WithBlock was used;
    A23: a separate goroutine watches the underlying connection's state (ac.transportMonitor) and reconnects or gives up

    A22: ac.resetTransport is one big loop that retries the connection until it succeeds, unless an exit condition is hit (under the defaults, only being torn down via ac.tearDown, i.e. removed from the balancer). Each iteration:

    if ac.state == Shutdown, return immediately
    set the state to connecting: ac.state = Connecting

    compute sleepTime from the backoff retry strategy: the interval between two attempts; if this attempt fails too, wait sleepTime before entering the next iteration
    A224: create an underlying HTTP/2 connection (transport.NewClientTransport); on a temporary error, set ac.state = TransientFailure and wait sleepTime; on a non-temporary error, return immediately (under the defaults, every error can be treated as temporary);
    set the state to ready (ac.state = Ready) and notify the balancer (etcd, etc.) that this address is up, so the next pick from the balancer can return it

    A224: creating the underlying HTTP/2 connection (newHTTP2Client):

    dial a TCP connection; on failure, a temporary error is returned by default
    a separate goroutine loops reading all frames and dispatches them to the corresponding streams; any error is propagated to A233
    perform the HTTP/2 initialization (send the SETTINGS frame, etc.)

    A23: transportMonitor is a separate goroutine containing a loop that watches the connection for the following cases:

    if the connection was torn down via ac.tearDown, exit immediately; there is nothing left to maintain
    if an HTTP/2 GOAWAY frame is received, call cc.resetAddrConn again (A2), then exit; the connection is effectively replaced by a new one
    if the connection errors, set ac.state = TransientFailure so it is temporarily unusable, then retry via ac.resetTransport (A22)

    A3: cc.lbWatcher watches the balancer (etcd, etc.) for changes and updates the cluster addresses in real time

    balancer.Notify() is a channel; on every update, the full (not incremental) address set is read from it
    determine which addresses were added and which were removed (see the sketch below)

    for added addresses, call cc.resetAddrConn (A2)

    for removed addresses, call ac.tearDown directly and notify the balancer (etcd, etc.) that the address is down; this may affect A231 and A221

    ac.tearDown closes the underlying connection, sets ac.state = Shutdown, and notifies the balancer (etcd, etc.) that the address is down, so the next round-robin pick no longer includes it;

    if the balancer (etcd, etc.) later receives an Up notification for this address, the address is usable again
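
    The added/removed computation that lbWatcher performs on each notification can be sketched as a plain set difference. A schematic using strings; the real code diffs []Address values against cc.conns:

    package main

    import "fmt"

    // diffAddrs reports which addresses appeared and which disappeared between
    // two full snapshots, mirroring what cc.lbWatcher does with each update
    // received from balancer.Notify().
    func diffAddrs(prev, curr []string) (added, removed []string) {
        seen := make(map[string]bool, len(prev))
        for _, a := range prev {
            seen[a] = true
        }
        for _, a := range curr {
            if !seen[a] {
                added = append(added, a) // new address: cc.resetAddrConn (A2)
            }
            delete(seen, a)
        }
        for a := range seen {
            removed = append(removed, a) // vanished address: ac.tearDown
        }
        return added, removed
    }

    func main() {
        added, removed := diffAddrs(
            []string{"10.0.0.1:50051", "10.0.0.2:50051"},
            []string{"10.0.0.2:50051", "10.0.0.3:50051"},
        )
        fmt.Println("added:", added, "removed:", removed)
    }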
