
Kubernetes Source Code Analysis: Ingress Nginx

Author: 何约什 | Published 2018-08-07 08:23

    The article on the Ingress Nginx main flow already described how the system is driven: it watches the relevant resources in the API Server and reacts to their changes through event callbacks. This article covers the key remaining piece: how the Nginx configuration is updated in near real time so that it stays in sync with the Ingress resources.

    Recap of the Data-Driven Flow

    Based on SharedIndexInformer, the controller observes changes to Ingress, ConfigMap, Secret, Endpoint, and Service resources in real time. The event callbacks then notify the sync task queue, which periodically updates the Nginx configuration file.

    As an example, when a resource changes, the code that generates a sync event looks like this:

                updateCh.In() <- Event{
                    Type: CreateEvent,
                    Obj:  obj,
                }
    

    The currently supported event types are:

    const (
        // CreateEvent event associated with new objects in an informer
        CreateEvent EventType = "CREATE"
        // UpdateEvent event associated with an object update in an informer
        UpdateEvent EventType = "UPDATE"
        // DeleteEvent event associated when an object is removed from an informer
        DeleteEvent EventType = "DELETE"
        // ConfigurationEvent event associated when a controller configuration object is created or updated
        ConfigurationEvent EventType = "CONFIGURATION"
    )
    

    The Obj field of an Event is generally the resource object that changed.
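    Putting the two snippets together, the event type is just a small struct pairing an event type with the changed object. Here is a sketch consistent with the code above (a reconstruction, not the verbatim source):

    // Sketch of the event type used above; field names match the snippets.
    type EventType string

    type Event struct {
        Type EventType   // CREATE, UPDATE, DELETE or CONFIGURATION
        Obj  interface{} // the resource object that changed
    }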

    The Sync Task Queue

    The sync task queue receives sync events and keeps the Nginx configuration in sync with the resources.

    n.syncQueue = task.NewTaskQueue(n.syncIngress)

    func NewTaskQueue(syncFn func(interface{}) error) *Queue {
        return NewCustomTaskQueue(syncFn, nil)
    }
    
    // NewCustomTaskQueue ...
    func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue {
        q := &Queue{
            queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
            sync:       syncFn,
            workerDone: make(chan bool),
            fn:         fn,
        }
    
        if fn == nil {
            q.fn = q.defaultKeyFunc
        }
    
        return q
    }
    

    The task queue calls syncFn for every element that is enqueued; here, syncFn is actually the NGINXController.syncIngress method.

    One question worth considering: if resources change very frequently, will we end up rewriting the Nginx configuration file and triggering Nginx reloads just as frequently? For services handling heavy traffic, frequent reloads have a noticeable impact.

    The answer lies in the task queue implementation: it applies rate limiting, and it also compares timestamps so that change events older than the last completed sync are ignored. We won't analyze that code in depth here, but the sketch below illustrates the idea.
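    A minimal sketch of the stale-event check, assuming a timestamped queue element (names here are illustrative, not the exact source):

    package task

    import (
        "sync/atomic"
        "time"
    )

    // element is what actually sits in the work queue: the key plus the
    // time at which it was enqueued.
    type element struct {
        key       interface{}
        timestamp int64 // UnixNano at enqueue time
    }

    type queue struct {
        sync     func(interface{}) error
        lastSync int64 // UnixNano of the last successful sync
    }

    // process drops elements older than the last completed sync: that sync
    // already observed a newer state, so replaying the event is pointless.
    func (q *queue) process(item element) {
        if item.timestamp < atomic.LoadInt64(&q.lastSync) {
            return
        }
        if err := q.sync(item.key); err == nil {
            atomic.StoreInt64(&q.lastSync, time.Now().UnixNano())
        }
    }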

    Sync Processing

    Sync processing is the syncIngress method. Although it takes a parameter, the parameter is essentially ignored: syncIngress performs the same work no matter which kind of resource changed.

    // syncIngress collects all the pieces required to assemble the NGINX
    // configuration file and passes the resulting data structures to the backend
    // (OnUpdate) when a reload is deemed necessary.
    func (n *NGINXController) syncIngress(interface{}) error {
        n.syncRateLimiter.Accept() // rate limiting, to avoid updating too frequently
    
        if n.syncQueue.IsShuttingDown() {
            return nil
        }
    
        // sort Ingresses using the ResourceVersion field
        ings := n.store.ListIngresses() // fetch all Ingresses from the local cache
        sort.SliceStable(ings, func(i, j int) bool {
            ir := ings[i].ResourceVersion
            jr := ings[j].ResourceVersion
            return ir < jr
        })
    
        upstreams, servers := n.getBackendServers(ings) // extract all upstreams and servers from ings
        // the main reason for doing this: the same upstream may be shared by
        // multiple servers (whenever namespace, service, and port all match)
        var passUpstreams []*ingress.SSLPassthroughBackend
    
        for _, server := range servers {
            if !server.SSLPassthrough {
                continue
            }
    
            for _, loc := range server.Locations {
                if loc.Path != rootLocation {
                    glog.Warningf("Ignoring SSL Passthrough for location %q in server %q", loc.Path, server.Hostname)
                    continue
                }
                passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
                    Backend:  loc.Backend,
                    Hostname: server.Hostname,
                    Service:  loc.Service,
                    Port:     loc.Port,
                })
                break
            }
        }
    
        pcfg := &ingress.Configuration{ // assemble the new ingress configuration
            Backends:              upstreams,
            Servers:               servers,
            TCPEndpoints:          n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
            UDPEndpoints:          n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
            PassthroughBackends:   passUpstreams,
            BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum,
        }
    
        if n.runningConfig.Equal(pcfg) { // if the new configuration equals the running one, there is nothing more to do
            glog.V(3).Infof("No configuration change detected, skipping backend reload.")
            return nil
        }
    
        // dynamic configuration is enabled and the current change can be applied dynamically
        if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(pcfg) {
            glog.Infof("Changes handled by the dynamic configuration, skipping backend reload.") // the change qualifies, so no config file rewrite is needed
        } else { // otherwise the config file must be rewritten
            glog.Infof("Configuration changes detected, backend reload required.")
    
            hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
                TagName: "json",
            }) // hash of the configuration; apparently used mainly for metrics collection
    
            pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash) 
    
            err := n.OnUpdate(*pcfg) // rewrite the Nginx config file; analyzed in detail below
            if err != nil {
                n.metricCollector.IncReloadErrorCount()
                n.metricCollector.ConfigSuccess(hash, false)
                glog.Errorf("Unexpected failure reloading the backend:\n%v", err)
                return err
            }
    
            glog.Infof("Backend successfully reloaded.")
            n.metricCollector.ConfigSuccess(hash, true)
            n.metricCollector.IncReloadCount()
            n.metricCollector.SetSSLExpireTime(servers)
        }
    
        if n.cfg.DynamicConfigurationEnabled { // from here, the dynamic configuration path
            isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
            go func(isFirstSync bool) {
                if isFirstSync {
                    glog.Infof("Initial synchronization of the NGINX configuration.")
    
                    // it takes time for NGINX to start listening on the configured ports
                    time.Sleep(1 * time.Second)
                }
                err := configureDynamically(pcfg, n.cfg.ListenPorts.Status) // dynamic configuration handling; also analyzed below
                if err == nil {
                    glog.Infof("Dynamic reconfiguration succeeded.")
                } else {
                    glog.Warningf("Dynamic reconfiguration failed: %v", err)
                }
            }(isFirstSync)
        }
    
        ri := getRemovedIngresses(n.runningConfig, pcfg)
        re := getRemovedHosts(n.runningConfig, pcfg)
        n.metricCollector.RemoveMetrics(ri, re)
    
        n.runningConfig = pcfg // store the new configuration as the running one
    
        return nil
    }
    

    The logic of syncIngress is fairly clear; the main steps are annotated in the comments above. Next we examine the OnUpdate method in detail, which implements rewriting and reloading the Nginx configuration file.

    OnUpdate

    OnUpdate is called when the sync process determines that the Nginx configuration file needs to change. The backend configuration and the ConfigMap configuration are merged before the final configuration file is created.

    • PassthroughBackends
      TODO:
    func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
        cfg := n.store.GetBackendConfiguration()
        cfg.Resolver = n.resolver
    
        if n.cfg.EnableSSLPassthrough {
            servers := []*TCPServer{}
            for _, pb := range ingressCfg.PassthroughBackends {
                svc := pb.Service
                if svc == nil {
                    glog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend)
                    continue
                }
                port, err := strconv.Atoi(pb.Port.String())
                if err != nil {
                    for _, sp := range svc.Spec.Ports {
                        if sp.Name == pb.Port.String() {
                            port = int(sp.Port)
                            break
                        }
                    }
                } else {
                    for _, sp := range svc.Spec.Ports {
                        if sp.Port == int32(port) {
                            port = int(sp.Port)
                            break
                        }
                    }
                }
    
                // TODO: Allow PassthroughBackends to specify they support proxy-protocol
                servers = append(servers, &TCPServer{
                    Hostname:      pb.Hostname,
                    IP:            svc.Spec.ClusterIP,
                    Port:          port,
                    ProxyProtocol: false,
                })
            }
    
            n.Proxy.ServerList = servers
        }
        .......
    }
    
    • ServerNameHash
        if cfg.ServerNameHashBucketSize == 0 {
            nameHashBucketSize := nginxHashBucketSize(longestName)
            glog.V(3).Infof("Adjusting ServerNameHashBucketSize variable to %q", nameHashBucketSize)
            cfg.ServerNameHashBucketSize = nameHashBucketSize
        }
        serverNameHashMaxSize := nextPowerOf2(serverNameBytes)
        if cfg.ServerNameHashMaxSize < serverNameHashMaxSize {
            glog.V(3).Infof("Adjusting ServerNameHashMaxSize variable to %q", serverNameHashMaxSize)
            cfg.ServerNameHashMaxSize = serverNameHashMaxSize
        }
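    For reference, nextPowerOf2 rounds a value up to the next power of two. A sketch using the classic bit-twiddling trick, assumed to match the controller's helper for 32-bit-sized inputs:

    // Smear the highest set bit into every lower bit, then add one,
    // rounding v up to the next power of two.
    func nextPowerOf2(v int) int {
        v--
        v |= v >> 1
        v |= v >> 2
        v |= v >> 4
        v |= v >> 8
        v |= v >> 16
        v++
        return v
    }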
    
    • Headers
      There are two kinds of headers, setHeaders and addHeaders, and both are stored in a ConfigMap.
      As a reminder, the ConfigMap is passed in through the flag --configmap=ingress-nginx/nginx-configuration. For example:
    #  cat configmap.yaml 
    apiVersion: v1
    data:
      proxy-set-headers: "ingress-nginx/custom-headers"
      add-headers: "ingress-nginx/custom-headers"
    kind: ConfigMap
    metadata:
      name: nginx-configuration
      namespace: ingress-nginx
      labels:
        app: ingress-nginx
    

    In the ConfigMap above, both proxy-set-headers and add-headers are set to ingress-nginx/custom-headers, which is itself a ConfigMap. The custom-headers ConfigMap can be created with the following script:

    curl https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/docs/examples/customization/custom-headers/custom-headers.yaml \
        | kubectl apply -f -
    
    The actual content of custom-headers.yaml is:
    
    apiVersion: v1
    data:
      X-Different-Name: "true"
      X-Request-Start: t=${msec}
      X-Using-Nginx-Controller: "true"
    kind: ConfigMap
    metadata:
      name: custom-headers
      namespace: ingress-nginx
    

    Here is the code that handles setHeaders and addHeaders:

        setHeaders := map[string]string{}
        if cfg.ProxySetHeaders != "" {
            cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders)
            if err != nil {
                glog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.ProxySetHeaders, err)
            }
    
            setHeaders = cmap.Data
        }
    
        addHeaders := map[string]string{}
        if cfg.AddHeaders != "" {
            cmap, err := n.store.GetConfigMap(cfg.AddHeaders)
            if err != nil {
                glog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.AddHeaders, err)
            }
    
            addHeaders = cmap.Data
        }
    

    With the example configuration above, cfg.ProxySetHeaders is "ingress-nginx/custom-headers". After the n.store.GetConfigMap call, setHeaders ends up as:

    map[string]string{
      "X-Different-Name": "true",
      "X-Request-Start": "t=${msec}",
      "X-Using-Nginx-Controller": "true"
    }
    
    • Generating the configuration file content

    The code is shown below; it mainly uses the text/template package to generate the configuration file. The real substance is in the template content itself, which is analyzed later.

        // build the template parameters
        tc := ngx_config.TemplateConfig{
            ProxySetHeaders:             setHeaders,
            AddHeaders:                  addHeaders,
            MaxOpenFiles:                maxOpenFiles,
            BacklogSize:                 sysctlSomaxconn(),
            Backends:                    ingressCfg.Backends,
            PassthroughBackends:         ingressCfg.PassthroughBackends,
            Servers:                     ingressCfg.Servers,
            TCPBackends:                 ingressCfg.TCPEndpoints,
            UDPBackends:                 ingressCfg.UDPEndpoints,
            HealthzURI:                  ngxHealthPath,
            CustomErrors:                len(cfg.CustomHTTPErrors) > 0,
            Cfg:                         cfg,
            IsIPV6Enabled:               n.isIPV6Enabled && !cfg.DisableIpv6,
            NginxStatusIpv4Whitelist:    cfg.NginxStatusIpv4Whitelist,
            NginxStatusIpv6Whitelist:    cfg.NginxStatusIpv6Whitelist,
            RedirectServers:             redirectServers,
            IsSSLPassthroughEnabled:     n.cfg.EnableSSLPassthrough,
            ListenPorts:                 n.cfg.ListenPorts,
            PublishService:              n.GetPublishService(),
            DynamicConfigurationEnabled: n.cfg.DynamicConfigurationEnabled,
            DisableLua:                  n.cfg.DisableLua,
        }
    
        tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
        // render the template to produce the configuration text
        content, err := n.t.Write(tc)
        if err != nil {
            return err
        }
    
    • Generating the opentracing configuration file
      If opentracing is enabled, an opentracing configuration file is generated at /etc/nginx/opentracing.json. Two tracing systems are supported, zipkin and jaeger, with the following templates:
    const zipkinTmpl = `{
      "service_name": "{{ .ZipkinServiceName }}",
      "collector_host": "{{ .ZipkinCollectorHost }}",
      "collector_port": {{ .ZipkinCollectorPort }},
      "sample_rate": {{ .ZipkinSampleRate }}
    }`
    
    const jaegerTmpl = `{
      "service_name": "{{ .JaegerServiceName }}",
      "sampler": {
        "type": "{{ .JaegerSamplerType }}",
        "param": {{ .JaegerSamplerParam }}
      },
      "reporter": {
        "localAgentHostPort": "{{ .JaegerCollectorHost }}:{{ .JaegerCollectorPort }}"
      }
    }`
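    Rendered with illustrative values (the host, port, and sample rate below are assumptions, normally supplied through the corresponding ConfigMap options), the zipkin template would produce something like:

    {
      "service_name": "nginx",
      "collector_host": "zipkin.default.svc.cluster.local",
      "collector_port": 9411,
      "sample_rate": 0.5
    }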
    
    • Testing the configuration file
      Implemented by the NGINXController.testTemplate method, whose main steps are:
      1) write the candidate configuration to a test file
      2) run nginx -c ${Config} -t to validate it
    • Writing the configuration file and reloading
      The /etc/nginx/nginx.conf file is written out, and nginx -s reload is invoked to load the new configuration. A rough sketch of this test-then-reload flow follows.
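    A minimal sketch of the flow, assuming simplified temp-file handling (the real controller builds these commands internally):

    package nginx

    import (
        "fmt"
        "io/ioutil"
        "os/exec"
    )

    // testAndReload is a sketch: validate a candidate configuration with
    // `nginx -t`, then install it and signal the running nginx to reload.
    func testAndReload(content []byte) error {
        tmpFile, err := ioutil.TempFile("", "nginx-cfg")
        if err != nil {
            return err
        }
        defer tmpFile.Close()
        if _, err := tmpFile.Write(content); err != nil {
            return err
        }

        // 1) test the candidate configuration
        if out, err := exec.Command("nginx", "-c", tmpFile.Name(), "-t").CombinedOutput(); err != nil {
            return fmt.Errorf("invalid nginx configuration: %v\n%s", err, out)
        }

        // 2) install it and trigger a reload
        if err := ioutil.WriteFile("/etc/nginx/nginx.conf", content, 0644); err != nil {
            return err
        }
        return exec.Command("nginx", "-s", "reload").Run()
    }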

    That completes the analysis of Ingress updates. Two topics remain:
    1) the dynamic configuration feature
    2) the configuration file template

    Dynamic Configuration

    Dynamic configuration packages the Backends as JSON and POSTs the payload to an internal HTTP service that is handled by Lua.

    A Backend describes one or more remote endpoints associated with a service.
    For example, given the Ingress YAML below, the backend corresponds to rules->host:foo.bar.com->paths->path->backend.

    apiVersion: extensions/v1beta1
    kind: Ingress
    metadata:
      name: test-ingress
      annotations:
        ingress.kubernetes.io/rewrite-target: /
      namespace: ingress-nginx
    spec:
      rules:
      -  host: foo.bar.com
         http:
          paths:
          - path: /
            backend:
              serviceName: my-service
              servicePort: 1005
    

    The dynamic configuration code is as follows:

    func configureDynamically(pcfg *ingress.Configuration, port int) error {
        // build a fresh set of backends; TODO: why build new ones here instead of using pcfg.Backends directly?
        // it appears this strips out the Service field information
        backends := make([]*ingress.Backend, len(pcfg.Backends))
    
        for i, backend := range pcfg.Backends {
            luaBackend := &ingress.Backend{
                Name:            backend.Name,   // <namespace>-<name>-<port>
                Port:            backend.Port,
                Secure:          backend.Secure,
                SSLPassthrough:  backend.SSLPassthrough,
                SessionAffinity: backend.SessionAffinity,
                UpstreamHashBy:  backend.UpstreamHashBy,
                LoadBalancing:   backend.LoadBalancing,
            }
    
            var endpoints []ingress.Endpoint
            for _, endpoint := range backend.Endpoints {
                endpoints = append(endpoints, ingress.Endpoint{
                    Address:     endpoint.Address,
                    FailTimeout: endpoint.FailTimeout,
                    MaxFails:    endpoint.MaxFails,
                    Port:        endpoint.Port,
                })
            }
    
            luaBackend.Endpoints = endpoints
            backends[i] = luaBackend
        }
    
        // marshal all the backends into JSON
        buf, err := json.Marshal(backends)
        if err != nil {
            return err
        }
    
        glog.V(2).Infof("Posting backends configuration: %s", buf)
        // POST the backends to nginx's internal HTTP service for Lua to process
        url := fmt.Sprintf("http://localhost:%d/configuration/backends", port)
        resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
        if err != nil {
            return err
        }
            ......
        return nil
    }
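    For the example Ingress above, the POSTed body would look roughly like this (the backend name follows the <namespace>-<name>-<port> pattern noted in the code; the endpoint addresses are illustrative pod IPs, and some fields are omitted):

    [
      {
        "name": "ingress-nginx-my-service-1005",
        "port": 1005,
        "endpoints": [
          { "address": "10.244.1.7", "port": "1005" },
          { "address": "10.244.2.3", "port": "1005" }
        ]
      }
    ]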
    

    To see how the configuration/backends endpoint is handled, look at this snippet from the nginx.tmpl file:

            {{ if $all.DynamicConfigurationEnabled }}
            location /configuration {
                {{ if $cfg.EnableOpentracing }}
                opentracing off;
                {{ end }}
    
                allow 127.0.0.1;
                {{ if $IsIPV6Enabled }}
                allow ::1;
                {{ end }}
                deny all;
    
                # this should be equals to configuration_data dict
                client_max_body_size                    "10m";
                proxy_buffering                         off;
    
                content_by_lua_block {
                  configuration.call()
                }
            }
            {{ end }}
    

    Ultimately the request is handled by the Lua call configuration.call().

    The Configuration File Template

    The nginx configuration file template is implemented with the text/template package; we won't cover text/template itself in detail here.

    As shown earlier, the data passed to the template is a TemplateConfig struct, defined as follows:

    type TemplateConfig struct {
        ProxySetHeaders             map[string]string // contents of the proxy-set-headers ConfigMap
        AddHeaders                  map[string]string // contents of the add-headers ConfigMap
        MaxOpenFiles                int               // maximum number of open files
        BacklogSize                 int
        Backends                    []*ingress.Backend // all backends
        PassthroughBackends         []*ingress.SSLPassthroughBackend
        Servers                     []*ingress.Server // server definitions
        TCPBackends                 []ingress.L4Service // TCP reverse proxies
        UDPBackends                 []ingress.L4Service // UDP reverse proxies
        HealthzURI                  string
        CustomErrors                bool
        Cfg                         Configuration
        IsIPV6Enabled               bool
        IsSSLPassthroughEnabled     bool
        NginxStatusIpv4Whitelist    []string
        NginxStatusIpv6Whitelist    []string
        RedirectServers             map[string]string
        ListenPorts                 *ListenPorts
        PublishService              *apiv1.Service
        DynamicConfigurationEnabled bool
        DisableLua                  bool
    }
    

    The nginx configuration file template is /etc/nginx/template/nginx.tmpl; part of it was quoted in the dynamic configuration analysis above. To make the templating mechanism concrete, a minimal example follows.
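    A minimal, self-contained sketch of rendering a config snippet the way nginx.tmpl is rendered; the struct and template below are simplified stand-ins, not the real TemplateConfig or nginx.tmpl:

    package main

    import (
        "os"
        "text/template"
    )

    // Simplified stand-in for TemplateConfig.
    type templateConfig struct {
        MaxOpenFiles int
        Servers      []string
    }

    // Simplified stand-in for nginx.tmpl.
    const tmpl = "worker_rlimit_nofile {{ .MaxOpenFiles }};\n" +
        "{{ range .Servers }}server { server_name {{ . }}; }\n{{ end }}"

    func main() {
        t := template.Must(template.New("nginx.tmpl").Parse(tmpl))
        cfg := templateConfig{MaxOpenFiles: 1024, Servers: []string{"foo.bar.com", "_"}}
        // Execute renders the template with cfg as data, analogous to n.t.Write(tc).
        if err := t.Execute(os.Stdout, cfg); err != nil {
            panic(err)
        }
    }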
