美文网首页
[istio源码分析][citadel] citadel之nod

[istio源码分析][citadel] citadel之nod

作者: nicktming | 来源:发表于2020-02-06 23:56 被阅读0次

    1. 前言

    转载请说明原文出处, 尊重他人劳动成果!

    源码位置: https://github.com/nicktming/istio
    分支: tming-v1.3.6 (基于1.3.6版本)

    前面两篇文章 [istio源码分析][citadel] citadel之istio_ca[istio源码分析][citadel] citadel之istio_ca(grpc server) 分析了istio_caserviceaccount controller 和 自定义签名, 以及istio_ca提供的一个grpc server 服务.

    本文将分析node_agent_k8s组件的一个使用场景ingressgateway sds.

    2. 例子

    例子参考官网的 Secure Gateways(SDS) .
    创建完mygatewayhttpbin-credential 之后. 运行一下脚本查看配置文件:

    podname=istio-ingressgateway-85bb5b4c57-l2pcz
    ns=istio-system
    rm lds.json rds.json cds.json eds.json sds.json
    istioctl proxy-config listener $podname -n $ns -o json > lds.json
    istioctl proxy-config route $podname -n $ns -o json > rds.json
    istioctl proxy-config cluster $podname -n $ns -o json > cds.json
    istioctl proxy-config endpoint $podname -n $ns -o json > eds.json
    istioctl proxy-config secret $podname -n $ns -o json > sds.json
    

    查看lds.json

    {
            "name": "0.0.0.0_443",
            "address": {
                "socketAddress": {
                    "address": "0.0.0.0",
                    "portValue": 443
                }
            },
            "filterChains": [
                {
                    "filterChainMatch": {
                        "serverNames": [
                            "httpbin.example.com"
                        ]
                    },
                    "tlsContext": {
                        "commonTlsContext": {
                            "tlsCertificateSdsSecretConfigs": [
                                {
                                    "name": "httpbin-credential",
                                    "sdsConfig": {
                                        "apiConfigSource": {
                                            "apiType": "GRPC",
                                            "grpcServices": [
                                                {
                                                    "googleGrpc": {
                                                        "targetUri": "unix:/var/run/ingress_gateway/sds",
                                                        "statPrefix": "sdsstat"
                                                    }
                                                }
                                            ]
                                        }
                                    }
                                }
                            ],
                            ...
                        },
                        "requireClientCertificate": false
                    },
          ...
        }
    

    然后查看sds.json

    {
        "dynamicActiveSecrets": [
            {
                "name": "httpbin-credential",
                "versionInfo": "2020-02-07 06:35:36.286120317 +0000 UTC m=+69968.244929345",
                "lastUpdated": "2020-02-07T06:35:36.287Z",
                "secret": {
                    "name": "httpbin-credential",
                    "tlsCertificate": {
                        "certificateChain": {
                            "inlineBytes": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUZVekNDQXp1Z0F3SUJBZ0lERUFJU01BMEdDU3FHU0liM0RRRUJDd1VBTUVveEN6QUpCZ05WQkFZVEFsVlQKTVE4d0RRWURWUVFJREFaRVpXNXBZV3d4RERBS0JnTlZCQW9NQTBScGN6RWNNQm9HQTFVRUF3d1RhSFIwY0dKcApiaTVsZUdGdGNHeGxMbU52YlRBZUZ3MHlNREF5TURjd05UUTRNRFZhRncweU1UQXlNVFl3TlRRNE1EVmFNR0F4CkN6QUpCZ05WQkFZVEFsVlRNUTh3RFFZRFZRUUlEQVpFWlc1cFlXd3hGREFTQmdOVkJBY01DMU53Y21sdVoyWnAKWld4a01Rd3dDZ1lEVlFRS0RBTkVhWE14SERBYUJnTlZCQU1NRTJoMGRIQmlhVzR1WlhoaGJYQnNaUzVqYjIwdwpnZ0VpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCRHdBd2dnRUtBb0lCQVFDcTVySC9CYnZwZXlacFNmdUU2d09TCi83ZDFIYU1Dek1mQ3E2NmxyYmZVSUhueG5xd0kzemh0OExMVzU1OTVKNk1wZ1FGdEZoNFA4KzhkQVF1TkxQa2IKNkpTZjdIOFpCWnY1NlRKS2pvNEYrVG1aTTFHTExmMzdBZXBsSnQwandFMUxXK3BmODliWHhvYVVSMHg5K2o5ZQpObncyK3RjUEdHSFZNdndEWVVzbGYyM3Z1RnpKckpFZWpudWRTK0FSTTRTL1krY1IreXF3aDVueTJRcjFZS3E4CkVNeWNyS0NGT3JpaElCT3Y1bERRSmRya05ZMFdMRG5QZWNIYTlvd0Y1Nk5BSnJhM2dGQWJuTHpiZ2xOa2NWWjEKTC9rUjF0NGMzV3FURHJhRXRwRHpBTXlMT0RHWkN1N1JBaGdCM2g2b1dkVW9KSGZ3N0hUWXltWWY1VHVtU3F3dApBZ01CQUFHamdnRXFNSUlCSmpBSkJnTlZIUk1FQWpBQU1CRUdDV0NHU0FHRytFSUJBUVFFQXdJR1FEQXpCZ2xnCmhrZ0JodmhDQVEwRUpoWWtUM0JsYmxOVFRDQkhaVzVsY21GMFpXUWdVMlZ5ZG1WeUlFTmxjblJwWm1sallYUmwKTUIwR0ExVWREZ1FXQkJRYmxqaDJaRlZwbWMxRTkzQnpLZS9NdjFMSnNEQ0JqQVlEVlIwakJJR0VNSUdCZ0JRVQpoYzdkdVdrbkdsRUYrdEN4RXFqWU1VNVZBYUZrcEdJd1lERUxNQWtHQTFVRUJoTUNWVk14RHpBTkJnTlZCQWdNCkJrUmxibWxoYkRFVU1CSUdBMVVFQnd3TFUzQnlhVzVuWm1sbGJHUXhEREFLQmdOVkJBb01BMFJwY3pFY01Cb0cKQTFVRUF3d1RhSFIwY0dKcGJpNWxlR0Z0Y0d4bExtTnZiWUlERUFJU01BNEdBMVVkRHdFQi93UUVBd0lGb0RBVApCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBVEFOQmdrcWhraUc5dzBCQVFzRkFBT0NBZ0VBZlVQSjJGYk9vY3h1CmFFQ0tBbVNoeTB4eWJ1RXBCeXV4UGdVWkhnaG1wRmluNVJwUmJoOGMxcTlwd3duVGthcGEyb2lscTBqSmMrZmcKL3Q0TnFVYTZER2RHOHAxZnNlcnB5eXNaQlVXU1JNMktJOWJWTVpzNmNValVIekJ2MVZFekxKdmdUT0k4QjRjUwpyak5jcUc3TXcyVkNRMDl4TE00MVQrOC91R0FkT1IvSEpQOGNKdUp1L0huTjFyOFU5N1JqUnRjMUlHMXlOMlowClBMdGpCd1pGdXpBVGlwN0lnWDFqKzZkaXdiV3VFZjQ4bWNuQ3BNbExjQ2Y1S1o3Z0gzejEwWkcvcFoxRnNURHgKeWlBZndHZDUyMzNmMk5xL1Q3dXIxbWxWY1prSy8zc2QrMjgxRUU1cWpqY05GVDdWY2hDL21FYVNHK2F5UGpJdApLNzdBenlzcnNnOGlyMHhLU2VuSVF2NHRHbytHL09aNzhtSU4vWmQreEVOVnNDazVRN3NQNFloYkZKKzc4WFIvCkJNWTNFaUNEbXlZMm1sbytlL2hjN0ZIM3Uxb3VJc0s0UnJrLzJSRWtYSCtsUk1RYjNCbzJhYkFrY09RaU93ZHkKb2lvVnFOOVZUdkEwMDh6eEp6Mm11Qmp5MzJobnYyRUV4eDRLMUplKzVtZ3lwYkp4MnNOQ2xoMCtTTDE0OWVkcApPMUt3bXNsdGhLTEZhT2RrMEF4b2dxVEpnbjZpYUsrRFRFRk9IMjIxSzUwQld5ZTUvbDJsZHozL0hXd3VMZlMzCndlN0h6dWxIRGV5aXdJdCtqVkhUTkF0Z0VLOGMyTFJkTS9xR3lwTEwrN3AxdjhZYUdJeG1Hc3pYRnFxOGdhREwKZG5IWEY4U3czT2NSRUhxVlA4ai9sbTlXWFY1QVRKUT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo="
                        },
                        "privateKey": {
                            "inlineString": "[redacted]"
                        }
                    }
                }
            }
        ]
    }
    

    3. 分析

    查看istio-system/ingressgateway的详细pod信息, 可以看到其运行参数如下:

    apiVersion: apps/v1
    kind: Deployment
    ...
      name: istio-ingressgateway
    ...
        spec:
          ...
          containers:
          - env:
            - name: ENABLE_WORKLOAD_SDS
              value: "false"
            - name: ENABLE_INGRESS_GATEWAY_SDS
              value: "true"
            - name: INGRESS_GATEWAY_NAMESPACE
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.namespace
            image: docker.io/istio/node-agent-k8s:1.4.3
            name: ingress-sds
           ...
            volumeMounts:
            - mountPath: /var/run/ingress_gateway
              name: ingressgatewaysdsudspath
         ...
          volumes:
          - emptyDir: {}
            name: ingressgatewaysdsudspath
    

    可以看一下两个参数的意义是什么

        // The workload SDS mode allows node agent to provision credentials to workload proxy by sending
        // CSR to CA.
        enableWorkloadSDS     = "ENABLE_WORKLOAD_SDS"
        enableWorkloadSDSFlag = "enableWorkloadSDS"
    
        // The ingress gateway SDS mode allows node agent to provision credentials to ingress gateway
        // proxy by watching kubernetes secrets.
        enableIngressGatewaySDS     = "ENABLE_INGRESS_GATEWAY_SDS"
        enableIngressGatewaySDSFlag = "enableIngressGatewaySDS"
    

    1. enableWorkloadSDS 是表示agent可以通过向CA发送CSR获得签名从而给workload身份(key and cert). 简明的意思是自己生成key, 然后向citadel发送签名请求获得签名证书.
    2. enableIngressGatewaySDS 是表示agent可以通过监控secret从而给ingress gateway身份(key and cert). 简明意思是key and cert需要从k8s中获得.

    本文分析enableIngressGatewaySDS的情况.

    3. sds service

    此处和 [istio源码分析][pilot] pilot之ads 中内容类似.

    有一个测试的客户端 security/pkg/testing/sdsc/sdsclient.go, 所以这里就以客户端为切入点.

    // security/pkg/testing/sdsc/sdsclient.go
    func constructSDSRequestContext() (context.Context, error) {
        // Read from the designated location for Kubernetes JWT.
        content, err := ioutil.ReadFile(authn_model.K8sSATrustworthyJwtFileName)
        ...
        // 注意是/var/run/secrets/tokens/istio-token里的内容
        md := metadata.New(map[string]string{
            authn_model.K8sSAJwtTokenHeaderKey: string(content),
        })
        return metadata.NewOutgoingContext(context.Background(), md), nil
    }
    func NewClient(opt ClientOptions) (*Client, error) {
        conn, err := grpc.Dial(opt.ServerAddress, grpc.WithInsecure())
        ...
        client := sds.NewSecretDiscoveryServiceClient(conn)
        ctx, err := constructSDSRequestContext()
        ...
        stream, err := client.StreamSecrets(ctx)
        ...
        return &Client{
            stream:        stream,
            conn:          conn,
            updateChan:    make(chan xdsapi.DiscoveryResponse, 1),
            serverAddress: opt.ServerAddress,
        }, nil
    }
    

    server端:

    func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecretsServer) error {
        ...
        reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
        con := newSDSConnection(stream)
        // 从客户端接收信息
        go receiveThread(con, reqChannel, &receiveError)
        var node *core.Node
        for {
            // Block until a request is received.
            select {
            case discReq, ok := <-reqChannel:
                ...
                if con.conID == "" {
                    // first request
                    ...
                    // 添加到sdsclient
                    addConn(key, con)
                }
                ...
                // 如果nodeagent的cache secret可以匹配请求中的内容<token, resourceName, Version> 那表明这是一个ACK请求
                if discReq.VersionInfo != "" && s.st.SecretExist(conID, resourceName, token, discReq.VersionInfo) {
                    sdsServiceLog.Debugf("%s received SDS ACK from proxy %q, version info %q, "+
                        "error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo,
                        discReq.ErrorDetail.GoString())
                    continue
                }
                ...
                // 在ingress gateway agent模式, 如果第一个sds请求已经收到但是k8s中还没有对应的secret, 在发送response之前先等待一下这个secret
                if s.st.ShouldWaitForIngressGatewaySecret(conID, resourceName, token) {
                    ...
                    continue
                }
                // 获取secret 如果是ingress gateway agent模式, 那该secret是从k8s中获得
                // 如果不是 则自己生成并请求签名
                secret, err := s.st.GenerateSecret(ctx, conID, resourceName, token)
                ...
                con.mutex.Lock()
                con.secret = secret
                con.mutex.Unlock()
                // 向客户端发送response
                if err := pushSDS(con); err != nil {
                    ...
                    return err
                }
            case <-con.pushChannel:
                // server端主动向client端push的信号
                ...
                // 向客户端发送response
                if err := pushSDS(con); err != nil {
                    ...
                    return err
                }
            }
        }
    }
    

    这里主要有两个分支:
    1. <-reqChannel: 这里的来源是通过receiveThread得到client端的请求并放入到reqChannel中.

    1.1 如果是第一次请求, 则通过addConn方法加入到sdsclient中.
    1.2 如果nodeagentcache secret可以匹配请求中的内容<token, resourceName, Version> 那表明这是一个ACK请求.
    1.3ingress gateway agent模式, 如果sds请求已经收到但是k8s中还没有对应的secret或者被删除, 在发送responseclient端之前先等待一下这个secret被创建.
    1.4 通过s.st.GenerateSecret获得对应的secret, 如果是ingress gateway agent模式, 那该secret是从k8s中获得. 如果不是, 则自己生成并请求签名.
    1.5 调用pushSDS向客户端发送response.

    2. <-con.pushChannel: 代表的是server端主动向clientpush的信号, 然后直接通过pushSDS发送当前存在在con里的内容.

    func NotifyProxy(connKey cache.ConnKey, secret *model.SecretItem) error {
        conIDresourceNamePrefix := sdsLogPrefix(connKey.ConnectionID, connKey.ResourceName)
        sdsClientsMutex.Lock()
        conn := sdsClients[connKey]
        ...
        conn.secret = secret
        conn.pushChannel <- &sdsEvent{}
        return nil
    }
    

    可以看到NotifyProxy方法传入了secret, 所以上游可以根据secret的变化来调用NotifyProxy方法来主动push信息到client端.

    4. secretFetcher 和 secretCache

    4.1 secretFetcher

    func NewSecretFetcher(ingressGatewayAgent bool, endpoint, caProviderName string, tlsFlag bool,
        tlsRootCert []byte, vaultAddr, vaultRole, vaultAuthPath, vaultSignCsrPath string) (*SecretFetcher, error) {
        ret := &SecretFetcher{}
        if ingressGatewayAgent {
            // 如果是ingress gateway模式, 则监控k8s中的secret并从中获取信息
            ret.UseCaClient = false
            cs, err := kube.CreateClientset("", "")
            ...
            ret.FallbackSecretName = ingressFallbackSecret
            secretFetcherLog.Debugf("SecretFetcher set fallback secret name %s", ret.FallbackSecretName)
            ret.InitWithKubeClient(cs.CoreV1())
        } else {
            // 如果是workload agent模式, 则创建ca client 从citadel中获得签名证书等
            caClient, err := ca.NewCAClient(endpoint, caProviderName, tlsFlag, tlsRootCert,
                vaultAddr, vaultRole, vaultAuthPath, vaultSignCsrPath)
            ...
            ret.UseCaClient = true
            ret.CaClient = caClient
        }
        return ret, nil
    }
    func (sf *SecretFetcher) InitWithKubeClient(core corev1.CoreV1Interface) { // nolint:interfacer
        ...
        sf.scrtStore, sf.scrtController =
            cache.NewInformer(scrtLW, &v1.Secret{}, resyncPeriod, cache.ResourceEventHandlerFuncs{
                AddFunc:    sf.scrtAdded,
                DeleteFunc: sf.scrtDeleted,
                UpdateFunc: sf.scrtUpdated,
            })
        ...
    }
    

    scrtAdded, scrtDeletedscrtUpdated 在获取secretkey and cert信息后会通过sf.AddCache, sf.DeleteCachesf.UpdateCache来保存到cache中.

    var (
        ...
        rootCmd = &cobra.Command{
            Use:   "nodeagent",
            Short: "Citadel agent",
            RunE: func(c *cobra.Command, args []string) error {
                ...
                workloadSecretCache, gatewaySecretCache := newSecretCache(serverOptions)
                ...
                server, err := sds.NewServer(serverOptions, workloadSecretCache, gatewaySecretCache)
                defer server.Stop()
                ...
            },
        }
    )
    func newSecretCache(serverOptions sds.Options) (workloadSecretCache, gatewaySecretCache *cache.SecretCache) {
        ...
        if serverOptions.EnableIngressGatewaySDS {
            gSecretFetcher, err := secretfetcher.NewSecretFetcher(true, "", "", false, nil, "", "", "", "")
            ...
            gatewaySecretChan = make(chan struct{})
            gSecretFetcher.Run(gatewaySecretChan)
            gatewaySecretCache = cache.NewSecretCache(gSecretFetcher, sds.NotifyProxy, gatewaySdsCacheOptions)
        } else {
            gatewaySecretCache = nil
        }
        return workloadSecretCache, gatewaySecretCache
    }
    
    func NewSecretCache(fetcher *secretfetcher.SecretFetcher, notifyCb func(ConnKey, *model.SecretItem) error, options Options) *SecretCache {
        ...
        fetcher.AddCache = ret.UpdateK8sSecret
        fetcher.DeleteCache = ret.DeleteK8sSecret
        fetcher.UpdateCache = ret.UpdateK8sSecret
        ...
        return ret
    }
    func (sc *SecretCache) UpdateK8sSecret(secretName string, ns model.SecretItem) {
        ...
        sc.secrets.Range(func(k interface{}, v interface{}) bool {
            ...
            if connKey.ResourceName == secretName {
                ...
                go func() {
                    ...
                    sc.callbackWithTimeout(connKey, newSecret)
                }()
                return false
            }
            return true
        })
        ...
    }
    func (sc *SecretCache) callbackWithTimeout(connKey ConnKey, secret *model.SecretItem) {
        ...
        go func() {
            ...
            if sc.notifyCallback != nil {
                if err := sc.notifyCallback(connKey, secret); err != nil {
                    ...
                }
            } ...
        }()
        select {
        ...
        }
    }
    

    k8s-apiserver -> SecretFetcher.scrtAdded -> SecretCache.UpdateK8sSecret(SecretFetcher.AddCache) -> sc.callbackWithTimeout -> sc.notifyCallback(NotifyProxy).

    5. 总结

    node_k8s_agent.png

    相关文章

      网友评论

          本文标题:[istio源码分析][citadel] citadel之nod

          本文链接:https://www.haomeiwen.com/subject/eiiuxhtx.html