美文网首页
grpc+grpc-gateway+consul or etcd

grpc+grpc-gateway+consul or etcd

作者: ZplD | 来源:发表于2020-03-26 11:59 被阅读0次

    golang里面最火的应该就是微服务了, 所以最近研究了下grpc

    先附上我的demo代码:
    https://github.com/Dragon-Zpl/gprc-grpc-gateway-etcdorconsul

    记录下自己在看源码了解到的部分东西:

    首先,如果要使用第三方的服务注册中心,需要调用 "google.golang.org/grpc/resolver" 包下的 resolver.Register,把自己实现的 Builder 注册进去:Builder 需要实现 Build 和 Scheme 方法,Build 返回的 Resolver 需要实现 ResolveNow 和 Close 方法。下面是我用来监听服务变化的函数。这里虽然用的是 consul,但没有使用它的服务注册功能,而是用 consul 的 key/value 结构做了简单的模拟:

    // Watcher starts a background goroutine that refreshes the resolver state
    // every 10 seconds and exits when cr.closeCh fires. The goroutine is tracked
    // by cr.wg so that callers can wait for it to finish.
    //
    // This demo does not use Consul's service registration; it only imitates it
    // with Consul's key/value store. NOTE(review): with real service registration
    // the Consul Health().Service API could presumably be used instead to list
    // registered instances and check their health — confirm against the client.
    func (cr *consulResolver) Watcher() {
        cr.wg.Add(1)
        go func() {
            defer cr.wg.Done()

            t := time.NewTimer(10 * time.Second)
            // Release the timer when the goroutine exits through closeCh; the
            // original never stopped it.
            defer t.Stop()

            for {
                select {
                case <-t.C:
                    // NOTE(review): GetConsulDirData is defined elsewhere; it is
                    // assumed to return the entries stored for this server name.
                    datas := consul.GetConsulDirData(cr.serverName)
                    address := make([]resolver.Address, 0, len(datas))
                    for _, data := range datas {
                        address = append(address, resolver.Address{
                            Addr: data.Ip + ":" + data.Port,
                        })
                    }
                    // This call is the key step: it hands the fresh address list
                    // to the gRPC ClientConn.
                    cr.cc.UpdateState(resolver.State{Addresses: address})
                    // The timer has fired and its channel was drained above, so
                    // it can be re-armed directly.
                    t.Reset(10 * time.Second)
                case <-cr.closeCh:
                    return
                }
            }
        }()
    }
    
    

    负载的方法重写:

    // Build creates a round-robin picker over the SubConns that are currently
    // ready. When none are ready it returns an error picker carrying
    // balancer.ErrNoSubConnAvailable.
    //
    // The starting index is chosen at random within the collected SubConns.
    // NOTE(review): per the article, the addresses pushed by the watcher end up
    // stored here when the gateway starts — confirm against the balancer wiring.
    func (*roundRobinPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
        grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)

        total := len(readySCs)
        if total == 0 {
            return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
        }

        // Flatten the map into a slice so Pick can index into it.
        conns := make([]balancer.SubConn, 0, total)
        for _, conn := range readySCs {
            conns = append(conns, conn)
        }

        picker := &roundRobinPicker{subConns: conns}
        picker.next = rand.Intn(len(conns))
        return picker
    }
    
    // Pick returns the SubConn at the current position and advances the position
    // by one, wrapping back to the start after the last entry (round-robin).
    // The position is read and updated while holding p.mu. The returned done
    // callback and error are always nil.
    //
    // Per the article, this is invoked for every incoming request to obtain the
    // connection to the target service.
    func (p *roundRobinPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
        p.mu.Lock()
        defer p.mu.Unlock()

        chosen := p.subConns[p.next]

        // Advance the cursor, wrapping around at the end of the slice.
        p.next++
        if p.next == len(p.subConns) {
            p.next = 0
        }
        return chosen, nil, nil
    }
    
    

    gateway 启动

        // Gateway startup: create the grpc-gateway mux, register the handler that
        // forwards to the gRPC backend, then serve HTTP on :8070.
        gwmux := runtime.NewServeMux()
        ctx := context.Background()
        // NOTE(review): presumably registers the custom resolver builder for the
        // "consul" scheme used in the endpoint below — confirm against RegisterConsul.
        RegisterConsul(serverName)
        // Establish the connection to the backend server. The endpoint
        // "consul:///" carries only a scheme, and the balancer is selected by name.
        err := pb_test.RegisterMyTestHandlerFromEndpoint(ctx, gwmux, "consul:///", []grpc.DialOption{grpc.WithInsecure(), grpc.WithBalancerName(RoundRobin)})
        if err != nil {
            panic(err)
        }
    
        // ListenAndServe blocks; any error it returns aborts the process via panic.
        err = http.ListenAndServe(":8070", gwmux)
        if err != nil {
            panic(err)
        }
    

    其中有一个一开始一直困惑我的问题:为什么 RegisterMyTestHandlerFromEndpoint 的第三个参数 endpoint 要使用 "consul:///"?我看了下源码:

    // DialContext creates a ClientConn for target, applying opts on top of the
    // default dial options. (The article quotes this function from the gRPC
    // source for reference.)
    //
    // On the visible paths it validates the transport-security options, installs
    // a default dialer and user agent, applies the optional dial timeout to ctx,
    // selects a resolver builder from the target's scheme (falling back to the
    // default scheme when the parsed scheme is not registered), builds the
    // resolver, and, when the block option is set, waits until the connection
    // state becomes connectivity.Ready.
    //
    // If an error is returned, a deferred handler closes the partially built
    // ClientConn. If ctx is already done when the function returns, the result is
    // replaced with (nil, ctx.Err()).
    func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
        cc := &ClientConn{
            target:            target,
            csMgr:             &connectivityStateManager{},
            conns:             make(map[*addrConn]struct{}),
            dopts:             defaultDialOptions(),
            blockingpicker:    newPickerWrapper(),
            czData:            new(channelzData),
            firstResolveEvent: grpcsync.NewEvent(),
        }
        cc.retryThrottler.Store((*retryThrottler)(nil))
        cc.ctx, cc.cancel = context.WithCancel(context.Background())
    
        // Apply the caller-supplied options on top of the defaults.
        for _, opt := range opts {
            opt.apply(&cc.dopts)
        }
    
        chainUnaryClientInterceptors(cc)
        chainStreamClientInterceptors(cc)
    
        // Close cc if this function ends up returning an error.
        defer func() {
            if err != nil {
                cc.Close()
            }
        }()
    
        // Register the channel with channelz when it is enabled; a non-zero parent
        // ID additionally records a nested-channel trace event.
        if channelz.IsOn() {
            if cc.dopts.channelzParentID != 0 {
                cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
                channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
                    Desc:     "Channel Created",
                    Severity: channelz.CtINFO,
                    Parent: &channelz.TraceEventDesc{
                        Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
                        Severity: channelz.CtINFO,
                    },
                })
            } else {
                cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
                channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
                    Desc:     "Channel Created",
                    Severity: channelz.CtINFO,
                })
            }
            cc.csMgr.channelzID = cc.channelzID
        }
    
        // Check that the credential options are consistent with the insecure flag:
        // a secure dial needs exactly one of TransportCredentials / CredsBundle,
        // while an insecure dial must have neither and no per-RPC credentials that
        // require transport security.
        if !cc.dopts.insecure {
            if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
                return nil, errNoTransportSecurity
            }
            if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
                return nil, errTransportCredsAndBundle
            }
        } else {
            if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
                return nil, errCredentialsConflict
            }
            for _, cd := range cc.dopts.copts.PerRPCCredentials {
                if cd.RequireTransportSecurity() {
                    return nil, errTransportCredentialsMissing
                }
            }
        }
    
        // Parse the default service config, if one was supplied as raw JSON.
        if cc.dopts.defaultServiceConfigRawJSON != nil {
            sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
            if err != nil {
                return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
            }
            cc.dopts.defaultServiceConfig = sc
        }
        cc.mkp = cc.dopts.copts.KeepaliveParams
    
        // No custom dialer was supplied: install one that splits the address into
        // network and address before dialing with net.Dialer.
        if cc.dopts.copts.Dialer == nil {
            cc.dopts.copts.Dialer = newProxyDialer(
                func(ctx context.Context, addr string) (net.Conn, error) {
                    network, addr := parseDialTarget(addr)
                    return (&net.Dialer{}).DialContext(ctx, network, addr)
                },
            )
        }
    
        // Append the gRPC user agent to a caller-supplied one, or use it alone.
        if cc.dopts.copts.UserAgent != "" {
            cc.dopts.copts.UserAgent += " " + grpcUA
        } else {
            cc.dopts.copts.UserAgent = grpcUA
        }
    
        // A positive dial timeout bounds ctx for the rest of this function.
        if cc.dopts.timeout > 0 {
            var cancel context.CancelFunc
            ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
            defer cancel()
        }
        // On return, if ctx is already done, report ctx.Err() instead of the
        // result that was about to be returned.
        defer func() {
            select {
            case <-ctx.Done():
                conn, err = nil, ctx.Err()
            default:
            }
        }()
    
        scSet := false
        if cc.dopts.scChan != nil {
            // Try to get an initial service config.
            select {
            case sc, ok := <-cc.dopts.scChan:
                if ok {
                    cc.sc = &sc
                    scSet = true
                }
            default:
            }
        }
        // Default the backoff strategy when none was configured.
        if cc.dopts.bs == nil {
            cc.dopts.bs = backoff.Exponential{
                MaxDelay: DefaultBackoffConfig.MaxDelay,
            }
        }
        if cc.dopts.resolverBuilder == nil {
            // Only try to parse target when resolver builder is not already set.
            cc.parsedTarget = parseTarget(cc.target) // This call is the key: it extracts the scheme used to look up the resolver builder.
            grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
            cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
            if cc.dopts.resolverBuilder == nil {
                // If resolver builder is still nil, the parsed target's scheme is
                // not registered. Fallback to default resolver and set Endpoint to
                // the original target.
                grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
                cc.parsedTarget = resolver.Target{
                    Scheme:   resolver.GetDefaultScheme(),
                    Endpoint: target,
                }
                cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
            }
        } else {
            cc.parsedTarget = resolver.Target{Endpoint: target}
        }
        // Choose the authority: the credentials' server name first, then the
        // explicit authority option for insecure dials, otherwise the endpoint.
        creds := cc.dopts.copts.TransportCredentials
        if creds != nil && creds.Info().ServerName != "" {
            cc.authority = creds.Info().ServerName
        } else if cc.dopts.insecure && cc.dopts.authority != "" {
            cc.authority = cc.dopts.authority
        } else {
            // Use endpoint from "scheme://authority/endpoint" as the default
            // authority for ClientConn.
            cc.authority = cc.parsedTarget.Endpoint
        }
    
        if cc.dopts.scChan != nil && !scSet {
            // Blocking wait for the initial service config.
            select {
            case sc, ok := <-cc.dopts.scChan:
                if ok {
                    cc.sc = &sc
                }
            case <-ctx.Done():
                return nil, ctx.Err()
            }
        }
        if cc.dopts.scChan != nil {
            go cc.scWatcher()
        }
    
        // Give the balancer its own clone of the transport credentials, if any.
        var credsClone credentials.TransportCredentials
        if creds := cc.dopts.copts.TransportCredentials; creds != nil {
            credsClone = creds.Clone()
        }
        cc.balancerBuildOpts = balancer.BuildOptions{
            DialCreds:        credsClone,
            CredsBundle:      cc.dopts.copts.CredsBundle,
            Dialer:           cc.dopts.copts.Dialer,
            ChannelzParentID: cc.channelzID,
            Target:           cc.parsedTarget,
        }
    
        // Build the resolver.
        rWrapper, err := newCCResolverWrapper(cc)
        if err != nil {
            return nil, fmt.Errorf("failed to build resolver: %v", err)
        }
    
        cc.mu.Lock()
        cc.resolverWrapper = rWrapper
        cc.mu.Unlock()
        // A blocking dial blocks until the clientConn is ready.
        if cc.dopts.block {
            for {
                s := cc.GetState()
                if s == connectivity.Ready {
                    break
                } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
                    // Fail fast only when the connection error reports itself as
                    // non-temporary.
                    if err = cc.blockingpicker.connectionError(); err != nil {
                        terr, ok := err.(interface {
                            Temporary() bool
                        })
                        if ok && !terr.Temporary() {
                            return nil, err
                        }
                    }
                }
                if !cc.WaitForStateChange(ctx, s) {
                    // ctx got timeout or canceled.
                    return nil, ctx.Err()
                }
            }
        }
    
        return cc, nil
    }
    
    

    其中的 parseTarget 是关键:它对 target 进行切割,得到 Scheme,然后 DialContext 根据这个 Scheme 去获取 resolverBuilder:cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)。我们在启动 gateway 时调用的 RegisterConsul,其实就是把自定义的服务发现 Builder(自定义的结构体)存到一个 map[string]Builder 这样的 map 中;重写后的负载均衡方法同样也是注册到一个 map 中,在创建与服务端的连接时就会从中取出。

    // parseTarget splits target of the form "scheme://authority/endpoint" into a
    // resolver.Target. If either the "://" separator or the following "/" is
    // missing, the whole input is returned as the Endpoint with Scheme and
    // Authority left empty.
    func parseTarget(target string) (ret resolver.Target) {
        // Result used whenever the target does not match the expected layout.
        unparsed := resolver.Target{Endpoint: target}

        scheme, rest, found := split2(target, "://")
        if !found {
            return unparsed
        }

        authority, endpoint, found := split2(rest, "/")
        if !found {
            return unparsed
        }

        return resolver.Target{
            Scheme:    scheme,
            Authority: authority,
            Endpoint:  endpoint,
        }
    }
    

    写的可能有点乱,可参考此文章:https://segmentfault.com/a/1190000018424798?utm_source=tag-newest

    相关文章

      网友评论

          本文标题:grpc+grpc-gateway+consul or etcd

          本文链接:https://www.haomeiwen.com/subject/hoiquhtx.html