美文网首页
[istio源码分析][pilot] pilot之ads

[istio源码分析][pilot] pilot之ads

作者: nicktming | 来源:发表于2020-02-02 23:30 被阅读0次

    1. 前言

    转载请说明原文出处, 尊重他人劳动成果!

    源码位置: https://github.com/nicktming/istio
    分支: tming-v1.3.6 (基于1.3.6版本)

    在前一篇文章 [istio源码分析][pilot] pilot之DiscoveryServer 中已经分析了DiscoveryServer拿到galleyk8s的数据后向每一个连接的client端(其实是envoy) 发送了一个XdsEventclient.pushChannel.

    本文将分析从clientdiscoveryServer端之间的联系. 主要是分析整体的流程, 关于细节方面的东西可以在具体问题中进行调试即可.

    2. adsc

    adscistio模拟的一个client端, 真正的客户端是sidecar中的envoy. 这里为了方便, 因此用adsc进行分析.

    // pkg/adsc/adsc.go
    func (a *ADSC) Run() error {
        var err error
        if len(a.certDir) > 0 {
            ...
            a.conn, err = grpc.Dial(a.url, opts...)
            ...
        } else {
            a.conn, err = grpc.Dial(a.url, grpc.WithInsecure())
            ...
        }
        // 建立连接
        xds := ads.NewAggregatedDiscoveryServiceClient(a.conn)
        edsstr, err := xds.StreamAggregatedResources(context.Background())
        if err != nil {
            return err
        }
        a.stream = edsstr
        // 从discovery server接收信息
        go a.handleRecv()
        return nil
    }
    func (a *ADSC) handleRecv() {
        for {
            // 从server端获得信息
            msg, err := a.stream.Recv()
            ...
            listeners := []*xdsapi.Listener{}
            clusters := []*xdsapi.Cluster{}
            routes := []*xdsapi.RouteConfiguration{}
            eds := []*xdsapi.ClusterLoadAssignment{}
            // 为获得的数据分类
            for _, rsc := range msg.Resources { // Any
                a.VersionInfo[rsc.TypeUrl] = msg.VersionInfo
                valBytes := rsc.Value
                if rsc.TypeUrl == listenerType {
                    ll := &xdsapi.Listener{}
                    _ = proto.Unmarshal(valBytes, ll)
                    listeners = append(listeners, ll)
                } ...
            }
            ...
            // 向server端发送ACK
            a.ack(msg)
            ...
            // 客户端处理listener, cluster, endpoint, route
            ...
        }
    }
    func (a *ADSC) ack(msg *xdsapi.DiscoveryResponse) {
        _ = a.stream.Send(&xdsapi.DiscoveryRequest{
            ResponseNonce: msg.Nonce,
            TypeUrl:       msg.TypeUrl,
            Node:          a.node(),
            VersionInfo:   msg.VersionInfo,
        })
    }
    

    1.discoveryServer建立连接.
    2. 通过handleRecv方法与server端交流.
    3. 将获得的数据分类(endpoint, listener, cluster, route)
    4.server端发送ack.
    5. 客户端自己处理数据.

    envoy就是数据生成的配置文件了, xds协议.

    3. server端

    // StreamAggregatedResources implements the ADS interface.
    func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
        peerInfo, ok := peer.FromContext(stream.Context())
        peerAddr := "0.0.0.0"
        if ok {
            peerAddr = peerInfo.Addr.String()
        }
        t0 := time.Now()
        err := s.globalPushContext().InitContext(s.Env)
        ...
        // 创建一个XdsConnection
        con := newXdsConnection(peerAddr, stream)
        var receiveError error
        reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
        // 接收con对应的client的请求
        go receiveThread(con, reqChannel, &receiveError)
    
        node := &core.Node{}
        for {
            // Block until either a request is received or a push is triggered.
            select {
            case discReq, ok := <-reqChannel:
                if !ok {
                    // Remote side closed connection.
                    return receiveError
                }
                // This should be only set for the first request. Guard with ID check regardless.
                if discReq.Node != nil && discReq.Node.Id != "" {
                    node = discReq.Node
                    err = s.initConnectionNode(discReq.Node, con)
                    if err != nil {
                        return err
                    }
                }
                switch discReq.TypeUrl {
                case ClusterType:
                    ...
                case ListenerType:
                    ...
                case RouteType:
                    ...
                case EndpointType:
                    ...
                default:
                    adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
                }
    
                con.mu.Lock()
                if !con.added {
                    con.added = true
                    con.mu.Unlock()
                    // 添加到xdsclient中
                    s.addCon(con.ConID, con)
                    defer s.removeCon(con.ConID, con)
                } else {
                    con.mu.Unlock()
                }
            case pushEv := <-con.pushChannel:
                err := s.pushConnection(con, pushEv)
                pushEv.done()
                if err != nil {
                    return nil
                }
            }
        }
    }
    

    可以看到select中有两个case.
    1. discReq, ok := <-reqChannel:是从client来的, receiveThread方法会看到.
    2. pushEv := <-con.pushChannel: 是从k8sgalley中来的, 具体可以参考上文 [istio源码分析][pilot] pilot之DiscoveryServer .

    3.1 第一个分支

    这里先看一下第一个分支的操作.

    receiveThread
    
    func receiveThread(con *XdsConnection, reqChannel chan *xdsapi.DiscoveryRequest, errP *error) {
        defer close(reqChannel) // indicates close of the remote side.
        for {
            // 从client端接收信息
            req, err := con.stream.Recv()
            ...
            select {
            // 将req转到reqChannel中
            case reqChannel <- req:
            case <-con.stream.Context().Done():
                adsLog.Errorf("ADS: %q %s terminated with stream closed", con.PeerAddr, con.ConID)
                return
            }
        }
    }
    

    1.client端接收的req直接放入到reqChannel中, 所以会进入到StreamAggregatedResources中的第一个分支.
    2.ClusterType为例:

                 case ClusterType:
                    if con.CDSWatch {
                        // Already received a cluster watch request, this is an ACK
                        if discReq.ErrorDetail != nil {
                            adsLog.Warnf("ADS:CDS: ACK ERROR %v %s (%s) %v", peerAddr, con.ConID, con.modelNode.ID, discReq.String())
                            errCode := codes.Code(discReq.ErrorDetail.Code)
                            incrementXDSRejects(cdsReject, node.Id, errCode.String())
                        } else if discReq.ResponseNonce != "" {
                            con.ClusterNonceAcked = discReq.ResponseNonce
                        }
                        adsLog.Debugf("ADS:CDS: ACK %s %s (%s) %s %s", peerAddr, con.ConID, con.modelNode.ID, discReq.VersionInfo, discReq.ResponseNonce)
                        continue
                    }
                    // CDS REQ is the first request an envoy makes. This shows up
                    // immediately after connect. It is followed by EDS REQ as
                    // soon as the CDS push is returned.
                    adsLog.Infof("ADS:CDS: REQ %v %s %v version:%s", peerAddr, con.ConID, time.Since(t0), discReq.VersionInfo)
                    con.CDSWatch = true
                    err := s.pushCds(con, s.globalPushContext(), versionInfo())
                    if err != nil {
                        return err
                    }
    

    1. 可以看到除了是第一次会调用pushCds方法, 后面都是打印ACK/NACK信息. 通过一个变量con.CDSWatch进行控制.
    2. 另外通过s.addCon(con.ConID, con)将此连接加入到adsClients, 这个在startPush方法需要分发给所有的client端的时候用到的. 具体可以参考上文 [istio源码分析][pilot] pilot之DiscoveryServer .

    pushCds
    // pilot/pkg/proxy/envoy/v2/cds.go
    func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
        pushStart := time.Now()
        // 发送给该envoy (con.modelNode)
        // 内容在push中
        // 根据client信息和内容生成要发送的clusters集合
        rawClusters := s.generateRawClusters(con.modelNode, push)
        ...
        // 构造response并发送给该client(envoy)
        response := con.clusters(rawClusters)
        err := con.send(response)
        ...
        return nil
    }
    func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) []*xdsapi.Cluster {
        rawClusters := s.ConfigGenerator.BuildClusters(s.Env, node, push)
        ...
        return rawClusters
    }
    

    1. client端(envoy)信息是con.modelNode, 内容为push, 根据此两个信息传入到generateRawClusters中生成cluster集合. 实际上是通过s.ConfigGenerator.BuildClusters方法,

    type ConfigGenerator interface {
        BuildListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) []*v2.Listener
        BuildClusters(env *model.Environment, node *model.Proxy, push *model.PushContext) []*v2.Cluster
        BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext, routeNames []string) []*v2.RouteConfiguration
    }
    

    这个部分的内容就是根据当前的内容生成envoy所接受的cluster, endpointroute.

    2.client端(envoy)发送数据.

    3.2 第二个分支

    这里分析pushEv := <-con.pushChannel:, 这个分支是从configControllerServiceController 过来的, 具体可以参考上文 [istio源码分析][pilot] pilot之DiscoveryServer .

    func (s *DiscoveryServer) pushConnection(con *XdsConnection, pushEv *XdsEvent) error {
        ...
        if con.CDSWatch {
            err := s.pushCds(con, pushEv.push, currentVersion)
            ...
        }
        if len(con.Clusters) > 0 {
            err := s.pushEds(pushEv.push, con, currentVersion, nil)
            ...
        }
        if con.LDSWatch {
            err := s.pushLds(con, pushEv.push, currentVersion)
            ...
        }
        if len(con.Routes) > 0 {
            err := s.pushRoute(con, pushEv.push, currentVersion)
            ...
        }
        ...
        return nil
    }
    

    很多细节部分省略了很多, 可以看到最终是往此client端发送cluster, endpoint, routelistener.

    4. 总结

    ads.png

    5. 参考

    1. istio 1.3.6源码

    相关文章

      网友评论

          本文标题:[istio源码分析][pilot] pilot之ads

          本文链接:https://www.haomeiwen.com/subject/vvsqxhtx.html