美文网首页
grpc+grpc-gateway+consul or etcd

grpc+grpc-gateway+consul or etcd

作者: ZplD | 来源:发表于2020-03-26 11:59 被阅读0次

golang里面最火的应该就是微服务了, 所以最近研究了下grpc

先附上我的demo代码:
https://github.com/Dragon-Zpl/gprc-grpc-gateway-etcdorconsul

记录下自己在看源码了解到的部分东西:

首先,如果需要用到第三方的服务注册中心,需要调用“google.golang.org/grpc/resolver”包下的 resolver.Register,把自己实现了 Build、Scheme(resolver.Builder 接口)以及 ResolveNow、Close(resolver.Resolver 接口)的结构体注册进去。以下是我监听服务变化的函数;这里使用的是 consul,但没有使用它的服务注册功能,而是用 consul 的 key/value 结构简单模拟:

// Watcher starts a background goroutine that periodically pushes the current
// address list to the gRPC client connection.
//
// Every 10 seconds it reads the entries for cr.serverName through
// consul.GetConsulDirData, converts each entry to a resolver.Address of the
// form "ip:port", and hands the list to cr.cc.UpdateState. The first refresh
// happens 10 seconds after Watcher is called. The goroutine is registered with
// cr.wg and exits when a receive on cr.closeCh succeeds.
//
// This demo does not use Consul's service registration; it only imitates it
// with Consul's key/value store. NOTE(review): with real service registration
// the original author suggests querying the Consul health API instead so that
// instance health is checked as well — not verified here.
func (cr *consulResolver) Watcher() {
    cr.wg.Add(1)
    go func() {
        defer cr.wg.Done()

        const interval = 10 * time.Second
        t := time.NewTimer(interval)
        // Stop the timer on exit so it is not left pending after Close.
        defer t.Stop()

        for {
            select {
            case <-t.C:
                datas := consul.GetConsulDirData(cr.serverName)
                address := make([]resolver.Address, 0, len(datas))
                for _, data := range datas {
                    address = append(address, resolver.Address{
                        Addr: data.Ip + ":" + data.Port,
                    })
                }
                // This is the key call: it publishes the refreshed address
                // list to the client connection.
                cr.cc.UpdateState(resolver.State{Addresses: address})
                // t.C has just been drained, so Reset can be called directly.
                t.Reset(interval)
            case <-cr.closeCh:
                return
            }
        }
    }()
}

负载均衡方法的重写:

// Build creates a picker over the sub-connections that are currently ready.
//
// According to the original author, the addresses published by the watcher
// arrive here (as readySCs) when the gateway starts. With no ready
// sub-connection an error picker carrying balancer.ErrNoSubConnAvailable is
// returned; otherwise the sub-connections are copied into a slice and the
// round-robin cursor starts at a random position.
func (*roundRobinPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
    grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)

    total := len(readySCs)
    if total == 0 {
        return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
    }

    conns := make([]balancer.SubConn, 0, total)
    for _, conn := range readySCs {
        conns = append(conns, conn)
    }

    picker := &roundRobinPicker{subConns: conns}
    // Random starting offset for the round-robin cursor.
    picker.next = rand.Intn(total)
    return picker
}

// Pick is called for every incoming request and returns the sub-connection to
// use, walking p.subConns in round-robin order. The cursor p.next is read and
// advanced under p.mu. No done callback and no error are returned.
func (p *roundRobinPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
    p.mu.Lock()
    defer p.mu.Unlock()

    chosen := p.subConns[p.next]
    // Advance the cursor, wrapping around at the end of the slice.
    p.next = (p.next + 1) % len(p.subConns)
    return chosen, nil, nil
}

gateway 启动

    // HTTP mux of grpc-gateway that translates REST calls into gRPC calls.
    gwmux := runtime.NewServeMux()
    ctx := context.Background()

    // Register the custom resolver before dialing so the "consul" scheme can
    // be looked up.
    RegisterConsul(serverName)

    // Establish the connection to the backend gRPC server. The target
    // "consul:///" selects the resolver by scheme; the balancer is chosen by
    // name.
    dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBalancerName(RoundRobin)}
    err := pb_test.RegisterMyTestHandlerFromEndpoint(ctx, gwmux, "consul:///", dialOpts)
    if err != nil {
        panic(err)
    }

    // Serve the gateway; ListenAndServe only returns on failure.
    if err = http.ListenAndServe(":8070", gwmux); err != nil {
        panic(err)
    }

其中有一个一开始一直困惑我的问题:为什么 RegisterMyTestHandlerFromEndpoint 的第三个参数 endpoint 要使用 "consul:///"?我看了下源码:

// DialContext creates a ClientConn for target, applying opts on top of the
// default dial options.
//
// The steps visible in this body are, in order: apply the dial options and
// chain the client interceptors; register the channel with channelz when it
// is on; validate the transport-security options; parse the default service
// config if one was supplied; install a default dialer and the user agent;
// apply the optional dial timeout; read an initial service config if a
// service-config channel was supplied; choose the resolver builder from the
// target's scheme; derive the authority; build the resolver wrapper; and, for
// a blocking dial, wait until the connection reports connectivity.Ready.
//
// On a non-nil error the deferred function closes the partially built
// ClientConn. If ctx is done by the time the function returns, the result is
// overridden with (nil, ctx.Err()).
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target:            target,
        csMgr:             &connectivityStateManager{},
        conns:             make(map[*addrConn]struct{}),
        dopts:             defaultDialOptions(),
        blockingpicker:    newPickerWrapper(),
        czData:            new(channelzData),
        firstResolveEvent: grpcsync.NewEvent(),
    }
    cc.retryThrottler.Store((*retryThrottler)(nil))
    // The connection's own context derives from context.Background(), not
    // from the ctx argument.
    cc.ctx, cc.cancel = context.WithCancel(context.Background())

    for _, opt := range opts {
        opt.apply(&cc.dopts)
    }

    chainUnaryClientInterceptors(cc)
    chainStreamClientInterceptors(cc)

    // Close the half-built connection if any later step leaves the named
    // result err non-nil.
    defer func() {
        if err != nil {
            cc.Close()
        }
    }()

    if channelz.IsOn() {
        if cc.dopts.channelzParentID != 0 {
            cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
            channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
                Desc:     "Channel Created",
                Severity: channelz.CtINFO,
                Parent: &channelz.TraceEventDesc{
                    Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
                    Severity: channelz.CtINFO,
                },
            })
        } else {
            cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
            channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
                Desc:     "Channel Created",
                Severity: channelz.CtINFO,
            })
        }
        cc.csMgr.channelzID = cc.channelzID
    }

    // Validate the security configuration. Without the insecure option
    // exactly one of TransportCredentials or CredsBundle must be set; with it,
    // neither may be set and no per-RPC credential may require transport
    // security.
    if !cc.dopts.insecure {
        if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
            return nil, errNoTransportSecurity
        }
        if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
            return nil, errTransportCredsAndBundle
        }
    } else {
        if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
            return nil, errCredentialsConflict
        }
        for _, cd := range cc.dopts.copts.PerRPCCredentials {
            if cd.RequireTransportSecurity() {
                return nil, errTransportCredentialsMissing
            }
        }
    }

    if cc.dopts.defaultServiceConfigRawJSON != nil {
        sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
        if err != nil {
            return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
        }
        cc.dopts.defaultServiceConfig = sc
    }
    cc.mkp = cc.dopts.copts.KeepaliveParams

    // Default dialer: split the address into network and address with
    // parseDialTarget, dial with a plain net.Dialer, and wrap the whole thing
    // with newProxyDialer.
    if cc.dopts.copts.Dialer == nil {
        cc.dopts.copts.Dialer = newProxyDialer(
            func(ctx context.Context, addr string) (net.Conn, error) {
                network, addr := parseDialTarget(addr)
                return (&net.Dialer{}).DialContext(ctx, network, addr)
            },
        )
    }

    // Append the library's user agent to a caller-supplied one, or use it
    // alone when none was supplied.
    if cc.dopts.copts.UserAgent != "" {
        cc.dopts.copts.UserAgent += " " + grpcUA
    } else {
        cc.dopts.copts.UserAgent = grpcUA
    }

    // When a dial timeout is configured, ctx is replaced by a derived context
    // that bounds the remainder of this function.
    if cc.dopts.timeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
        defer cancel()
    }
    // If ctx is already done at return time, report ctx.Err() regardless of
    // the result computed so far.
    defer func() {
        select {
        case <-ctx.Done():
            conn, err = nil, ctx.Err()
        default:
        }
    }()

    scSet := false
    if cc.dopts.scChan != nil {
        // Try to get an initial service config.
        select {
        case sc, ok := <-cc.dopts.scChan:
            if ok {
                cc.sc = &sc
                scSet = true
            }
        default:
        }
    }
    // Fall back to exponential backoff when no backoff strategy was supplied.
    if cc.dopts.bs == nil {
        cc.dopts.bs = backoff.Exponential{
            MaxDelay: DefaultBackoffConfig.MaxDelay,
        }
    }
    if cc.dopts.resolverBuilder == nil {
        // Only try to parse target when resolver builder is not already set.
        cc.parsedTarget = parseTarget(cc.target) // key step: splits the target to obtain the scheme
        grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
        // Look up the resolver builder registered under the parsed scheme.
        cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
        if cc.dopts.resolverBuilder == nil {
            // If resolver builder is still nil, the parsed target's scheme is
            // not registered. Fallback to default resolver and set Endpoint to
            // the original target.
            grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
            cc.parsedTarget = resolver.Target{
                Scheme:   resolver.GetDefaultScheme(),
                Endpoint: target,
            }
            cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
        }
    } else {
        cc.parsedTarget = resolver.Target{Endpoint: target}
    }
    // Authority precedence: the credentials' server name, then the explicitly
    // configured authority (insecure dials only), then the parsed endpoint.
    creds := cc.dopts.copts.TransportCredentials
    if creds != nil && creds.Info().ServerName != "" {
        cc.authority = creds.Info().ServerName
    } else if cc.dopts.insecure && cc.dopts.authority != "" {
        cc.authority = cc.dopts.authority
    } else {
        // Use endpoint from "scheme://authority/endpoint" as the default
        // authority for ClientConn.
        cc.authority = cc.parsedTarget.Endpoint
    }

    if cc.dopts.scChan != nil && !scSet {
        // Blocking wait for the initial service config.
        select {
        case sc, ok := <-cc.dopts.scChan:
            if ok {
                cc.sc = &sc
            }
        case <-ctx.Done():
            return nil, ctx.Err()
        }
    }
    if cc.dopts.scChan != nil {
        go cc.scWatcher()
    }

    // The balancer receives a clone of the transport credentials rather than
    // the original value.
    var credsClone credentials.TransportCredentials
    if creds := cc.dopts.copts.TransportCredentials; creds != nil {
        credsClone = creds.Clone()
    }
    cc.balancerBuildOpts = balancer.BuildOptions{
        DialCreds:        credsClone,
        CredsBundle:      cc.dopts.copts.CredsBundle,
        Dialer:           cc.dopts.copts.Dialer,
        ChannelzParentID: cc.channelzID,
        Target:           cc.parsedTarget,
    }

    // Build the resolver.
    rWrapper, err := newCCResolverWrapper(cc)
    if err != nil {
        return nil, fmt.Errorf("failed to build resolver: %v", err)
    }

    cc.mu.Lock()
    cc.resolverWrapper = rWrapper
    cc.mu.Unlock()
    // A blocking dial blocks until the clientConn is ready.
    if cc.dopts.block {
        for {
            s := cc.GetState()
            if s == connectivity.Ready {
                break
            } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
                // Give up early only when the connection error reports itself
                // as non-temporary.
                if err = cc.blockingpicker.connectionError(); err != nil {
                    terr, ok := err.(interface {
                        Temporary() bool
                    })
                    if ok && !terr.Temporary() {
                        return nil, err
                    }
                }
            }
            if !cc.WaitForStateChange(ctx, s) {
                // ctx got timeout or canceled.
                return nil, ctx.Err()
            }
        }
    }

    return cc, nil
}

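其中 parseTarget 是关键:它对 target 进行切割得到 Scheme,随后 DialContext 根据该 scheme 获取 resolverBuilder:cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)。我们在启动 gateway 时调用的 RegisterConsul,就是把自定义的服务发现 Builder(自定义的结构体)注册到一个 map[string]Builder 中,之后按 scheme 取出;重写后的负载均衡方法同样是注册到一个 map 中,在创建与服务端的连接时再从中获取。另外,从下面的 parseTarget 可以看到,"://" 之后还必须有一个 "/" 用来分隔 authority 和 endpoint,否则整个 target 会被当作 endpoint、scheme 为空,这也就是 endpoint 要写成 "consul:///" 的原因。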

// parseTarget splits target of the form "scheme://authority/endpoint" into a
// resolver.Target.
//
// If target contains no "://", or the part after it contains no "/", the
// whole target is returned as the Endpoint with Scheme and Authority empty.
func parseTarget(target string) (ret resolver.Target) {
    // Everything parsed so far is discarded on failure; the raw target
    // becomes the endpoint.
    fallback := resolver.Target{Endpoint: target}

    scheme, rest, found := split2(target, "://")
    if !found {
        return fallback
    }

    authority, endpoint, found := split2(rest, "/")
    if !found {
        return fallback
    }

    return resolver.Target{
        Scheme:    scheme,
        Authority: authority,
        Endpoint:  endpoint,
    }
}

写的可能有点乱,可参考此文章:https://segmentfault.com/a/1190000018424798?utm_source=tag-newest

相关文章

网友评论

      本文标题:grpc+grpc-gateway+consul or etcd

      本文链接:https://www.haomeiwen.com/subject/hoiquhtx.html