美文网首页
k8s 之 kube-proxy 源码简单分析

k8s 之 kube-proxy 源码简单分析

作者: wwq2020 | 来源:发表于2020-07-17 17:00 被阅读0次

    简介

    kube-proxy 监听 apiserver 以获取 endpoint 的变化,然后把对应的转发规则写入 iptables(本文以 iptables 模式为例)。

    cmd/kube-proxy/proxy.go 中

    // main is the kube-proxy entry point as excerpted by the article (lines
    // written as "..." are elided). It builds the root command with
    // app.NewProxyCommand and terminates the process with exit status 1 when
    // executing that command returns an error.
    func main() {
        ...
        command := app.NewProxyCommand()
        ...
        // Any error from the command maps to a non-zero exit code; the error
        // itself is not printed here.
        if err := command.Execute(); err != nil {
            os.Exit(1)
        }
    }
    
    

    cmd/kube-proxy/app/server.go 中

    // NewProxyCommand returns the *cobra.Command for kube-proxy. Most of the
    // body is elided ("..."); the fragment kept by the article shows that the
    // options are executed through opts.Run() and that a non-nil error ends the
    // process via klog.Exit.
    // NOTE(review): opts and the command construction live in the elided part —
    // confirm against the full source where exactly this check runs.
    func NewProxyCommand() *cobra.Command {
        ...
        if err := opts.Run(); err != nil {
                    klog.Exit(err)
        }
        ...
    }
    
    // Run executes kube-proxy according to the options.
    //
    // It closes o.errCh on return. If WriteConfigTo is non-empty it only writes
    // the config file and returns that result. Otherwise it builds a ProxyServer;
    // with CleanupAndExit set it performs the cleanup and returns, else it stores
    // the server on the options and hands control to runLoop.
    func (o *Options) Run() error {
        defer close(o.errCh)
    
        // Config-dump mode: no proxy server is created at all.
        if len(o.WriteConfigTo) != 0 {
            return o.writeConfigFile()
        }
    
        server, err := NewProxyServer(o)
        switch {
        case err != nil:
            return err
        case o.CleanupAndExit:
            // One-shot cleanup mode: never enters the run loop.
            return server.CleanupAndExit()
        }
    
        o.proxyServer = server
        return o.runLoop()
    }
    
    // runLoop starts the optional watcher, launches the proxy server in a
    // background goroutine, and then blocks reading o.errCh. It returns the
    // first non-nil error received; nil values are ignored and the loop keeps
    // waiting.
    func (o *Options) runLoop() error {
        if w := o.watcher; w != nil {
            w.Run()
        }
    
        // The proxy server runs concurrently; whatever its Run returns (nil
        // included) is forwarded to the error channel.
        go func() {
            runErr := o.proxyServer.Run()
            o.errCh <- runErr
        }()
    
        for {
            if err := <-o.errCh; err != nil {
                return err
            }
        }
    }
    
    // Run starts the proxy server's background work and then blocks until a
    // value is received from errCh, which it returns. Parts of the body are
    // elided ("..."); errCh and informerFactory are declared in the elided code.
    func (s *ProxyServer) Run() error {
        ...
        // Build the Endpoints config from the informer factory (using
        // s.ConfigSyncPeriod) and register the proxier as its event handler.
        endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
        endpointsConfig.RegisterEventHandler(s.Proxier)
        // Run the config in its own goroutine, passing wait.NeverStop.
        go endpointsConfig.Run(wait.NeverStop)
        ...
        // Run the proxier's sync loop in the background.
        go s.Proxier.SyncLoop()
    
        // Block here; the received value becomes Run's result.
        return <-errCh
    }
    

    cmd/kube-proxy/app/server_others.go 中

    // NewProxyServer builds a ProxyServer from the given options by forwarding
    // the options' config, CleanupAndExit flag and master value to
    // newProxyServer.
    func NewProxyServer(o *Options) (*ProxyServer, error) {
        cfg, cleanup, master := o.config, o.CleanupAndExit, o.master
        return newProxyServer(cfg, cleanup, master)
    }
    
    // newProxyServer assembles a ProxyServer from the kube-proxy configuration.
    // Large parts of the body are elided ("..."); the excerpt keeps only the
    // iptables-mode branch, where the proxier is created with
    // iptables.NewProxier, and the final ProxyServer literal.
    //
    // NOTE(review): proxyMode, iptInterface, execer, localDetector, hostname,
    // nodeIP, recorder, healthzServer, client, eventClient, ipvsInterface,
    // ipsetInterface, eventBroadcaster and nodeRef are all set up in elided
    // code; the use of cleanupAndExit and master is not visible here either.
    func newProxyServer(
        config *proxyconfigapi.KubeProxyConfiguration,
        cleanupAndExit bool,
        master string) (*ProxyServer, error) {
        if proxyMode == proxyModeIPTables {
            ...
            // Sync periods, masquerade settings and NodePort addresses come
            // straight from the configuration; MasqueradeBit is dereferenced
            // and converted to int.
            proxier, err = iptables.NewProxier(
                    iptInterface,
                    utilsysctl.New(),
                    execer,
                    config.IPTables.SyncPeriod.Duration,
                    config.IPTables.MinSyncPeriod.Duration,
                    config.IPTables.MasqueradeAll,
                    int(*config.IPTables.MasqueradeBit),
                    localDetector,
                    hostname,
                    nodeIP,
                    recorder,
                    healthzServer,
                    config.NodePortAddresses,
                )
            ...
        }
        ...
        // Bundle everything the server needs; UseEndpointSlices mirrors the
        // EndpointSliceProxying feature gate.
        return &ProxyServer{
            Client:                 client,
            EventClient:            eventClient,
            IptInterface:           iptInterface,
            IpvsInterface:          ipvsInterface,
            IpsetInterface:         ipsetInterface,
            execer:                 execer,
            Proxier:                proxier,
            Broadcaster:            eventBroadcaster,
            Recorder:               recorder,
            ConntrackConfiguration: config.Conntrack,
            Conntracker:            &realConntracker{},
            ProxyMode:              proxyMode,
            NodeRef:                nodeRef,
            MetricsBindAddress:     config.MetricsBindAddress,
            BindAddressHardFail:    config.BindAddressHardFail,
            EnableProfiling:        config.EnableProfiling,
            OOMScoreAdj:            config.OOMScoreAdj,
            ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
            HealthzServer:          healthzServer,
            UseEndpointSlices:      utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying),
        }, nil
    }
    
    

    pkg/proxy/iptables/proxier.go 中

    // NewProxier constructs an iptables-based Proxier.
    //
    // Besides filling in the struct it has side effects: it ensures the
    // route_localnet sysctl is set, creates a service health server, creates the
    // bounded-frequency sync runner around proxier.syncProxyRules, and starts a
    // goroutine running ipt.Monitor that is never stopped (wait.NeverStop).
    //
    // Parameters as used below:
    //   - ipt: iptables interface; also queried for protocol, IPv6-ness and
    //     --random-fully support.
    //   - sysctl: used to ensure/read kernel sysctls.
    //   - exec, localDetector, hostname, nodeIP, recorder, healthzServer,
    //     masqueradeAll: stored on the Proxier (hostname and recorder are also
    //     passed to the health server and change trackers).
    //   - syncPeriod: stored on the Proxier and passed to ipt.Monitor.
    //   - minSyncPeriod: minimum interval handed to the sync runner.
    //   - masqueradeBit: bit index; the masquerade mark is 1 << masqueradeBit.
    //   - nodePortAddresses: filtered to the address family of ipt before being
    //     stored.
    //
    // It returns an error only when the route_localnet sysctl cannot be ensured.
    func NewProxier(ipt utiliptables.Interface,
        sysctl utilsysctl.Interface,
        exec utilexec.Interface,
        syncPeriod time.Duration,
        minSyncPeriod time.Duration,
        masqueradeAll bool,
        masqueradeBit int,
        localDetector proxyutiliptables.LocalTrafficDetector,
        hostname string,
        nodeIP net.IP,
        recorder record.EventRecorder,
        healthzServer healthcheck.ProxierHealthUpdater,
        nodePortAddresses []string,
    ) (*Proxier, error) {
        // Ensure the route_localnet sysctl (sysctlRouteLocalnet) is 1; abort
        // construction if that fails.
        if err := utilproxy.EnsureSysctl(sysctl, sysctlRouteLocalnet, 1); err != nil {
            return nil, err
        }
    
        // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
        // are connected to a Linux bridge (but not SDN bridges).  Until most
        // plugins handle this, only log a warning when the setting is missing;
        // a failure to read the sysctl is silently ignored.
        if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
            klog.Warning("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
        }
    
        // Generate the masquerade mark to use for SNAT rules: a single bit,
        // formatted as a zero-padded hex string with a 0x prefix.
        masqueradeValue := 1 << uint(masqueradeBit)
        masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
        klog.V(2).Infof("iptables(%s) masquerade mark: %s", ipt.Protocol(), masqueradeMark)
    
        endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying)
    
        serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
    
        // Split the configured NodePort addresses by IP family; entries of the
        // wrong family are dropped with a warning.
        isIPv6 := ipt.IsIPv6()
        var incorrectAddresses []string
        nodePortAddresses, incorrectAddresses = utilproxy.FilterIncorrectCIDRVersion(nodePortAddresses, isIPv6)
        if len(incorrectAddresses) > 0 {
            klog.Warning("NodePortAddresses of wrong family; ", incorrectAddresses)
        }
        // The maps start empty and the byte buffers are allocated once here;
        // precomputedProbabilities starts empty with capacity 1001.
        proxier := &Proxier{
            portsMap:                 make(map[utilproxy.LocalPort]utilproxy.Closeable),
            serviceMap:               make(proxy.ServiceMap),
            serviceChanges:           proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
            endpointsMap:             make(proxy.EndpointsMap),
            endpointsChanges:         proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled),
            syncPeriod:               syncPeriod,
            iptables:                 ipt,
            masqueradeAll:            masqueradeAll,
            masqueradeMark:           masqueradeMark,
            exec:                     exec,
            localDetector:            localDetector,
            hostname:                 hostname,
            nodeIP:                   nodeIP,
            portMapper:               &listenPortOpener{},
            recorder:                 recorder,
            serviceHealthServer:      serviceHealthServer,
            healthzServer:            healthzServer,
            precomputedProbabilities: make([]string, 0, 1001),
            iptablesData:             bytes.NewBuffer(nil),
            existingFilterChainsData: bytes.NewBuffer(nil),
            filterChains:             bytes.NewBuffer(nil),
            filterRules:              bytes.NewBuffer(nil),
            natChains:                bytes.NewBuffer(nil),
            natRules:                 bytes.NewBuffer(nil),
            nodePortAddresses:        nodePortAddresses,
            networkInterfacer:        utilproxy.RealNetwork{},
        }
    
        burstSyncs := 2
        klog.V(2).Infof("iptables(%s) sync params: minSyncPeriod=%v, syncPeriod=%v, burstSyncs=%d",
            ipt.Protocol(), minSyncPeriod, syncPeriod, burstSyncs)
        // We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
        // We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
        // time.Hour is arbitrary.
        proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
    
        // Start the monitor on the KUBE-PROXY-CANARY chain for the mangle, nat
        // and filter tables, with syncProxyRules as its callback. The goroutine
        // is given wait.NeverStop.
        go ipt.Monitor(utiliptables.Chain("KUBE-PROXY-CANARY"),
            []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
            proxier.syncProxyRules, syncPeriod, wait.NeverStop)
    
        // Informational only: record whether --random-fully is available.
        if ipt.HasRandomFully() {
            klog.V(2).Infof("iptables(%s) supports --random-fully", ipt.Protocol())
        } else {
            klog.V(2).Infof("iptables(%s) does not support --random-fully", ipt.Protocol())
        }
    
        return proxier, nil
    }
    
    // OnEndpointsUpdate records the transition from oldEndpoints to endpoints in
    // the endpoints change tracker. A sync is requested only when the tracker
    // reports true and the proxier is already initialized; the initialization
    // check is skipped when the tracker reports false.
    func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
        if changed := proxier.endpointsChanges.Update(oldEndpoints, endpoints); !changed {
            return
        }
        if !proxier.isInitialized() {
            return
        }
        proxier.Sync()
    }
    
    // Sync requests a run of the sync runner. Before doing so it notifies the
    // healthz server (when one is configured) that an update was queued and
    // stamps the "last queued" metric with the current time.
    func (proxier *Proxier) Sync() {
        if hz := proxier.healthzServer; hz != nil {
            hz.QueuedUpdate()
        }
    
        metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
    
        proxier.syncRunner.Run()
    }
    
    // SyncLoop hands control to the sync runner's Loop with wait.NeverStop.
    // Beforehand it marks the healthz server (when configured) as updated and
    // stamps the "last queued" metric.
    func (proxier *Proxier) SyncLoop() {
        // Refresh the healthz timestamp up front, in case Sync() never succeeds.
        if hz := proxier.healthzServer; hz != nil {
            hz.Updated()
        }
    
        // Informers are still syncing at this point, so fabricate a
        // "last change queued" time.
        metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
    
        proxier.syncRunner.Loop(wait.NeverStop)
    }
    
    // The only other iptables rules are those that are setup in iptablesInit()
    // This assumes proxier.mu is NOT held
    func (proxier *Proxier) syncProxyRules() {
        proxier.mu.Lock()
        defer proxier.mu.Unlock()
    
        // don't sync rules till we've received services and endpoints
        if !proxier.isInitialized() {
            klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
            return
        }
    
        // Keep track of how long syncs take.
        start := time.Now()
        defer func() {
            metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
            klog.V(2).Infof("syncProxyRules took %v", time.Since(start))
        }()
    
        localAddrs, err := utilproxy.GetLocalAddrs()
        if err != nil {
            klog.Errorf("Failed to get local addresses during proxy sync: %v, assuming external IPs are not local", err)
        } else if len(localAddrs) == 0 {
            klog.Warning("No local addresses found, assuming all external IPs are not local")
        }
    
        localAddrSet := utilnet.IPSet{}
        localAddrSet.Insert(localAddrs...)
    
        nodeAddresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
        if err != nil {
            klog.Errorf("Failed to get node ip address matching nodeport cidrs %v, services with nodeport may not work as intended: %v", proxier.nodePortAddresses, err)
        }
    
        // We assume that if this was called, we really want to sync them,
        // even if nothing changed in the meantime. In other words, callers are
        // responsible for detecting no-op changes and not calling this function.
        serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
        endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
    
        staleServices := serviceUpdateResult.UDPStaleClusterIP
        // merge stale services gathered from updateEndpointsMap
        for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
            if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
                klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String())
                staleServices.Insert(svcInfo.ClusterIP().String())
                for _, extIP := range svcInfo.ExternalIPStrings() {
                    staleServices.Insert(extIP)
                }
            }
        }
    
        klog.V(2).Info("Syncing iptables rules")
    
        success := false
        defer func() {
            if !success {
                klog.Infof("Sync failed; retrying in %s", proxier.syncPeriod)
                proxier.syncRunner.RetryAfter(proxier.syncPeriod)
            }
        }()
    
        // Create and link the kube chains.
        for _, jump := range iptablesJumpChains {
            if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
                klog.Errorf("Failed to ensure that %s chain %s exists: %v", jump.table, jump.dstChain, err)
                return
            }
            args := append(jump.extraArgs,
                "-m", "comment", "--comment", jump.comment,
                "-j", string(jump.dstChain),
            )
            if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
                klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jump.table, jump.srcChain, jump.dstChain, err)
                return
            }
        }
    
        //
        // Below this point we will not return until we try to write the iptables rules.
        //
    
        // Get iptables-save output so we can check for existing chains and rules.
        // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
        existingFilterChains := make(map[utiliptables.Chain][]byte)
        proxier.existingFilterChainsData.Reset()
        err = proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData)
        if err != nil { // if we failed to get any rules
            klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
        } else { // otherwise parse the output
            existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes())
        }
    
        // IMPORTANT: existingNATChains may share memory with proxier.iptablesData.
        existingNATChains := make(map[utiliptables.Chain][]byte)
        proxier.iptablesData.Reset()
        err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
        if err != nil { // if we failed to get any rules
            klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
        } else { // otherwise parse the output
            existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
        }
    
        // Reset all buffers used later.
        // This is to avoid memory reallocations and thus improve performance.
        proxier.filterChains.Reset()
        proxier.filterRules.Reset()
        proxier.natChains.Reset()
        proxier.natRules.Reset()
    
        // Write table headers.
        writeLine(proxier.filterChains, "*filter")
        writeLine(proxier.natChains, "*nat")
    
        // Make sure we keep stats for the top-level chains, if they existed
        // (which most should have because we created them above).
        for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
            if chain, ok := existingFilterChains[chainName]; ok {
                writeBytesLine(proxier.filterChains, chain)
            } else {
                writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
            }
        }
        for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
            if chain, ok := existingNATChains[chainName]; ok {
                writeBytesLine(proxier.natChains, chain)
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
            }
        }
    
        // Install the kubernetes-specific postrouting rules. We use a whole chain for
        // this so that it is easier to flush and change, for example if the mark
        // value should ever change.
        // NB: THIS MUST MATCH the corresponding code in the kubelet
        writeLine(proxier.natRules, []string{
            "-A", string(kubePostroutingChain),
            "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
            "-j", "RETURN",
        }...)
        // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack.
        writeLine(proxier.natRules, []string{
            "-A", string(kubePostroutingChain),
            // XOR proxier.masqueradeMark to unset it
            "-j", "MARK", "--xor-mark", proxier.masqueradeMark,
        }...)
        masqRule := []string{
            "-A", string(kubePostroutingChain),
            "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
            "-j", "MASQUERADE",
        }
        if proxier.iptables.HasRandomFully() {
            masqRule = append(masqRule, "--random-fully")
        }
        writeLine(proxier.natRules, masqRule...)
    
        // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
        // this so that it is easier to flush and change, for example if the mark
        // value should ever change.
        writeLine(proxier.natRules, []string{
            "-A", string(KubeMarkMasqChain),
            "-j", "MARK", "--or-mark", proxier.masqueradeMark,
        }...)
    
        // Accumulate NAT chains to keep.
        activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
    
        // Accumulate the set of local ports that we will be holding open once this update is complete
        replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
    
        // We are creating those slices ones here to avoid memory reallocations
        // in every loop. Note that reuse the memory, instead of doing:
        //   slice = <some new slice>
        // you should always do one of the below:
        //   slice = slice[:0] // and then append to it
        //   slice = append(slice[:0], ...)
        endpoints := make([]*endpointsInfo, 0)
        endpointChains := make([]utiliptables.Chain, 0)
        // To avoid growing this slice, we arbitrarily set its size to 64,
        // there is never more than that many arguments for a single line.
        // Note that even if we go over 64, it will still be correct - it
        // is just for efficiency, not correctness.
        args := make([]string, 64)
    
        // Compute total number of endpoint chains across all services.
        proxier.endpointChainsNumber = 0
        for svcName := range proxier.serviceMap {
            proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName])
        }
    
        // Build rules for each service.
        for svcName, svc := range proxier.serviceMap {
            svcInfo, ok := svc.(*serviceInfo)
            if !ok {
                klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
                continue
            }
            isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
            protocol := strings.ToLower(string(svcInfo.Protocol()))
            svcNameString := svcInfo.serviceNameString
    
            allEndpoints := proxier.endpointsMap[svcName]
    
            hasEndpoints := len(allEndpoints) > 0
    
            // Service Topology will not be enabled in the following cases:
            // 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
            // 2. ServiceTopology is not enabled.
            // 3. EndpointSlice is not enabled (service topology depends on endpoint slice
            // to get topology information).
            if !svcInfo.OnlyNodeLocalEndpoints() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
                allEndpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, svcInfo.TopologyKeys(), allEndpoints)
                hasEndpoints = len(allEndpoints) > 0
            }
    
            svcChain := svcInfo.servicePortChainName
            if hasEndpoints {
                // Create the per-service chain, retaining counters if possible.
                if chain, ok := existingNATChains[svcChain]; ok {
                    writeBytesLine(proxier.natChains, chain)
                } else {
                    writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
                }
                activeNATChains[svcChain] = true
            }
    
            svcXlbChain := svcInfo.serviceLBChainName
            if svcInfo.OnlyNodeLocalEndpoints() {
                // Only for services request OnlyLocal traffic
                // create the per-service LB chain, retaining counters if possible.
                if lbChain, ok := existingNATChains[svcXlbChain]; ok {
                    writeBytesLine(proxier.natChains, lbChain)
                } else {
                    writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
                }
                activeNATChains[svcXlbChain] = true
            }
    
            // Capture the clusterIP.
            if hasEndpoints {
                args = append(args[:0],
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
                    "--dport", strconv.Itoa(svcInfo.Port()),
                )
                if proxier.masqueradeAll {
                    writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                } else if proxier.localDetector.IsImplemented() {
                    // This masquerades off-cluster traffic to a service VIP.  The idea
                    // is that you can establish a static route for your Service range,
                    // routing to any node, and that node will bridge into the Service
                    // for you.  Since that might bounce off-node, we masquerade here.
                    // If/when we support "Local" policy for VIPs, we should update this.
                    writeLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...)
                }
                writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
            } else {
                // No endpoints.
                writeLine(proxier.filterRules,
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
                    "--dport", strconv.Itoa(svcInfo.Port()),
                    "-j", "REJECT",
                )
            }
    
            // Capture externalIPs.
            for _, externalIP := range svcInfo.ExternalIPStrings() {
                // If the "external" IP happens to be an IP that is local to this
                // machine, hold the local port open so no other process can open it
                // (because the socket might open but it would never work).
                if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
                    lp := utilproxy.LocalPort{
                        Description: "externalIP for " + svcNameString,
                        IP:          externalIP,
                        Port:        svcInfo.Port(),
                        Protocol:    protocol,
                    }
                    if proxier.portsMap[lp] != nil {
                        klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                        replacementPortsMap[lp] = proxier.portsMap[lp]
                    } else {
                        socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
                        if err != nil {
                            msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
    
                            proxier.recorder.Eventf(
                                &v1.ObjectReference{
                                    Kind:      "Node",
                                    Name:      proxier.hostname,
                                    UID:       types.UID(proxier.hostname),
                                    Namespace: "",
                                }, v1.EventTypeWarning, err.Error(), msg)
                            klog.Error(msg)
                            continue
                        }
                        replacementPortsMap[lp] = socket
                    }
                }
    
                if hasEndpoints {
                    args = append(args[:0],
                        "-A", string(kubeServicesChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
                        "-m", protocol, "-p", protocol,
                        "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
                        "--dport", strconv.Itoa(svcInfo.Port()),
                    )
    
                    destChain := svcXlbChain
                    // We have to SNAT packets to external IPs if externalTrafficPolicy is cluster.
                    if !(utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) && svcInfo.OnlyNodeLocalEndpoints()) {
                        destChain = svcChain
                        writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                    }
    
                    // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
                    // nor from a local process to be forwarded to the service.
                    // This rule roughly translates to "all traffic from off-machine".
                    // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
                    externalTrafficOnlyArgs := append(args,
                        "-m", "physdev", "!", "--physdev-is-in",
                        "-m", "addrtype", "!", "--src-type", "LOCAL")
                    writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(destChain))...)
                    dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
                    // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
                    // This covers cases like GCE load-balancers which get added to the local routing table.
                    writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(destChain))...)
                } else {
                    // No endpoints.
                    writeLine(proxier.filterRules,
                        "-A", string(kubeExternalServicesChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                        "-m", protocol, "-p", protocol,
                        "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
                        "--dport", strconv.Itoa(svcInfo.Port()),
                        "-j", "REJECT",
                    )
                }
            }
    
            // Capture load-balancer ingress.
            fwChain := svcInfo.serviceFirewallChainName
            for _, ingress := range svcInfo.LoadBalancerIPStrings() {
                if ingress != "" {
                    if hasEndpoints {
                        // create service firewall chain
                        if chain, ok := existingNATChains[fwChain]; ok {
                            writeBytesLine(proxier.natChains, chain)
                        } else {
                            writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
                        }
                        activeNATChains[fwChain] = true
                        // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
                        // This currently works for loadbalancers that preserves source ips.
                        // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
    
                        args = append(args[:0],
                            "-A", string(kubeServicesChain),
                            "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                            "-m", protocol, "-p", protocol,
                            "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
                            "--dport", strconv.Itoa(svcInfo.Port()),
                        )
                        // jump to service firewall chain
                        writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)
    
                        args = append(args[:0],
                            "-A", string(fwChain),
                            "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                        )
    
                        // Each source match rule in the FW chain may jump to either the SVC or the XLB chain
                        chosenChain := svcXlbChain
                        // If we are proxying globally, we need to masquerade in case we cross nodes.
                        // If we are proxying only locally, we can retain the source IP.
                        if !svcInfo.OnlyNodeLocalEndpoints() {
                            writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                            chosenChain = svcChain
                        }
    
                        if len(svcInfo.LoadBalancerSourceRanges()) == 0 {
                            // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
                            writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
                        } else {
                            // firewall filter based on each source range
                            allowFromNode := false
                            for _, src := range svcInfo.LoadBalancerSourceRanges() {
                                writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
                                // ignore error because it has been validated
                                _, cidr, _ := net.ParseCIDR(src)
                                if cidr.Contains(proxier.nodeIP) {
                                    allowFromNode = true
                                }
                            }
                            // generally, ip route rule was added to intercept request to loadbalancer vip from the
                            // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
                            // Need to add the following rule to allow request on host.
                            if allowFromNode {
                                writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress)), "-j", string(chosenChain))...)
                            }
                        }
    
                        // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
                        // It means the packet cannot go thru the firewall, then mark it for DROP
                        writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
                    } else {
                        // No endpoints.
                        writeLine(proxier.filterRules,
                            "-A", string(kubeServicesChain),
                            "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                            "-m", protocol, "-p", protocol,
                            "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
                            "--dport", strconv.Itoa(svcInfo.Port()),
                            "-j", "REJECT",
                        )
                    }
                }
            }
    
            // Capture nodeports.  If we had more than 2 rules it might be
            // worthwhile to make a new per-service chain for nodeport rules, but
            // with just 2 rules it ends up being a waste and a cognitive burden.
            if svcInfo.NodePort() != 0 {
                // Hold the local port open so no other process can open it
                // (because the socket might open but it would never work).
                if len(nodeAddresses) == 0 {
                    continue
                }
    
                lps := make([]utilproxy.LocalPort, 0)
                for address := range nodeAddresses {
                    lp := utilproxy.LocalPort{
                        Description: "nodePort for " + svcNameString,
                        IP:          address,
                        Port:        svcInfo.NodePort(),
                        Protocol:    protocol,
                    }
                    if utilproxy.IsZeroCIDR(address) {
                        // Empty IP address means all
                        lp.IP = ""
                        lps = append(lps, lp)
                        // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
                        break
                    }
                    lps = append(lps, lp)
                }
    
                // For ports on node IPs, open the actual port and hold it.
                for _, lp := range lps {
                    if proxier.portsMap[lp] != nil {
                        klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                        replacementPortsMap[lp] = proxier.portsMap[lp]
                    } else if svcInfo.Protocol() != v1.ProtocolSCTP {
                        socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
                        if err != nil {
                            klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
                            continue
                        }
                        if lp.Protocol == "udp" {
                            // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
                            // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
                            // This only affects UDP connections, which are not common.
                            // See issue: https://github.com/kubernetes/kubernetes/issues/49881
                            err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
                            if err != nil {
                                klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
                            }
                        }
                        replacementPortsMap[lp] = socket
                    }
                }
    
                if hasEndpoints {
                    args = append(args[:0],
                        "-A", string(kubeNodePortsChain),
                        "-m", "comment", "--comment", svcNameString,
                        "-m", protocol, "-p", protocol,
                        "--dport", strconv.Itoa(svcInfo.NodePort()),
                    )
                    if !svcInfo.OnlyNodeLocalEndpoints() {
                        // Nodeports need SNAT, unless they're local.
                        writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                        // Jump to the service chain.
                        writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
                    } else {
                        // TODO: Make all nodePorts jump to the firewall chain.
                        // Currently we only create it for loadbalancers (#33586).
    
                        // Fix localhost martian source error
                        loopback := "127.0.0.0/8"
                        if isIPv6 {
                            loopback = "::1/128"
                        }
                        writeLine(proxier.natRules, append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...)
                        writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...)
                    }
                } else {
                    // No endpoints.
                    writeLine(proxier.filterRules,
                        "-A", string(kubeExternalServicesChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                        "-m", "addrtype", "--dst-type", "LOCAL",
                        "-m", protocol, "-p", protocol,
                        "--dport", strconv.Itoa(svcInfo.NodePort()),
                        "-j", "REJECT",
                    )
                }
            }
    
            if !hasEndpoints {
                continue
            }
    
            // Generate the per-endpoint chains.  We do this in multiple passes so we
            // can group rules together.
            // These two slices parallel each other - keep in sync
            endpoints = endpoints[:0]
            endpointChains = endpointChains[:0]
            var endpointChain utiliptables.Chain
            for _, ep := range allEndpoints {
                epInfo, ok := ep.(*endpointsInfo)
                if !ok {
                    klog.Errorf("Failed to cast endpointsInfo %q", ep.String())
                    continue
                }
    
                endpoints = append(endpoints, epInfo)
                endpointChain = epInfo.endpointChain(svcNameString, protocol)
                endpointChains = append(endpointChains, endpointChain)
    
                // Create the endpoint chain, retaining counters if possible.
                if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
                    writeBytesLine(proxier.natChains, chain)
                } else {
                    writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
                }
                activeNATChains[endpointChain] = true
            }
    
            // First write session affinity rules, if applicable.
            if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
                for _, endpointChain := range endpointChains {
                    args = append(args[:0],
                        "-A", string(svcChain),
                    )
                    args = proxier.appendServiceCommentLocked(args, svcNameString)
                    args = append(args,
                        "-m", "recent", "--name", string(endpointChain),
                        "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
                        "-j", string(endpointChain),
                    )
                    writeLine(proxier.natRules, args...)
                }
            }
    
            // Now write loadbalancing & DNAT rules.
            n := len(endpointChains)
            localEndpointChains := make([]utiliptables.Chain, 0)
            for i, endpointChain := range endpointChains {
                // Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
                if svcInfo.OnlyNodeLocalEndpoints() && endpoints[i].IsLocal {
                    localEndpointChains = append(localEndpointChains, endpointChains[i])
                }
    
                epIP := endpoints[i].IP()
                if epIP == "" {
                    // Error parsing this endpoint has been logged. Skip to next endpoint.
                    continue
                }
    
                // Balancing rules in the per-service chain.
                args = append(args[:0], "-A", string(svcChain))
                args = proxier.appendServiceCommentLocked(args, svcNameString)
                if i < (n - 1) {
                    // Each rule is a probabilistic match.
                    args = append(args,
                        "-m", "statistic",
                        "--mode", "random",
                        "--probability", proxier.probability(n-i))
                }
                // The final (or only if n == 1) rule is a guaranteed match.
                args = append(args, "-j", string(endpointChain))
                writeLine(proxier.natRules, args...)
    
                // Rules in the per-endpoint chain.
                args = append(args[:0], "-A", string(endpointChain))
                args = proxier.appendServiceCommentLocked(args, svcNameString)
                // Handle traffic that loops back to the originator with SNAT.
                writeLine(proxier.natRules, append(args,
                    "-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
                    "-j", string(KubeMarkMasqChain))...)
                // Update client-affinity lists.
                if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
                    args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
                }
                // DNAT to final destination.
                args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
                writeLine(proxier.natRules, args...)
            }
    
            // The logic below this applies only if this service is marked as OnlyLocal
            if !svcInfo.OnlyNodeLocalEndpoints() {
                continue
            }
    
            // First rule in the chain redirects all pod -> external VIP traffic to the
            // Service's ClusterIP instead. This happens whether or not we have local
            // endpoints; only if localDetector is implemented
            if proxier.localDetector.IsImplemented() {
                args = append(args[:0],
                    "-A", string(svcXlbChain),
                    "-m", "comment", "--comment",
                    `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
                )
                writeLine(proxier.natRules, proxier.localDetector.JumpIfLocal(args, string(svcChain))...)
            }
    
            // Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local
            // This allows traffic originating from the host to be redirected to the service correctly,
            // otherwise traffic to LB IPs are dropped if there are no local endpoints.
            args = append(args[:0], "-A", string(svcXlbChain))
            writeLine(proxier.natRules, append(args,
                "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString),
                "-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))...)
            writeLine(proxier.natRules, append(args,
                "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
                "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...)
    
            numLocalEndpoints := len(localEndpointChains)
            if numLocalEndpoints == 0 {
                // Blackhole all traffic since there are no local endpoints
                args = append(args[:0],
                    "-A", string(svcXlbChain),
                    "-m", "comment", "--comment",
                    fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
                    "-j",
                    string(KubeMarkDropChain),
                )
                writeLine(proxier.natRules, args...)
            } else {
                // First write session affinity rules only over local endpoints, if applicable.
                if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
                    for _, endpointChain := range localEndpointChains {
                        writeLine(proxier.natRules,
                            "-A", string(svcXlbChain),
                            "-m", "comment", "--comment", svcNameString,
                            "-m", "recent", "--name", string(endpointChain),
                            "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
                            "-j", string(endpointChain))
                    }
                }
    
                // Setup probability filter rules only over local endpoints
                for i, endpointChain := range localEndpointChains {
                    // Balancing rules in the per-service chain.
                    args = append(args[:0],
                        "-A", string(svcXlbChain),
                        "-m", "comment", "--comment",
                        fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString),
                    )
                    if i < (numLocalEndpoints - 1) {
                        // Each rule is a probabilistic match.
                        args = append(args,
                            "-m", "statistic",
                            "--mode", "random",
                            "--probability", proxier.probability(numLocalEndpoints-i))
                    }
                    // The final (or only if n == 1) rule is a guaranteed match.
                    args = append(args, "-j", string(endpointChain))
                    writeLine(proxier.natRules, args...)
                }
            }
        }
    
        // Delete chains no longer in use.
        for chain := range existingNATChains {
            if !activeNATChains[chain] {
                chainString := string(chain)
                if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
                    // Ignore chains that aren't ours.
                    continue
                }
                // We must (as per iptables) write a chain-line for it, which has
                // the nice effect of flushing the chain.  Then we can remove the
                // chain.
                writeBytesLine(proxier.natChains, existingNATChains[chain])
                writeLine(proxier.natRules, "-X", chainString)
            }
        }
    
        // Finally, tail-call to the nodeports chain.  This needs to be after all
        // other service portal rules.
        isIPv6 := proxier.iptables.IsIPv6()
        for address := range nodeAddresses {
            // TODO(thockin, m1093782566): If/when we have dual-stack support we will want to distinguish v4 from v6 zero-CIDRs.
            if utilproxy.IsZeroCIDR(address) {
                args = append(args[:0],
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
                    "-m", "addrtype", "--dst-type", "LOCAL",
                    "-j", string(kubeNodePortsChain))
                writeLine(proxier.natRules, args...)
                // Nothing else matters after the zero CIDR.
                break
            }
            // Ignore IP addresses with incorrect version
            if isIPv6 && !utilnet.IsIPv6String(address) || !isIPv6 && utilnet.IsIPv6String(address) {
                klog.Errorf("IP address %s has incorrect IP version", address)
                continue
            }
            // create nodeport rules for each IP one by one
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
                "-d", address,
                "-j", string(kubeNodePortsChain))
            writeLine(proxier.natRules, args...)
        }
    
        // Drop the packets in INVALID state, which would potentially cause
        // unexpected connection reset.
        // https://github.com/kubernetes/kubernetes/issues/74839
        writeLine(proxier.filterRules,
            "-A", string(kubeForwardChain),
            "-m", "conntrack",
            "--ctstate", "INVALID",
            "-j", "DROP",
        )
    
        // If the masqueradeMark has been added then we want to forward that same
        // traffic, this allows NodePort traffic to be forwarded even if the default
        // FORWARD policy is not accept.
        writeLine(proxier.filterRules,
            "-A", string(kubeForwardChain),
            "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
            "-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
            "-j", "ACCEPT",
        )
    
        // The following two rules ensure the traffic after the initial packet
        // accepted by the "kubernetes forwarding rules" rule above will be
        // accepted.
        writeLine(proxier.filterRules,
            "-A", string(kubeForwardChain),
            "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
            "-m", "conntrack",
            "--ctstate", "RELATED,ESTABLISHED",
            "-j", "ACCEPT",
        )
        writeLine(proxier.filterRules,
            "-A", string(kubeForwardChain),
            "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
            "-m", "conntrack",
            "--ctstate", "RELATED,ESTABLISHED",
            "-j", "ACCEPT",
        )
    
        // Write the end-of-table markers.
        writeLine(proxier.filterRules, "COMMIT")
        writeLine(proxier.natRules, "COMMIT")
    
        // Sync rules.
        // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
        proxier.iptablesData.Reset()
        proxier.iptablesData.Write(proxier.filterChains.Bytes())
        proxier.iptablesData.Write(proxier.filterRules.Bytes())
        proxier.iptablesData.Write(proxier.natChains.Bytes())
        proxier.iptablesData.Write(proxier.natRules.Bytes())
    
        klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
        err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
        if err != nil {
            klog.Errorf("Failed to execute iptables-restore: %v", err)
            metrics.IptablesRestoreFailuresTotal.Inc()
            // Revert new local ports.
            klog.V(2).Infof("Closing local ports after iptables-restore failure")
            utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
            return
        }
        success = true
    
        for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
            for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
                latency := metrics.SinceInSeconds(lastChangeTriggerTime)
                metrics.NetworkProgrammingLatency.Observe(latency)
                klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
            }
        }
    
        // Close old local ports and save new ones.
        for k, v := range proxier.portsMap {
            if replacementPortsMap[k] == nil {
                v.Close()
            }
        }
        proxier.portsMap = replacementPortsMap
    
        if proxier.healthzServer != nil {
            proxier.healthzServer.Updated()
        }
        metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
    
        // Update service healthchecks.  The endpoints list might include services that are
        // not "OnlyLocal", but the services list will not, and the serviceHealthServer
        // will just drop those endpoints.
        if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
            klog.Errorf("Error syncing healthcheck services: %v", err)
        }
        if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
            klog.Errorf("Error syncing healthcheck endpoints: %v", err)
        }
    
        // Finish housekeeping.
        // TODO: these could be made more consistent.
        for _, svcIP := range staleServices.UnsortedList() {
            if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
                klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
            }
        }
        proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
    }
    

    pkg/util/async/bounded_frequency_runner.go 中

    // NewBoundedFrequencyRunner builds a BoundedFrequencyRunner driven by a real
    // wall-clock timer.
    //
    // All arguments are forwarded unchanged to construct: name labels log and
    // panic messages, fn is the function the runner invokes, minInterval and
    // maxInterval bound how often fn runs, and burstRuns is the burst size given
    // to the rate limiter.
    func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
        // A zero-duration timer fires right away; drain that initial tick here so
        // the runner's loop does not observe it as a scheduled run.
        clock := &realTimer{timer: time.NewTimer(0)}
        <-clock.C()

        return construct(name, fn, minInterval, maxInterval, burstRuns, clock)
    }
    
    // construct validates the arguments and assembles a BoundedFrequencyRunner
    // around the supplied timer.
    //
    // It panics when maxInterval is smaller than minInterval or when timer is
    // nil. A minInterval of zero installs a nullLimiter; otherwise a token-bucket
    // limiter is created that refills at one token per minInterval and allows
    // bursts of up to burstRuns.
    func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
        // Checks are evaluated in order, so the interval check is reported first.
        switch {
        case maxInterval < minInterval:
            panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
        case timer == nil:
            panic(fmt.Sprintf("%s: timer must be non-nil", name))
        }

        runner := &BoundedFrequencyRunner{
            name:        name,
            fn:          fn,
            minInterval: minInterval,
            maxInterval: maxInterval,
            // Both signal channels hold a single pending element.
            run:   make(chan struct{}, 1),
            retry: make(chan struct{}, 1),
            timer: timer,
        }

        if minInterval == 0 {
            runner.limiter = nullLimiter{}
            return runner
        }

        // Rate in runs per second: one second divided by the minimum spacing.
        // The bucket size (burstRuns) permits several runs in quick succession.
        perSecond := float32(time.Second) / float32(minInterval)
        runner.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(perSecond, burstRuns, timer)
        return runner
    }
    
    // Loop is the runner's event loop. It arms the timer with maxInterval and
    // then blocks, reacting to whichever event arrives next, until stop delivers
    // a value or is closed.
    //
    // A select statement has no priority among its cases, so the order in which
    // they are listed below does not affect which ready event is handled.
    func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
        klog.V(3).Infof("%s Loop running", bfr.name)
        bfr.timer.Reset(bfr.maxInterval)

        for {
            select {
            case <-bfr.retry:
                // A retry was signalled.
                bfr.doRetry()
            case <-bfr.run:
                // An explicit request queued through Run.
                bfr.tryRun()
            case <-bfr.timer.C():
                // The timer fired.
                bfr.tryRun()
            case <-stop:
                // Shut down and leave the loop for good.
                bfr.stop()
                klog.V(3).Infof("%s Loop stopping", bfr.name)
                return
            }
        }
    }
    
    // Run asks the runner to invoke its function as soon as it is allowed to.
    //
    // The request is a non-blocking send on the run channel. While the underlying
    // function is executing, nobody is receiving from that channel, so a blocking
    // send could stall the caller. If a request is already pending, this one is
    // simply dropped: the pending request covers it.
    func (bfr *BoundedFrequencyRunner) Run() {
        var request struct{}
        select {
        case bfr.run <- request:
            // Request queued for the loop to pick up.
        default:
            // A request is already waiting; nothing more to do.
        }
    }
    
    // tryRun runs the wrapped function if the rate limiter permits it right now;
    // otherwise it re-arms the timer for the earliest moment a run could happen.
    // The runner's mutex is held for the whole call.
    func (bfr *BoundedFrequencyRunner) tryRun() {
        bfr.mu.Lock()
        defer bfr.mu.Unlock()

        if !bfr.limiter.TryAccept() {
            // Rate limited: work out how long to wait before trying again.
            sinceLast := bfr.timer.Since(bfr.lastRun) // time elapsed since the last run
            untilAllowed := bfr.minInterval - sinceLast // wait until a run is permitted
            untilTimer := bfr.timer.Remaining()         // wait until the timer fires anyway
            klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, sinceLast, untilAllowed, untilTimer)

            // Pick the sooner of the two deadlines.
            wait := untilTimer
            if untilAllowed < untilTimer {
                wait = untilAllowed
            }

            // The timer is reset unconditionally, even when the deadline did not
            // change; doing so avoids race conditions in the unit tests.
            bfr.timer.Stop()
            bfr.timer.Reset(wait)
            return
        }

        // Allowed to run: invoke the function, record when it finished, and
        // restart the periodic countdown from maxInterval.
        bfr.fn()
        bfr.lastRun = bfr.timer.Now()
        bfr.timer.Stop()
        bfr.timer.Reset(bfr.maxInterval)
        klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
    }
    
    
    // NewEndpointsConfig creates an EndpointsConfig bound to the given endpoints
    // informer.
    //
    // The returned config remembers the informer's HasSynced function and
    // subscribes its own add/update/delete callbacks to the informer, using
    // resyncPeriod as the resync period for that subscription.
    //
    // NOTE(review): callers reference this as config.NewEndpointsConfig, so it
    // appears to belong to the proxy config package rather than to the
    // bounded_frequency_runner.go file named in the preceding heading - confirm
    // the source path.
    func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
        cfg := &EndpointsConfig{
            listerSynced: endpointsInformer.Informer().HasSynced,
        }

        // Route every informer notification to the matching method on cfg.
        callbacks := cache.ResourceEventHandlerFuncs{
            AddFunc:    cfg.handleAddEndpoints,
            UpdateFunc: cfg.handleUpdateEndpoints,
            DeleteFunc: cfg.handleDeleteEndpoints,
        }
        endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(callbacks, resyncPeriod)

        return cfg
    }
    
    // RegisterEventHandler adds handler to the end of the config's handler list.
    // Handlers are kept in registration order; nothing is invoked here.
    func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
        extended := append(c.eventHandlers, handler)
        c.eventHandlers = extended
    }
    
    // Run waits for the endpoints cache to finish its initial sync and then tells
    // every registered handler, in registration order, that endpoints are synced.
    // If the wait reports failure, Run returns without notifying anyone.
    func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
        klog.Info("Starting endpoints config controller")

        synced := cache.WaitForNamedCacheSync("endpoints config", stopCh, c.listerSynced)
        if !synced {
            return
        }

        for _, handler := range c.eventHandlers {
            klog.V(3).Infof("Calling handler.OnEndpointsSynced()")
            handler.OnEndpointsSynced()
        }
    }
    

    相关文章

      网友评论

          本文标题:k8s 之 kube-proxy 源码简单分析

          本文链接:https://www.haomeiwen.com/subject/gitnhktx.html