
k8s kube-proxy source code analysis

作者: 分享放大价值 | Published 2022-05-04 20:44

    Service

    Introduction

    A Service provides load balancing in front of a group of pods.

    Service types

    a. ClusterIP: the service is only reachable from inside the cluster, e.g. from a pod or from a worker node, and cannot be reached from outside the cluster. This is the default when no type is specified.
    b. NodePort: the same port is opened on every node, so the service can be reached from outside the cluster via nodeIP:nodePort.
    c. LoadBalancer: the same port is also opened on every node, and each nodeIP:nodePort is used as a backend (real server) of an external load balancer. This type requires support from a cloud provider.
    d. ExternalIP: strictly a field (spec.externalIPs) rather than a separate type; the service is exposed on an externally reachable IP that routes to a node, so it can be accessed from outside the cluster.

    Service manifest

    apiVersion: v1
    kind: Service
    metadata:
      ...
    spec:
      type <string>                   # Service type, defaults to ClusterIP
      selector <map[string]string>    # label selector used to pick the backend pods
      ports:                          # list of service ports
      - name <string>                 # port name
        protocol <string>             # protocol
        port <integer>                # port exposed by the Service
        targetPort <string>           # target port, the port the backend pods actually listen on
        nodePort <integer>            # node port, only for NodePort and LoadBalancer types
      clusterIP <string>              # cluster IP of the Service
      externalTrafficPolicy <string>  # external traffic policy: Local keeps traffic on the receiving node, Cluster allows cluster-wide forwarding
      loadBalancerIP <string>         # IP used by the external load balancer, only for LoadBalancer
      externalIPs <[]string>          # external IPs
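
    To make these fields concrete, here is a minimal sketch that builds an equivalent object with the client-go API types and prints it as YAML. It is illustrative only: the name "web", namespace "default", the app=web selector and the port numbers are assumptions, not taken from the article.

    package main

    import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/util/intstr"
        "sigs.k8s.io/yaml"
    )

    func main() {
        svc := corev1.Service{
            TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Service"},
            ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"},
            Spec: corev1.ServiceSpec{
                Type:     corev1.ServiceTypeNodePort,     // defaults to ClusterIP when empty
                Selector: map[string]string{"app": "web"}, // picks the backend pods
                Ports: []corev1.ServicePort{{
                    Name:       "http",
                    Protocol:   corev1.ProtocolTCP,
                    Port:       80,                   // port exposed by the Service
                    TargetPort: intstr.FromInt(8080), // port the pods actually listen on
                    NodePort:   30080,                // only meaningful for NodePort/LoadBalancer
                }},
            },
        }
        out, _ := yaml.Marshal(svc)
        fmt.Println(string(out))
    }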
    

    kube-proxy

    Forwarding modes

    kube-proxy supports three forwarding modes: userspace, iptables and ipvs.

    How it works

    kube-proxy uses the informer mechanism to watch Service and Endpoints/EndpointSlice resources, caches them locally, and converts them into the corresponding iptables/ipvs rules.
    Note that Service objects are created by users from Service manifests, while Endpoints objects are generated automatically by the endpoint controller, which watches Service objects.
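
    The watch pattern kube-proxy relies on can be sketched with plain client-go. This is a minimal, standalone example and not kube-proxy's own code; the kubeconfig path and the 15-minute resync period are illustrative assumptions.

    package main

    import (
        "fmt"
        "time"

        corev1 "k8s.io/api/core/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // Build a clientset from a kubeconfig file (path is an assumption).
        cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
        if err != nil {
            panic(err)
        }
        client := kubernetes.NewForConfigOrDie(cfg)

        // Shared informer factory; the resync period plays the role of kube-proxy's ConfigSyncPeriod.
        factory := informers.NewSharedInformerFactory(client, 15*time.Minute)
        svcInformer := factory.Core().V1().Services().Informer()

        // These handlers correspond to handleAddService/handleUpdateService/handleDeleteService in kube-proxy.
        svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                fmt.Println("service added:", obj.(*corev1.Service).Name)
            },
            UpdateFunc: func(old, new interface{}) {
                fmt.Println("service updated:", new.(*corev1.Service).Name)
            },
            DeleteFunc: func(obj interface{}) {
                fmt.Println("service deleted")
            },
        })

        stop := make(chan struct{})
        defer close(stop)
        factory.Start(stop)
        // Wait for the local cache to be populated before acting on events.
        cache.WaitForCacheSync(stop, svcInformer.HasSynced)
        select {} // block forever
    }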

    Source code analysis

    The analysis below walks through the overall flow using the iptables mode as an example, without dwelling on every detail.
    a. kube-proxy startup flow

    //cmd/kube-proxy/proxy.go
    main
        command := app.NewProxyCommand()
        command.Execute()
        
    //cmd/kube-proxy/app/server.go
    func NewProxyCommand() *cobra.Command
        // parse and complete the configuration
        opts.Complete();
        // validate the options
        opts.Validate();
        opts.Run();
    
    //opts.Run()
    // Run runs the specified ProxyServer.
    func (o *Options) Run() error
        o.writeConfigFile();
        // create the ProxyServer
        proxyServer, err := NewProxyServer(o)
        
        o.proxyServer = proxyServer
        
        return o.runLoop()
            // run the proxy in goroutine
            // initialization done; start a goroutine that runs the ProxyServer
            go func() {
                err := o.proxyServer.Run()
                o.errCh <- err
            }()
            // exit the process if an error is reported
            for {
                err := <-o.errCh
                if err != nil {
                    return err
                }
            }
    

    NewProxyServer creates the ProxyServer; depending on the proxy mode it initializes a different proxier:

    cmd/kube-proxy/app/server_others.go
    // NewProxyServer returns a new ProxyServer.
    func NewProxyServer(o *Options) (*ProxyServer, error) {
        return newProxyServer(o.config, o.CleanupAndExit, o.master)
    }
    
    func newProxyServer(
        kernelHandler = ipvs.NewLinuxKernelHandler()
        ipsetInterface = utilipset.New(execer)
        canUseIPVS, err := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface, config.IPVS.Scheduler)
        if string(config.Mode) == proxyModeIPVS && err != nil {
            klog.ErrorS(err, "Can't use the IPVS proxier")
        }
        
        if canUseIPVS {
            ipvsInterface = utilipvs.New(execer)
        }
        
        hostname, err := utilnode.GetHostname(config.HostnameOverride)
        
        client, eventClient, err := createClients(config.ClientConnection, master)
        
        nodeIP := detectNodeIP(client, hostname, config.BindAddress)
        klog.InfoS("Detected node IP", "address", nodeIP.String())
    
        proxyMode := getProxyMode(string(config.Mode), canUseIPVS, iptables.LinuxKernelCompatTester{})
        
        var ipt [2]utiliptables.Interface
        dualStack := utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && proxyMode != proxyModeUserspace
        
        if proxyMode == proxyModeIPTables {
            if dualStack {
                proxier, err = iptables.NewDualStackProxier(...)
            } else {
                proxier, err = iptables.NewProxier(...)
            }
        } else if proxyMode == proxyModeIPVS {
            if dualStack {
                proxier, err = ipvs.NewDualStackProxier()
            } else {
                proxier, err = ipvs.NewProxier()
            }
        } else {
            proxier, err = userspace.NewProxier()
        }
    
        return &ProxyServer{
            Client:                 client,
            EventClient:            eventClient,
            IptInterface:           iptInterface,
            IpvsInterface:          ipvsInterface,
            IpsetInterface:         ipsetInterface,
            execer:                 execer,
            Proxier:                proxier,
            Broadcaster:            eventBroadcaster,
            Recorder:               recorder,
            ConntrackConfiguration: config.Conntrack,
            Conntracker:            &realConntracker{},
            ProxyMode:              proxyMode,
            NodeRef:                nodeRef,
            MetricsBindAddress:     config.MetricsBindAddress,
            BindAddressHardFail:    config.BindAddressHardFail,
            EnableProfiling:        config.EnableProfiling,
            OOMScoreAdj:            config.OOMScoreAdj,
            ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
            HealthzServer:          healthzServer,
            UseEndpointSlices:      useEndpointSlices,
        }, nil
    
    //pkg/proxy/iptables/proxier.go
    func NewProxier(...)
        proxier := &Proxier{
        }
    

    The proxier implements the Provider interface shown below; iptables, ipvs and userspace are its three implementations:

    //pkg/proxy/types.go
    // Provider is the interface provided by proxier implementations.
    type Provider interface {
        config.EndpointsHandler
        config.EndpointSliceHandler
        config.ServiceHandler
        config.NodeHandler
    
        // Sync immediately synchronizes the Provider's current state to proxy rules.
        Sync()
        // SyncLoop runs periodic work.
        // This is expected to run as a goroutine or as the main loop of the app.
        // It does not return.
        SyncLoop()
    }
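
    As a rough illustration of how an implementation of this interface behaves, the toy sketch below (not the real proxier, and with deliberately simplified handler signatures) shows the general pattern: the On* callbacks only record pending changes, and Sync() later consumes them to re-program rules.

    package main

    import (
        "fmt"
        "sync"
    )

    type toyProxier struct {
        mu      sync.Mutex
        changed map[string]bool // pending service changes, keyed by namespace/name
    }

    // The On* methods mirror the config.ServiceHandler callbacks: they only record
    // that something changed; the heavy work happens in Sync().
    func (p *toyProxier) OnServiceAdd(name string)    { p.record(name) }
    func (p *toyProxier) OnServiceUpdate(name string) { p.record(name) }
    func (p *toyProxier) OnServiceDelete(name string) { p.record(name) }

    func (p *toyProxier) record(name string) {
        p.mu.Lock()
        defer p.mu.Unlock()
        p.changed[name] = true
    }

    // Sync consumes the accumulated changes; the real proxier would rewrite iptables here.
    func (p *toyProxier) Sync() {
        p.mu.Lock()
        defer p.mu.Unlock()
        for name := range p.changed {
            fmt.Println("would re-program rules for", name)
        }
        p.changed = map[string]bool{}
    }

    func main() {
        p := &toyProxier{changed: map[string]bool{}}
        p.OnServiceAdd("default/web")
        p.OnServiceUpdate("default/web")
        p.Sync()
    }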
    

    Below is the iptables implementation of Proxier. endpointsChanges and serviceChanges accumulate the Endpoints and Service changes observed since the last sync, while serviceMap and endpointsMap hold the resulting Service and Endpoints state:

    //pkg/proxy/iptables/proxier.go
    // Proxier is an iptables based proxy for connections between a localhost:lport
    // and services that provide the actual backends.
    type Proxier struct {
        // endpointsChanges and serviceChanges contains all changes to endpoints and
        // services that happened since iptables was synced. For a single object,
        // changes are accumulated, i.e. previous is state from before all of them,
        // current is state after applying all of those.
        endpointsChanges *proxy.EndpointChangeTracker
        serviceChanges   *proxy.ServiceChangeTracker
    
        mu           sync.Mutex // protects the following fields
        serviceMap   proxy.ServiceMap
        endpointsMap proxy.EndpointsMap
        ...
        // These are effectively const and do not need the mutex to be held.
        // interface used to interact with iptables
        iptables       utiliptables.Interface
        ...
        // The following buffers are used to reuse memory and avoid allocations
        // that are significantly impacting performance.
        // buffers holding the generated iptables rules
        iptablesData             *bytes.Buffer
        existingFilterChainsData *bytes.Buffer
        filterChains             *bytes.Buffer
        filterRules              *bytes.Buffer
        natChains                *bytes.Buffer
        natRules                 *bytes.Buffer
        ...
    }
    

    b. ProxyServer Run flow

    //cmd/kube-proxy/app/server.go
    func (s *ProxyServer) Run() error {
        if s.Conntracker != nil {
        }
    
        noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
        noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
        labelSelector := labels.NewSelector()
        labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
    
        // create informers that watch for resource changes
        // Make informers that filter out objects that want a non-default service proxy.
        informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
            informers.WithTweakListOptions(func(options *metav1.ListOptions) {
                options.LabelSelector = labelSelector.String()
            }))
            
        serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
            result := &ServiceConfig{
                listerSynced: serviceInformer.Informer().HasSynced,
            }
    
            serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
                cache.ResourceEventHandlerFuncs{
                    AddFunc:    result.handleAddService,
                    UpdateFunc: result.handleUpdateService,
                    DeleteFunc: result.handleDeleteService,
                },
                resyncPeriod,
            )
    
            return result
    
        // register the Service event handlers; on service add/update/delete events the change is recorded in proxier.serviceChanges
        serviceConfig.RegisterEventHandler(s.Proxier)
            c.eventHandlers = append(c.eventHandlers, handler)
            
        go serviceConfig.Run(wait.NeverStop)
            klog.Info("Starting service config controller")
    
            if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
                return
            }
    
            for i := range c.eventHandlers {
                klog.V(3).Info("Calling handler.OnServiceSynced()")
                c.eventHandlers[i].OnServiceSynced()
            }
    
        // s.UseEndpointSlices defaults to true
        if s.UseEndpointSlices {
            endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
                result := &EndpointSliceConfig{
                    listerSynced: endpointSliceInformer.Informer().HasSynced,
                }
    
                endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod(
                    cache.ResourceEventHandlerFuncs{
                        AddFunc:    result.handleAddEndpointSlice,
                        UpdateFunc: result.handleUpdateEndpointSlice,
                        DeleteFunc: result.handleDeleteEndpointSlice,
                    },
                    resyncPeriod,
                )
    
                return result
    
            // register the EndpointSlice event handlers; on endpointslice add/update/delete events the change is recorded in proxier.endpointsChanges
            endpointSliceConfig.RegisterEventHandler(s.Proxier)
                c.eventHandlers = append(c.eventHandlers, handler)
    
            go endpointSliceConfig.Run(wait.NeverStop)
                klog.Info("Starting endpoint slice config controller")
    
                if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) {
                    return
                }
    
                for _, h := range c.eventHandlers {
                    klog.V(3).Infof("Calling handler.OnEndpointSlicesSynced()")
                    h.OnEndpointSlicesSynced()
                }
    
        } else {
            endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
            endpointsConfig.RegisterEventHandler(s.Proxier)
            go endpointsConfig.Run(wait.NeverStop)
        }
        
        // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
        // functions must configure their shared informer event handlers first.
        informerFactory.Start(wait.NeverStop)
        
        // Birth Cry after the birth is successful
        s.birthCry()
        // finally start the SyncLoop
        go s.Proxier.SyncLoop()
    
        return <-errCh
    }
    
    //pkg/proxy/iptables/proxier.go
    // SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
    func (proxier *Proxier) SyncLoop() {
        // Update healthz timestamp at beginning in case Sync() never succeeds.
        if proxier.healthzServer != nil {
            proxier.healthzServer.Updated()
        }
    
        // synthesize "last change queued" time as the informers are syncing.
        metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
        // run proxier.syncProxyRules periodically
        proxier.syncRunner.Loop(wait.NeverStop)
    }
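
    proxier.syncRunner is a BoundedFrequencyRunner (k8s.io/kubernetes/pkg/util/async). The sketch below is a simplified stand-in for its behaviour, illustrative only: the real runner additionally enforces a minimum interval and rate limits bursts. Sync() requests an immediate run, and Loop() guarantees a run at least once per sync period, which is exactly what Proxier.Sync() and Proxier.SyncLoop() rely on.

    package main

    import (
        "fmt"
        "time"
    )

    type runner struct {
        trigger chan struct{}
        fn      func()
    }

    func newRunner(fn func()) *runner {
        return &runner{trigger: make(chan struct{}, 1), fn: fn}
    }

    // Run requests an immediate sync (non-blocking, coalesced), like Proxier.Sync().
    func (r *runner) Run() {
        select {
        case r.trigger <- struct{}{}:
        default: // a sync is already pending
        }
    }

    // Loop blocks and calls fn on every trigger and at least once per maxInterval,
    // like Proxier.SyncLoop().
    func (r *runner) Loop(stop <-chan struct{}, maxInterval time.Duration) {
        t := time.NewTicker(maxInterval)
        defer t.Stop()
        for {
            select {
            case <-stop:
                return
            case <-r.trigger:
                r.fn()
            case <-t.C:
                r.fn()
            }
        }
    }

    func main() {
        r := newRunner(func() { fmt.Println("syncProxyRules()") })
        go r.Loop(make(chan struct{}), time.Minute)
        r.Run() // simulate an event-driven sync request
        time.Sleep(100 * time.Millisecond)
    }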
    

    c. syncProxyRules
    syncProxyRules rebuilds the iptables rules from the Service and Endpoints changes observed by the informers.

    // This is where all of the iptables-save/restore calls happen.
    // The only other iptables rules are those that are setup in iptablesInit()
    // This assumes proxier.mu is NOT held
    func (proxier *Proxier) syncProxyRules() {
        // ServiceMap stores the information of every service; the key is the service port name, the value the service details.
        // ServiceMap maps a service to its ServicePort.
        // type ServiceMap map[ServicePortName]ServicePort
        // EndpointsMap stores the endpoint information; the key is the service port name, the value the endpoint list.
        // EndpointsMap maps a service name to a list of all its Endpoints.
        // type EndpointsMap map[ServicePortName][]Endpoint
        // Merge proxier.serviceChanges into proxier.serviceMap
        // and proxier.endpointsChanges into proxier.endpointsMap.
        // We assume that if this was called, we really want to sync them,
        // even if nothing changed in the meantime. In other words, callers are
        // responsible for detecting no-op changes and not calling this function.
        serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
        endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
    
        // create the following chains:
        // iptables -t filter -N KUBE-EXTERNAL-SERVICES
        // iptables -t filter -N KUBE-NODEPORTS
        // iptables -t filter -N KUBE-SERVICES
        // iptables -t filter -N KUBE-FORWARD
        // iptables -t nat -N KUBE-SERVICES
        // iptables -t nat -N KUBE-POSTROUTING
        // and add the following jump rules:
        // iptables -t filter -I INPUT -j KUBE-EXTERNAL-SERVICES
        // iptables -t filter -I FORWARD -j KUBE-EXTERNAL-SERVICES
        // iptables -t filter -I INPUT -j KUBE-NODEPORTS
        // iptables -t filter -I FORWARD -j KUBE-SERVICES
        // iptables -t filter -I OUTPUT -j KUBE-SERVICES
        // iptables -t filter -I FORWARD -j KUBE-FORWARD
        // iptables -t nat -I OUTPUT -j KUBE-SERVICES
        // iptables -t nat -I PREROUTING -j KUBE-SERVICES
        // iptables -t nat -I POSTROUTING -j KUBE-POSTROUTING
        // Create and link the kube chains.
        for _, jump := range iptablesJumpChains {
            // make sure the KUBE-EXTERNAL-SERVICES, KUBE-NODEPORTS, KUBE-SERVICES and KUBE-FORWARD chains exist in the filter table,
            // and the KUBE-SERVICES and KUBE-POSTROUTING chains exist in the nat table
            if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
                klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
                return
            }
            args := append(jump.extraArgs,
                "-m", "comment", "--comment", jump.comment,
                "-j", string(jump.dstChain),
            )
            // add the jump rules defined by iptablesJumpChains to the filter and nat tables, jumping into the kube chains
            if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
                klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
                return
            }
        }
    
        // create the following chain:
        //iptables -t nat -N KUBE-MARK-DROP
        // ensure KUBE-MARK-DROP chain exist but do not change any rules
        for _, ch := range iptablesEnsureChains {
            if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
                klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
                return
            }
        }
        
        // run iptables-save for the filter table and store the existing rules in existingFilterChains
        // Get iptables-save output so we can check for existing chains and rules.
        // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
        existingFilterChains := make(map[utiliptables.Chain][]byte)
        proxier.existingFilterChainsData.Reset()
        err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData)
        if err != nil { // if we failed to get any rules
            klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
        } else { // otherwise parse the output
            existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes())
        }
        
        // run iptables-save for the nat table and store the existing rules in existingNATChains
        // IMPORTANT: existingNATChains may share memory with proxier.iptablesData.
        existingNATChains := make(map[utiliptables.Chain][]byte)
        proxier.iptablesData.Reset()
        err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
        if err != nil { // if we failed to get any rules
            klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
        } else { // otherwise parse the output
            existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
        }
        
        // Reset all buffers used later.
        // This is to avoid memory reallocations and thus improve performance.
        proxier.filterChains.Reset()
        proxier.filterRules.Reset()
        proxier.natChains.Reset()
        proxier.natRules.Reset()
    
        // write the table headers first; in iptables-save format a table starts with "*" followed by the table name
        // Write table headers.
        utilproxy.WriteLine(proxier.filterChains, "*filter")
        utilproxy.WriteLine(proxier.natChains, "*nat")
    
        // write the kube chains into the filter and nat tables; they were ensured to exist above
        // Make sure we keep stats for the top-level chains, if they existed
        // (which most should have because we created them above).
        for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
            if chain, ok := existingFilterChains[chainName]; ok {
                utilproxy.WriteBytesLine(proxier.filterChains, chain)
            } else {
                utilproxy.WriteLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
            }
        }
        for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
            if chain, ok := existingNATChains[chainName]; ok {
                utilproxy.WriteBytesLine(proxier.natChains, chain)
            } else {
                utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
            }
        }
    
        //iptables -t nat -A KUBE-POSTROUTING -m mark ! --mark masqueradeMark/masqueradeMark -j RETURN
        //iptables -t nat -A KUBE-POSTROUTING -j MARK --xor-mark  masqueradeMark
        //iptables -t nat -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
        // Install the kubernetes-specific postrouting rules. We use a whole chain for
        // this so that it is easier to flush and change, for example if the mark
        // value should ever change.
        // NB: THIS MUST MATCH the corresponding code in the kubelet
        utilproxy.WriteLine(proxier.natRules, []string{
            "-A", string(kubePostroutingChain),
            "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
            "-j", "RETURN",
        }...)
        // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack.
        utilproxy.WriteLine(proxier.natRules, []string{
            "-A", string(kubePostroutingChain),
            // XOR proxier.masqueradeMark to unset it
            "-j", "MARK", "--xor-mark", proxier.masqueradeMark,
        }...)
        masqRule := []string{
            "-A", string(kubePostroutingChain),
            "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
            "-j", "MASQUERADE",
        }
        if proxier.iptables.HasRandomFully() {
            masqRule = append(masqRule, "--random-fully")
        }
        utilproxy.WriteLine(proxier.natRules, masqRule...)
    
        // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
        // this so that it is easier to flush and change, for example if the mark
        // value should ever change.
        utilproxy.WriteLine(proxier.natRules, []string{
            "-A", string(KubeMarkMasqChain),
            "-j", "MARK", "--or-mark", proxier.masqueradeMark,
        }...)
        
        // Build rules for each service.
        for svcName, svc := range proxier.serviceMap {
            svcInfo, ok := svc.(*serviceInfo)
            protocol := strings.ToLower(string(svcInfo.Protocol()))
            svcNameString := svcInfo.serviceNameString
    
            // get the endpoints of this service
            allEndpoints := proxier.endpointsMap[svcName]
    
            // Filtering for topology aware endpoints. This function will only
            // filter endpoints if appropriate feature gates are enabled and the
            // Service does not have conflicting configuration such as
            // externalTrafficPolicy=Local.
            allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
            hasEndpoints := len(allEndpoints) > 0
    
            // servicePortChainName is KUBE-SVC-xxx
            svcChain := svcInfo.servicePortChainName
            // if the service has endpoints, the KUBE-SVC-xxx chain needs to be added to the nat table
            if hasEndpoints {
                // Create the per-service chain, retaining counters if possible.
                if chain, ok := existingNATChains[svcChain]; ok {
                    utilproxy.WriteBytesLine(proxier.natChains, chain)
                } else {
                    utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
                }
                activeNATChains[svcChain] = true
            }
    
            // serviceLBChainName is KUBE-XLB-xxx
            svcXlbChain := svcInfo.serviceLBChainName
            // if the service sets spec.externalTrafficPolicy to Local, the KUBE-XLB-xxx chain needs to be added to the nat table
            if svcInfo.NodeLocalExternal() {
                // Only for services request OnlyLocal traffic
                // create the per-service LB chain, retaining counters if possible.
                if lbChain, ok := existingNATChains[svcXlbChain]; ok {
                    utilproxy.WriteBytesLine(proxier.natChains, lbChain)
                } else {
                    utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
                }
                activeNATChains[svcXlbChain] = true
            }
            
            // if the service has endpoints, add the following rule to the nat table (prefixed by a jump to KUBE-MARK-MASQ if masqueradeAll is true):
            // iptables -t nat -A KUBE-SERVICES -d cluster_ip -p tcp --dport 80 -m comment --comment "xxx cluster IP" -j KUBE-SVC-xxx
            // otherwise (no endpoints) add the following rule to the filter table:
            // iptables -t filter -A KUBE-SERVICES -d cluster_ip -p tcp --dport 80 -m comment --comment "xxx has no endpoints" -j REJECT
            // Capture the clusterIP.
            if hasEndpoints {
                args = append(args[:0],
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
                    "--dport", strconv.Itoa(svcInfo.Port()),
                )
                if proxier.masqueradeAll {
                    utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), append(args, "-j", string(KubeMarkMasqChain))...)
                } else if proxier.localDetector.IsImplemented() {
                    // This masquerades off-cluster traffic to a service VIP.  The idea
                    // is that you can establish a static route for your Service range,
                    // routing to any node, and that node will bridge into the Service
                    // for you.  Since that might bounce off-node, we masquerade here.
                    // If/when we support "Local" policy for VIPs, we should update this.
                    utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...)
                }
                utilproxy.WriteRuleLine(proxier.natRules, string(kubeServicesChain), append(args, "-j", string(svcChain))...)
            } else {
                // No endpoints.
                utilproxy.WriteLine(proxier.filterRules,
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
                    "--dport", strconv.Itoa(svcInfo.Port()),
                    "-j", "REJECT",
                )
            }
    
            // Capture externalIPs.
            for _, externalIP := range svcInfo.ExternalIPStrings() {
                // if an external IP is specified, the IP is local to this node and the protocol is not SCTP, open the service port with a socket and hold it;
                // otherwise another process on this node could grab the port and the externalIP would stop working
                // If the "external" IP happens to be an IP that is local to this
                // machine, hold the local port open so no other process can open it
                // (because the socket might open but it would never work).
                if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(netutils.ParseIPSloppy(externalIP)) {
                    lp := netutils.LocalPort{
                        Description: "externalIP for " + svcNameString,
                        IP:          externalIP,
                        IPFamily:    localPortIPFamily,
                        Port:        svcInfo.Port(),
                        Protocol:    netutils.Protocol(svcInfo.Protocol()),
                    }
                    if proxier.portsMap[lp] != nil {
                        klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
                        replacementPortsMap[lp] = proxier.portsMap[lp]
                    } else {
                        socket, err := proxier.portMapper.OpenLocalPort(&lp)
                        klog.V(2).InfoS("Opened local port", "port", lp.String())
                        replacementPortsMap[lp] = socket
                    }
                }
    
                // if the service has endpoints and externalTrafficPolicy is Cluster, add the following rule to the nat table:
                // iptables -t nat -A KUBE-SERVICES -d externalIP -p tcp --dport 80 -m comment --comment "xxx external IP" -j KUBE-SVC-xxx
                // if externalTrafficPolicy is Local, jump to KUBE-XLB-xxx instead
                // if the service has no endpoints, add the following rule to the filter table:
                // iptables -t filter -A KUBE-EXTERNAL-SERVICES -d externalIP -p tcp --dport 80 -m comment --comment "xxx has no endpoints" -j REJECT
                if hasEndpoints {
                    args = append(args[:0],
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
                        "-m", protocol, "-p", protocol,
                        "-d", utilproxy.ToCIDR(netutils.ParseIPSloppy(externalIP)),
                        "--dport", strconv.Itoa(svcInfo.Port()),
                    )
    
                    destChain := svcXlbChain
                    // We have to SNAT packets to external IPs if externalTrafficPolicy is cluster
                    // and the traffic is NOT Local. Local traffic coming from Pods and Nodes will
                    // be always forwarded to the corresponding Service, so no need to SNAT
                    // If we can't differentiate the local traffic we always SNAT.
                    if !svcInfo.NodeLocalExternal() {
                        destChain = svcChain
                        // This masquerades off-cluster traffic to a External IP.
                        if proxier.localDetector.IsImplemented() {
                            utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...)
                        } else {
                            utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), append(args, "-j", string(KubeMarkMasqChain))...)
                        }
                    }
                    // Send traffic bound for external IPs to the service chain.
                    utilproxy.WriteRuleLine(proxier.natRules, string(kubeServicesChain), append(args, "-j", string(destChain))...)
                } else {
                    // No endpoints.
                    utilproxy.WriteLine(proxier.filterRules,
                        "-A", string(kubeExternalServicesChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                        "-m", protocol, "-p", protocol,
                        "-d", utilproxy.ToCIDR(netutils.ParseIPSloppy(externalIP)),
                        "--dport", strconv.Itoa(svcInfo.Port()),
                        "-j", "REJECT",
                    )
                }
            
            // if the service type is LoadBalancer and an IP was allocated (LoadBalancer Ingress is not empty), add the following rule:
            // iptables -t nat -A KUBE-SERVICES -d ingressIP -p tcp --dport 80 -m comment --comment "xxx loadbalancer IP" -j KUBE-FW-xxx
            // normally external traffic reaches the service through the load balancer; this rule lets traffic that originates on the node
            // reach the service directly instead of taking a round trip through the real load balancer, which reduces latency
            // Capture load-balancer ingress.
            fwChain := svcInfo.serviceFirewallChainName
            for _, ingress := range svcInfo.LoadBalancerIPStrings() {
                if ingress != "" {
                    if hasEndpoints {
                        // make sure the KUBE-FW-xxx chain exists in the nat table
                        // create service firewall chain
                        if chain, ok := existingNATChains[fwChain]; ok {
                            utilproxy.WriteBytesLine(proxier.natChains, chain)
                        } else {
                            utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
                        }
                        activeNATChains[fwChain] = true
                        // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
                        // This currently works for loadbalancers that preserves source ips.
                        // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
    
                        args = append(args[:0],
                            "-A", string(kubeServicesChain),
                            "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                            "-m", protocol, "-p", protocol,
                            "-d", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)),
                            "--dport", strconv.Itoa(svcInfo.Port()),
                        )
                        // jump to service firewall chain
                        utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(fwChain))...)
    
                        args = append(args[:0],
                            "-A", string(fwChain),
                            "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                        )
    
                        // Each source match rule in the FW chain may jump to either the SVC or the XLB chain
                        chosenChain := svcXlbChain
                        // If we are proxying globally, we need to masquerade in case we cross nodes.
                        // If we are proxying only locally, we can retain the source IP.
                        if !svcInfo.NodeLocalExternal() {
                            utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                            chosenChain = svcChain
                        }
    
                        if len(svcInfo.LoadBalancerSourceRanges()) == 0 {
                            // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
                            utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
                        } else {
                            // firewall filter based on each source range
                            allowFromNode := false
                            for _, src := range svcInfo.LoadBalancerSourceRanges() {
                                utilproxy.WriteLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
                                _, cidr, err := netutils.ParseCIDRSloppy(src)
                                if err != nil {
                                    klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr)
                                } else if cidr.Contains(proxier.nodeIP) {
                                    allowFromNode = true
                                }
                            }
                            // generally, ip route rule was added to intercept request to loadbalancer vip from the
                            // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
                            // Need to add the following rule to allow request on host.
                            if allowFromNode {
                                utilproxy.WriteLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)), "-j", string(chosenChain))...)
                            }
                        }
    
                        // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
                        // It means the packet cannot go thru the firewall, then mark it for DROP
                        utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
                    } else {
                        // No endpoints.
                        utilproxy.WriteLine(proxier.filterRules,
                            "-A", string(kubeExternalServicesChain),
                            "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                            "-m", protocol, "-p", protocol,
                            "-d", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)),
                            "--dport", strconv.Itoa(svcInfo.Port()),
                            "-j", "REJECT",
                        )
                    }
                }
            }
            
            // if the service type is NodePort, open the node port and add the related rules
            // Capture nodeports.  If we had more than 2 rules it might be
            // worthwhile to make a new per-service chain for nodeport rules, but
            // with just 2 rules it ends up being a waste and a cognitive burden.
            if svcInfo.NodePort() != 0 {
                lps := make([]netutils.LocalPort, 0)
                for address := range nodeAddresses {
                    lp := netutils.LocalPort{
                        Description: "nodePort for " + svcNameString,
                        IP:          address,
                        IPFamily:    localPortIPFamily,
                        Port:        svcInfo.NodePort(),
                        Protocol:    netutils.Protocol(svcInfo.Protocol()),
                    }
                    if utilproxy.IsZeroCIDR(address) {
                        // Empty IP address means all
                        lp.IP = ""
                        lps = append(lps, lp)
                        // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
                        break
                    }
                    lps = append(lps, lp)
                }
    
                // For ports on node IPs, open the actual port and hold it.
                for _, lp := range lps {
                    if proxier.portsMap[lp] != nil {
                        klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
                        replacementPortsMap[lp] = proxier.portsMap[lp]
                    } else if svcInfo.Protocol() != v1.ProtocolSCTP {
                        socket, err := proxier.portMapper.OpenLocalPort(&lp)
                        klog.V(2).InfoS("Opened local port", "port", lp.String())
                        replacementPortsMap[lp] = socket
                    }
                }
    
                // if externalTrafficPolicy is Cluster, add the following rules:
                // iptables -t nat -A KUBE-SVC-xxx -p tcp --dport <nodePort> -m comment --comment "svc" -j KUBE-MARK-MASQ
                // iptables -t nat -A KUBE-NODEPORTS -p tcp --dport <nodePort> -m comment --comment "svc" -j KUBE-SVC-xxx
                // if externalTrafficPolicy is Local, add the following rules:
                // iptables -t nat -A KUBE-NODEPORTS -p tcp --dport <nodePort> -m comment --comment "svc" -s 127.0.0.0/8 -j KUBE-MARK-MASQ
                // iptables -t nat -A KUBE-NODEPORTS -p tcp --dport <nodePort> -m comment --comment "svc" -j KUBE-XLB-xxx
                if hasEndpoints {
                    args = append(args[:0],
                        "-m", "comment", "--comment", svcNameString,
                        "-m", protocol, "-p", protocol,
                        "--dport", strconv.Itoa(svcInfo.NodePort()),
                    )
                    if !svcInfo.NodeLocalExternal() {
                        // Nodeports need SNAT, unless they're local.
                        utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), append(args, "-j", string(KubeMarkMasqChain))...)
                        // Jump to the service chain.
                        utilproxy.WriteRuleLine(proxier.natRules, string(kubeNodePortsChain), append(args, "-j", string(svcChain))...)
                    } else {
                        // TODO: Make all nodePorts jump to the firewall chain.
                        // Currently we only create it for loadbalancers (#33586).
    
                        // Fix localhost martian source error
                        loopback := "127.0.0.0/8"
                        if isIPv6 {
                            loopback = "::1/128"
                        }
                        utilproxy.WriteRuleLine(proxier.natRules, string(kubeNodePortsChain), append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...)
                        utilproxy.WriteRuleLine(proxier.natRules, string(kubeNodePortsChain), append(args, "-j", string(svcXlbChain))...)
                    }
                } else {
                    // No endpoints.
                    utilproxy.WriteLine(proxier.filterRules,
                        "-A", string(kubeExternalServicesChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                        "-m", "addrtype", "--dst-type", "LOCAL",
                        "-m", protocol, "-p", protocol,
                        "--dport", strconv.Itoa(svcInfo.NodePort()),
                        "-j", "REJECT",
                    )
                }
            }
    
            // iterate over all endpoints of this service and create the corresponding chains; each endpoint is a pod, which may be on this node or on another node
            endpoints = endpoints[:0]
            endpointChains = endpointChains[:0]
            for _, ep := range allEndpoints {
                epInfo, ok := ep.(*endpointsInfo)
                endpoints = append(endpoints, epInfo)
                // generate a KUBE-SEP-xxx chain name for each endpoint
                endpointChain = epInfo.endpointChain(svcNameString, protocol)
                endpointChains = append(endpointChains, endpointChain)
                
                // make sure the KUBE-SEP-xxx chain exists in the nat table
                // Create the endpoint chain, retaining counters if possible.
                if chain, ok := existingNATChains[endpointChain]; ok {
                    utilproxy.WriteBytesLine(proxier.natChains, chain)
                } else {
                    utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
                }
                activeNATChains[endpointChain] = true
            }
            
            // iterate over all endpoint chains and group them by state
            readyEndpointChains = readyEndpointChains[:0]
            readyEndpoints := readyEndpoints[:0]
            localReadyEndpointChains := localReadyEndpointChains[:0]
            localServingTerminatingEndpointChains := localServingTerminatingEndpointChains[:0]
            for i, endpointChain := range endpointChains {
                if endpoints[i].Ready {
                    readyEndpointChains = append(readyEndpointChains, endpointChain)
                    readyEndpoints = append(readyEndpoints, endpoints[i])
                }
    
                if svc.NodeLocalExternal() && endpoints[i].IsLocal {
                    if endpoints[i].Ready {
                        localReadyEndpointChains = append(localReadyEndpointChains, endpointChain)
                    } else if endpoints[i].Serving && endpoints[i].Terminating {
                        localServingTerminatingEndpointChains = append(localServingTerminatingEndpointChains, endpointChain)
                    }
                }
            }
            
            // create the load-balancing rules; with a single endpoint add:
            // iptables -t nat -A KUBE-SVC-xxx -j KUBE-SEP-xxx
            // with multiple endpoints generate probabilistic rules, e.g. for two endpoints:
            // iptables -t nat -A KUBE-SVC-xxx -m statistic --mode random --probability 0.5 -j KUBE-SEP-xxx
            // iptables -t nat -A KUBE-SVC-xxx -j KUBE-SEP-yyy   (the last rule is a guaranteed match)
            // Now write loadbalancing & DNAT rules.
            numReadyEndpoints := len(readyEndpointChains)
            for i, endpointChain := range readyEndpointChains {
                epIP := readyEndpoints[i].IP()
                if epIP == "" {
                    // Error parsing this endpoint has been logged. Skip to next endpoint.
                    continue
                }
    
                // Balancing rules in the per-service chain.
                args = append(args[:0], "-A", string(svcChain))
                args = proxier.appendServiceCommentLocked(args, svcNameString)
                if i < (numReadyEndpoints - 1) {
                    // Each rule is a probabilistic match.
                    args = append(args,
                        "-m", "statistic",
                        "--mode", "random",
                        "--probability", proxier.probability(numReadyEndpoints-i))
                }
                // The final (or only if n == 1) rule is a guaranteed match.
                args = append(args, "-j", string(endpointChain))
                utilproxy.WriteLine(proxier.natRules, args...)
            }
    
            // add the DNAT rules in every endpoint chain, e.g.:
            // iptables -t nat -A KUBE-SEP-xxx -s endpointIP/32 -j KUBE-MARK-MASQ
            // iptables -t nat -A KUBE-SEP-xxx -m tcp -p tcp -j DNAT --to-destination endpointIP:port
            // Every endpoint gets a chain, regardless of its state. This is required later since we may
            // want to jump to endpoint chains that are terminating.
            for i, endpointChain := range endpointChains {
                epIP := endpoints[i].IP()
                if epIP == "" {
                    // Error parsing this endpoint has been logged. Skip to next endpoint.
                    continue
                }
    
                // Rules in the per-endpoint chain.
                args = append(args[:0], "-A", string(endpointChain))
                args = proxier.appendServiceCommentLocked(args, svcNameString)
                // Handle traffic that loops back to the originator with SNAT.
                utilproxy.WriteLine(proxier.natRules, append(args,
                    "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epIP)),
                    "-j", string(KubeMarkMasqChain))...)
                // Update client-affinity lists.
                if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
                    args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
                }
                // DNAT to final destination.
                args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
                utilproxy.WriteLine(proxier.natRules, args...)
            }
            
            // earlier rules jump to the KUBE-XLB-xxx chain when externalTrafficPolicy is Local; here the rules inside that chain are added.
            // with no local endpoints:
            // iptables -t nat -A KUBE-XLB-xxx -m comment --comment "svcx has no local endpoints" -j KUBE-MARK-DROP
            // with local endpoints, e.g. two of them:
            // iptables -t nat -A KUBE-XLB-xxx -m comment --comment "Balancing rule 0 for svcx" -m statistic --mode random --probability 0.5 -j KUBE-SEP-xxx
            // iptables -t nat -A KUBE-XLB-xxx -m comment --comment "Balancing rule 1 for svcx" -j KUBE-SEP-yyy
            numLocalEndpoints := len(localEndpointChains)
            if numLocalEndpoints == 0 {
                // Blackhole all traffic since there are no local endpoints
                args = append(args[:0],
                    "-A", string(svcXlbChain),
                    "-m", "comment", "--comment",
                    fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
                    "-j",
                    string(KubeMarkDropChain),
                )
                utilproxy.WriteLine(proxier.natRules, args...)
            } else {
                ...
    
                // Setup probability filter rules only over local endpoints
                for i, endpointChain := range localEndpointChains {
                    // Balancing rules in the per-service chain.
                    args = append(args[:0],
                        "-A", string(svcXlbChain),
                        "-m", "comment", "--comment",
                        fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString),
                    )
                    if i < (numLocalEndpoints - 1) {
                        // Each rule is a probabilistic match.
                        args = append(args,
                            "-m", "statistic",
                            "--mode", "random",
                            "--probability", proxier.probability(numLocalEndpoints-i))
                    }
                    // The final (or only if n == 1) rule is a guaranteed match.
                    args = append(args, "-j", string(endpointChain))
                    utilproxy.WriteLine(proxier.natRules, args...)
                }
            }
        }
        
        // delete chains and rules in the nat table that are no longer needed
        // Delete chains no longer in use.
        for chain := range existingNATChains {
            if !activeNATChains[chain] {
                chainString := string(chain)
                if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
                    // Ignore chains that aren't ours.
                    continue
                }
                // We must (as per iptables) write a chain-line for it, which has
                // the nice effect of flushing the chain.  Then we can remove the
                // chain.
                utilproxy.WriteBytesLine(proxier.natChains, existingNATChains[chain])
                utilproxy.WriteLine(proxier.natRules, "-X", chainString)
            }
        }
    
        // finally add the jump to the nodeports chain
        // Finally, tail-call to the nodeports chain.  This needs to be after all
        // other service portal rules.
        isIPv6 := proxier.iptables.IsIPv6()
        for address := range nodeAddresses {
            // TODO(thockin, m1093782566): If/when we have dual-stack support we will want to distinguish v4 from v6 zero-CIDRs.
            if utilproxy.IsZeroCIDR(address) {
                args = append(args[:0],
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
                    "-m", "addrtype", "--dst-type", "LOCAL",
                    "-j", string(kubeNodePortsChain))
                utilproxy.WriteLine(proxier.natRules, args...)
                // Nothing else matters after the zero CIDR.
                break
            }
            // Ignore IP addresses with incorrect version
            if isIPv6 && !netutils.IsIPv6String(address) || !isIPv6 && netutils.IsIPv6String(address) {
                klog.ErrorS(nil, "IP has incorrect IP version", "ip", address)
                continue
            }
            // create nodeport rules for each IP one by one
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
                "-d", address,
                "-j", string(kubeNodePortsChain))
            utilproxy.WriteLine(proxier.natRules, args...)
        }
        
        ...
        
        // write the end-of-table markers
        // Write the end-of-table markers.
        utilproxy.WriteLine(proxier.filterRules, "COMMIT")
        utilproxy.WriteLine(proxier.natRules, "COMMIT")
    
        // Sync rules.
        // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
        proxier.iptablesData.Reset()
        proxier.iptablesData.Write(proxier.filterChains.Bytes())
        proxier.iptablesData.Write(proxier.filterRules.Bytes())
        proxier.iptablesData.Write(proxier.natChains.Bytes())
        proxier.iptablesData.Write(proxier.natRules.Bytes())
    
        // reload the iptables rules, equivalent to running iptables-restore
        klog.V(5).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
        err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
    }
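
    As a worked example of the probabilistic balancing rules written above: with n ready endpoints, rule i carries --probability 1/(n-i) and the last rule matches unconditionally, which gives every endpoint an overall 1/n share, because rule i only fires if rules 0..i-1 did not. A small sketch (illustrative, not from the kube-proxy source) verifying the arithmetic:

    package main

    import "fmt"

    func main() {
        n := 3 // assume three ready endpoints
        remaining := 1.0
        for i := 0; i < n; i++ {
            var p float64
            if i < n-1 {
                p = 1.0 / float64(n-i) // --probability value written on rule i
            } else {
                p = 1.0 // the last rule is a guaranteed match
            }
            share := remaining * p // overall chance this endpoint is chosen
            fmt.Printf("rule %d: probability %.5f, overall share %.5f\n", i, p, share)
            remaining *= 1 - p
        }
        // Output: every rule ends up with an overall share of 1/3.
    }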
    
