参考:https://github.com/liangzhiyang/annotate-grpc-go
正常启动一个grpcClient连接如下:
func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.Message)
}
1.grpc.Dial
// Dial creates a client connection to the given target.
// targe->server地址 opts->DialOption
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
DialOption参数,grpcClient连接时传入
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
unaryInt UnaryClientInterceptor // UnaryClientInterceptor intercepts the execution of a unary RPC on the client.
streamInt StreamClientInterceptor // StreamClientInterceptor intercepts the creation of ClientStream
codec Codec //编码方式,默认是protoCodec
cp Compressor // Compressor defines the interface gRPC uses to compress a message.
dc Decompressor // Decompressor defines the interface gRPC uses to decompress a message.
bs backoffStrategy //backoff重试策略
balancer Balancer //负载均衡,自带的RoundRobin就会返回一个轮训策略的对象
block bool // 设置为true则如果集群有多个address,grpc连接是阻塞的,一直等到所有连接成功
insecure bool //是否需要安全验证
timeout time.Duration // 超时时间
copts transport.ConnectOptions // // ConnectOptions covers all relevant options for communicating with the server.
scChan <-chan ServiceConfig //通过WithServiceConfig设置,可以异步设置 提供配置负债均衡的方式及service里的methods,如下:
type ServiceConfig struct {
// LB is the load balancer the service providers recommends. The balancer specified
// via grpc.WithBalancer will override this.
LB Balancer
// Methods contains a map for the methods in this service.
Methods map[string]MethodConfig
}
}
2.DialContext
// DialContext creates a client connection to the given target. ctx can be used to
// cancel or expire the pending connecting. Once this function returns, the
// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
// to terminate all the pending operations after this function returns.
// This is the EXPERIMENTAL API.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[Address]*addrConn),
}
//这里设置取消的context,可以调用cc.cancel 主动中断dial
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
opt(&cc.dopts)//初始化所有参数
}
if cc.dopts.timeout > 0 {//这个值可以通过 WithTimeout 设置;
var cancel context.CancelFunc
//这个方法里面会 在timeout 时间之后,close chan,这样Done() 就可以读了
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) // 设置定时的context
defer cancel() //这一步是用来及时释放资源,里面调用了ctx.cancel,关闭了chan(Done()会返回的那个),
}
defer func() {
select {
case <-ctx.Done()://收到这个表示 前面WithTimeout设置了超时 并且ctx过期了
conn, err = nil, ctx.Err()
default:
}
if err != nil {
cc.Close()
}
}()
//通过WithServiceConfig 设置, 可以异步的 设置service config
if cc.dopts.scChan != nil {
// Wait for the initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
}
case <-ctx.Done(): //同上
return nil, ctx.Err()
}
}
// Set defaults.
if cc.dopts.codec == nil {
cc.dopts.codec = protoCodec{}//默认编码
}
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig //默认的backoff重试策略,
}
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
cc.authority = cc.dopts.copts.Authority
} else {
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
colonPos = len(target)
}
cc.authority = target[:colonPos]
}
var ok bool
waitC := make(chan error, 1)
//单独goroutine执行,里面有错误会写到waitC 里, 用来获取集群服务的所有地址,并建立连接
//虽然go出去了,但是还是要等待这个goroutine执行结束,是阻塞的
go func() {
var addrs []Address
//负载均衡的配置
//cc.dopts.balancer和cc.sc.LB都是Balancer接口,分别通过WithBalancer 和 通过WithServiceConfig 设置,前者会覆盖后者
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
}
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
//如果没有设置balancer,地址列表里面就只有target这一个地址
addrs = append(addrs, Address{Addr: target})
} else {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
}
config := BalancerConfig{
DialCreds: credsClone,
}
//balancer(etcd等)的初始化
if err := cc.dopts.balancer.Start(target, config); err != nil {
waitC <- err
return
}
//这里会 返回一个chan,元素是每一次地址更新后的所有地址(是全量,不是增量)
ch := cc.dopts.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
addrs = append(addrs, Address{Addr: target})
} else {
addrs, ok = <-ch//ok 表示chan是否关闭,如果关闭了就不需要lbWatcher了(监控地址改动)
if !ok || len(addrs) == 0 {
waitC <- errNoAddr //没有从balance找到有效的地址
return
}
}
}
//对于每一个地址 建立连接;
// 如果调用了WithBlock,则这一步是阻塞的,一直等到所有连接成功(注:建议不要这样,除非你知道你在干什么)
// 否则里面是通过goroutine异步处理的,不会等待所有的连接成功
for _, a := range addrs {
if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
return
}
}
close(waitC) //关闭waitC,这样会读取到err=nil
}()
select {
case <-ctx.Done()://同上
return nil, ctx.Err()
case err := <-waitC://这一步会等待上一个goroutine结束
if err != nil {
return nil, err
}
}
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {//只有采用balancer(etcd等)的才会走到这一步,监控服务集群地址的变化
go cc.lbWatcher()
}
if cc.dopts.scChan != nil {
go cc.scWatcher()//监控ServiceConfig的变化,这样可以在dial之后动态的修改client的访问服务的配置ServiceConfig
}
return cc, nil
}
3.resetAddrConn
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
dopts: cc.dopts,
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
}
if !ac.dopts.insecure {
if ac.dopts.copts.TransportCredentials == nil {
return errNoTransportSecurity
}
} else {
if ac.dopts.copts.TransportCredentials != nil {
return errCredentialsConflict
}
for _, cd := range ac.dopts.copts.PerRPCCredentials {
if cd.RequireTransportSecurity() {
return errTransportCredentialsMissing
}
}
}
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
stale := cc.conns[ac.addr] //获取旧的addrConn,可能没有,就是nil
cc.conns[ac.addr] = ac //用新的addrConn 替换
cc.mu.Unlock()
if stale != nil {
//已经存在一个旧的addrConn,需要关闭,有两种可能
//1. balancer(etcd等) 存在bug,返回了重复的地址~~O(∩_∩)O哈哈~完美甩锅
//2. 旧的ac收到http2 的goaway(表示服务端不接受新的请求了,但是已有的请求要继续处理完),这里又新建一个,?~在transportMonitor里
// There is an addrConn alive on ac.addr already. This could be due to
// 1) a buggy Balancer notifies duplicated Addresses;
// 2) goaway was received, a new ac will replace the old ac.
// The old ac should be deleted from cc.conns, but the
// underlying transport should drain rather than close.
if tearDownErr == nil {
// tearDownErr is nil if resetAddrConn is called by
// 1) Dial
// 2) lbWatcher
// In both cases, the stale ac should drain, not close.
stale.tearDown(errConnDrain)
//errConnDrain不会马上close transport,则是会先stop accepting new RPCs and wait the completion of the pending RPCs
} else {
stale.tearDown(tearDownErr)
}
}
//通过WithBlock设置为true后,这里会阻塞,直到所有连接成功;
// skipWait may overwrite the decision in ac.dopts.block.
if ac.dopts.block && !skipWait {
if err := ac.resetTransport(false); err != nil {
if err != errConnClosing { //如果有错 且不是errConnClosing(表示已经关闭)
// Tear down ac and delete it from cc.conns.
cc.mu.Lock()
delete(cc.conns, ac.addr) //从cc.conns 删除,不在维护
cc.mu.Unlock()
ac.tearDown(err)// 关闭
}
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
return e.Origin()
}
return err
}
// Start to monitor the error status of transport.
go ac.transportMonitor()
} else {//这里不会阻塞,异步的建立连接
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := ac.resetTransport(false); err != nil {
grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err) //关闭,(和上面相比)但是不从cc.conns 删除,为了方便得到tearDownErr reason
}
return
}
ac.transportMonitor()
}()
}
return nil
}
tearDown方法(处理返回的address已存在的情况)
// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
ac.cancel() //执行这个之后ac.ctx.Done() 会收到消息
ac.mu.Lock()
defer ac.mu.Unlock()
if ac.down != nil {//通知balancer 这个地址 down了
ac.down(downErrorf(false, false, "%v", err)) // // the handler called when a connection is down.
ac.down = nil
}
if err == errConnDrain && ac.transport != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
ac.transport.GracefulClose()
}
if ac.state == Shutdown {
return
}
ac.state = Shutdown
ac.tearDownErr = err
ac.stateCV.Broadcast()
if ac.events != nil {
ac.events.Finish()
ac.events = nil
}
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
if ac.transport != nil && err != errConnDrain {
ac.transport.Close()
}
return
}
4.transportMonitor
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
func (ac *addrConn) transportMonitor() {
for {
ac.mu.Lock()
t := ac.transport
ac.mu.Unlock()
select {
// This is needed to detect the teardown when
// the addrConn is idle (i.e., no RPC in flight).
case <-ac.ctx.Done(): //这一步 表示ac.teardown 了,不需要维护了,return掉
select {
case <-t.Error():
t.Close()
default:
}
return
case <-t.GoAway():// 这一步,会resetAddrConn,里面会新建一个transportMonitor,这里就不需要维护了,retrrn掉
// If GoAway happens without any network I/O error, ac is closed without shutting down the
// underlying transport (the transport will be closed when all the pending RPCs finished or
// failed.).
// If GoAway and some network I/O error happen concurrently, ac and its underlying transport
// are closed.
// In both cases, a new ac is created.
select {
case <-t.Error():
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
default:
ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
}
return
case <-t.Error()://这里是transport 有错,
select {
case <-ac.ctx.Done()://这一步 表示ac.teardown(比如ctx 被cancel的情况)不需要维护了,return掉
t.Close()
return
case <-t.GoAway()://同上
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
return
default: //如果有错,走到这里~~没有return, 下面会重连
}
ac.mu.Lock()
if ac.state == Shutdown { //不是通过ac.teardown的(这种情况会走到上面),但是还Shutdown的了,什么情况呢???
// ac has been shutdown.
ac.mu.Unlock()
return
}
ac.state = TransientFailure //置为临时失败,暂时不让用
ac.stateCV.Broadcast()
ac.mu.Unlock()
if err := ac.resetTransport(true); err != nil {
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
}
}
}
5.resetTransport
func (ac *addrConn) resetTransport(closeTransport bool) error {
for retries := 0; ; retries++ { //一直循重试建立连接直到成功,除非某些条件下返回
ac.mu.Lock()
ac.printf("connecting")
if ac.state == Shutdown {
//Shutdown状态表示这个连接已经关闭,不需要维护了,通常服务先挂了,然后balancer(etcd等)中这个地址又被移除了的情况会走到这,直接返回
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
if ac.down != nil {
//一般通过存balancer(etcd等)的UP 方法返回的,
// 如果存在,这里就通知balancer 这个地址连接不ok了,不要用了,其实是因为接下里要reset,O(∩_∩)O哈哈~
ac.down(downErrorf(false, true, "%v", errNetworkIO))
ac.down = nil
}
ac.state = Connecting //状态改为正在连接中~~
ac.stateCV.Broadcast() //lzy todo
t := ac.transport
ac.mu.Unlock()
if closeTransport && t != nil { //旧的要关闭
t.Close()
}
//这里是根据重试的超时策略,返回两次重试的间隔时间;即如果这次重连还是失败,会等待sleepTime才会进入下一次循环
//retries越大,sleepTime越大
sleepTime := ac.dopts.bs.backoff(retries)
timeout := minConnectTimeout
if timeout < sleepTime {
timeout = sleepTime
}
//设置超时时间,最小20s
//注:为啥下面err==nil的时候cancel没有执行(官方推荐的是立马defer cancel()) bug?????
//已经提了个issue https://github.com/grpc/grpc-go/issues/1099
ctx, cancel := context.WithTimeout(ac.ctx, timeout)
connectTime := time.Now()
sinfo := transport.TargetInfo{
Addr: ac.addr.Addr,
Metadata: ac.addr.Metadata,
}
newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
if err != nil {
cancel()
//如果不是临时错误,立马返回;否则接下里会重试
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
return err
}
grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock()
if ac.state == Shutdown { //同上,即这个地址不需要了
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
ac.errorf("transient failure: %v", err)
ac.state = TransientFailure //状态改为短暂的失败
ac.stateCV.Broadcast()
if ac.ready != nil { //只有进入ac.wait()才会走入这个逻辑,表示有一个请求正在等待这个地址的连接是成功还是失败
close(ac.ready) //建立连接失败了 关闭ready
ac.ready = nil
}
ac.mu.Unlock()
closeTransport = false
select {
case <-time.After(sleepTime - time.Since(connectTime)): //一直等待足够sleepTime长时间,再进入下一次循环
case <-ac.ctx.Done(): //这个连接是被cancel掉(超时或者主动cancel)
return ac.ctx.Err()
}
continue
}
ac.mu.Lock()
ac.printf("ready")
if ac.state == Shutdown { //同上,所以这个时候要把已经建立的连接close,手动从etcd中删除这个地址会走到这
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
newTransport.Close()
return errConnClosing
}
ac.state = Ready //状态ok了
ac.stateCV.Broadcast()
ac.transport = newTransport
if ac.ready != nil { //只有进入ac.wait()才会走入这个逻辑,表示有一个请求正在等待这个地址的连接是成功还是失败
close(ac.ready) //建立连接成功了关闭ready
ac.ready = nil
}
//如果存在balancer(etcd等)就通知balancer 这个地址连接ok了,可以用了
if ac.cc.dopts.balancer != nil {
ac.down = ac.cc.dopts.balancer.Up(ac.addr)
}
ac.mu.Unlock()
return nil
}
}
以下基于默认配置情况下(还有其它没有提到的配置都取默认值):
设置了balancer(etcd等)
没有设置WithBlock,即dialOptions.block = false
没有设置FailOnNonTempDialError,即dialOptions.copts.FailOnNonTempDialError = false
grpc.Dial 正常的执行流程,第一次进入的时候的有些逻辑是走不到或者不太重要的都舍去不表
A. grpc.Dial() 返回一个*ClientConn
从balancer(etcd等)返回一批地址,但是这批地址暂时还是不能用的,需要等待A225
A2,对于每一个地址依次建立连接,循环调用cc.resetAddrConn, 即
A3,单独goroutine监控balancer(etcd等)的变化(cc.lbWatcher()),实时更新服务集群地址,即
单独goroutine监控监控ServiceConfig的变化(cc.scWatcher),可以在服务启动后动态更新调用服务的配置
A2,cc.resetAddrConn针对一个地址建立连接,创建一个addrConn加入到ClientConn.conns中去,主流程分为:
如果这个地址已经存在连接了,先关闭掉ac.teardown
A22,ac.resetTransport,建立一个底层连接(http2),这一步默认是goroutine出去的,不会阻塞,除非调用WithBlock;
A23,单独goroutine监控底层连接的状态变化(ac.transportMonitor),进行重连或者放弃
A22, ac.resetTransport里面是一个大循环,重试建立连接直到成功,除非某些条件下返回(默认情况下只有被ac.teardown了,即在balance中删除),每个循环里面:
如果ac.state == Shutdown ,直接返回
将状态改为正在连接中~~ ac.state == Connecting
计算sleepTime,这里是根据重试的超时策略,返回两次重试的间隔时间;即如果这次重连还是失败,会等待sleepTime才会进入下一次循环
A224,建立一个底层的http2连接(transport.NewClientTransport);如果是临时错误, 将状态改为短暂的失败,ac.state = TransientFailure,等待sleepTime;如果是非临时错误,直接返回,默认情况下可以认为都是临时错误;
将状态改为readyac.state = Ready ,通知balancer(etcd等)这个地址连接ok了(up);这样下次就能从balancer 中读取到这个地址了
A224, 建立一个底层的http2连接(newHTTP2Client)
dial一个tcp连接,失败的话,默认返回一个临时错误
单独goroutine,循环的读取所有的帧,并且分发到相应的流中去,如果有错误了,会有通知到A233
初始化http2 相关的操作(发送setting帧等)
A23,transportMonitor 是一个单独的goroutine,里面是一个循环,会监控这个连接以下几种情况:
如果这个连接被ac.teardown了,直接退出,不需要维护了
如果收到http2的goaway帧,再重新cc.resetAddrConn,即A2,然后当前直接退出;相当于用一个新的连接来替换
如果这个连接出错了,置为临时失败ac.state = TransientFailure ,暂时不让用,然后重试连接ac.resetTransport,即A22
A3,cc.lbWatcher监控balancer(etcd等)的变化 , 实时更新集群服务地址
balancer.Notify() 是一个channel,每当有更新的时候,从这里读取到所有的地址(全量而非增量)
判断有哪些地址是新增的,哪些地址是删除掉的
对于新增的地址执行cc.resetAddrConn,即A2
对于删掉的地址直接ac.tearDown,通知balancer(etcd等)这个地址down了, 这样可能会影响到A231,A221
对于ac.tearDown,里面会关闭底层的连接,修改状态为ac.state == Shutdown,然后通知balancer(etcd等)这个地址down了,在下次轮询的时候,就不会有这个地址了;
如果这个balancer(etcd等)收到这个地址的UP的通知,表示这个地址又OK了
网友评论