An Analysis of kube-proxy

Author: 陈先生_9e91 | Published: 2018-09-27 12:20

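    A Service acts as a reverse proxy that gives a set of backend Pods a single entry point. It can be reached through its cluster IP, its DNS name, or a node IP. kube-proxy does the actual work of implementing a Service on each node; the service-controller does very little here.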

    iptables

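    By default, load balancing is implemented with iptables. A headless Service needs no iptables load balancing: clients obtain the Pod IPs directly, so no corresponding iptables rules exist for it.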

    iptables -t nat -S

    ClusterIP

    // PREROUTING and OUTPUT both jump to the KUBE-SERVICES chain
    -A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
    -A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
    
    // Traffic sent to the cluster IP (169.169.197.95, TCP port 23423) jumps to the KUBE-SVC-VWFCBIJMABKDN6RN chain
    -A KUBE-SERVICES -d 169.169.197.95/32 -p tcp -m comment --comment "default/nt-svc: cluster IP" -m tcp --dport 23423 -j KUBE-SVC-VWFCBIJMABKDN6RN
    
    // With probability 0.5 (1/replicas, two replicas here), send to the first Pod, 10.1.67.6
    -A KUBE-SVC-VWFCBIJMABKDN6RN -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-ISPC6NJY64FLTZW5
    -A KUBE-SEP-ISPC6NJY64FLTZW5 -p tcp -m tcp -j DNAT --to-destination 10.1.67.6:23423
    -A KUBE-SEP-ISPC6NJY64FLTZW5 -s 10.1.67.6/32 -j KUBE-MARK-MASQ
    
    // Everything that did not match above falls through to the last Pod, 10.1.69.2 (no probability match needed)
    -A KUBE-SVC-VWFCBIJMABKDN6RN -j KUBE-SEP-7KPB6CDAAPY2W5YP
    -A KUBE-SEP-7KPB6CDAAPY2W5YP -p tcp -m tcp -j DNAT --to-destination 10.1.69.2:23423
    -A KUBE-SEP-7KPB6CDAAPY2W5YP -s 10.1.69.2/32 -j KUBE-MARK-MASQ
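The rules above show probability 0.5 on the first endpoint and no statistic match on the last one. A minimal sketch of the arithmetic behind that pattern, assuming rule i out of n (counting from 0) is given probability 1/(n-i) so that every endpoint receives an equal overall share; the function names are illustrative and not taken from kube-proxy:

```go
package main

import "fmt"

// endpointProbabilities returns the --probability value for each of the n
// endpoint rules in a KUBE-SVC chain. Rule i (0-based) is only reached when
// rules 0..i-1 did not match, so it must match 1/(n-i) of the remaining
// traffic. The last rule gets 1, i.e. it needs no statistic match at all.
func endpointProbabilities(n int) []float64 {
	probs := make([]float64, n)
	for i := 0; i < n; i++ {
		probs[i] = 1.0 / float64(n-i)
	}
	return probs
}

// overallShare returns the fraction of all traffic each endpoint ends up
// receiving once the fall-through order is taken into account.
func overallShare(probs []float64) []float64 {
	shares := make([]float64, len(probs))
	remaining := 1.0 // traffic that has not matched an earlier rule
	for i, p := range probs {
		shares[i] = remaining * p
		remaining -= shares[i]
	}
	return shares
}

func main() {
	for _, n := range []int{2, 3} {
		probs := endpointProbabilities(n)
		fmt.Printf("n=%d rule probabilities=%.5f overall shares=%.5f\n",
			n, probs, overallShare(probs))
	}
}
```

For two endpoints this gives rule probabilities 0.5 and 1, which matches the listing: the second rule carries no `--probability` because it takes all remaining traffic.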
    

    NodePort

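    Unlike ClusterIP, NodePort exposes the Service outside the cluster, with kube-proxy on every node acting as the proxy. NodePort therefore needs one additional iptables rule that sends traffic arriving on the node port to the Service's KUBE-SVC chain.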

    -A KUBE-NODEPORTS -p tcp -m comment --comment "mind-automl/mongo:" -m tcp --dport 27017 -j KUBE-SVC-3SO264LNGXICPHPS
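To make the difference between the two entry paths concrete, here is a toy model of the chain traversal for the rules quoted in this article. It is a sketch under assumptions: the `packet` type, the `route` function and the node address 192.168.0.10 are hypothetical, and the jump from KUBE-SERVICES to KUBE-NODEPORTS for node-local destinations is assumed rather than shown in the listings above.

```go
package main

import "fmt"

// packet is a hypothetical, heavily simplified view of what the nat rules match on.
type packet struct {
	dstIP      string
	dstPort    int
	dstIsLocal bool // true when dstIP is an address owned by this node
}

// route mimics the traversal for the quoted rules and returns the KUBE-SVC
// chain the packet would jump to, or "" when no rule matches.
func route(p packet) string {
	// KUBE-SERVICES: the ClusterIP rule matches on destination IP and port.
	if p.dstIP == "169.169.197.95" && p.dstPort == 23423 {
		return "KUBE-SVC-VWFCBIJMABKDN6RN"
	}
	// Assumption: node-local destinations are handed to KUBE-NODEPORTS.
	if p.dstIsLocal {
		// KUBE-NODEPORTS: the NodePort rule matches on the port alone.
		if p.dstPort == 27017 {
			return "KUBE-SVC-3SO264LNGXICPHPS"
		}
	}
	return ""
}

func main() {
	fmt.Println(route(packet{"169.169.197.95", 23423, false})) // via the cluster IP
	fmt.Println(route(packet{"192.168.0.10", 27017, true}))    // via a node port
	fmt.Println(route(packet{"192.168.0.10", 8080, true}) == "")
}
```

Both paths end in a KUBE-SVC chain, so the endpoint selection and DNAT rules are shared between ClusterIP and NodePort access.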
    

    Problems

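    The points below come from Huawei Cloud's "Service performance optimization practices in large-scale K8S scenarios" (华为云在 K8S 大规模场景下的 Service 性能优化实践):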

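    • Linear rule matching: rules are checked one after another, so matching takes O(N) time;
    • Rule update latency: updates are not incremental; changing even a single rule rewrites the whole Netfilter rule table;
    • Scalability: when the system holds a large number of iptables chains, adding or deleting rules runs into a kernel lock;
    • As a result, performance becomes very poor at large scale.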
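The first point can be illustrated with a small sketch that counts how many rules must be inspected before a match is found. The `rule` type, the generated addresses and the chain names are made up for the example; the real matching happens in the kernel, not in Go.

```go
package main

import "fmt"

// rule is a hypothetical stand-in for one "-d <ip> --dport <port> -j <target>" rule.
type rule struct {
	dstIP   string
	dstPort int
	target  string
}

// matchLinear walks the rules in order, as a chain traversal does, and returns
// the first matching target together with the number of rules inspected.
func matchLinear(rules []rule, dstIP string, dstPort int) (string, int) {
	for i, r := range rules {
		if r.dstIP == dstIP && r.dstPort == dstPort {
			return r.target, i + 1
		}
	}
	return "", len(rules)
}

// buildRules creates n made-up service rules: 10.0.x.y:80 -> KUBE-SVC-<i>.
func buildRules(n int) []rule {
	rules := make([]rule, 0, n)
	for i := 0; i < n; i++ {
		rules = append(rules, rule{
			dstIP:   fmt.Sprintf("10.0.%d.%d", i/256, i%256),
			dstPort: 80,
			target:  fmt.Sprintf("KUBE-SVC-%d", i),
		})
	}
	return rules
}

func main() {
	for _, n := range []int{10, 1000, 10000} {
		rules := buildRules(n)
		last := rules[n-1]
		_, inspected := matchLinear(rules, last.dstIP, last.dstPort)
		fmt.Printf("services=%d rules inspected for the last service=%d\n", n, inspected)
	}
}
```

The number of inspected rules grows in step with the number of Services, which is the O(N) behaviour the list refers to.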

    code

    kube-proxy watches Service and Endpoints objects and manages the iptables rules accordingly.

    k8s.io\kubernetes\cmd\kube-proxy\app\server.go

    // Run runs the specified ProxyServer.  This should never exit (unless CleanupAndExit is set).
    func (s *ProxyServer) Run() error { 
        informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)
    
        // Create configs (i.e. Watches for Services and Endpoints)
        // Note: RegisterHandler() calls need to happen before creation of Sources because sources
        // only notify on changes, and the initial update (on process start) may be lost if no handlers
        // are registered yet.
        serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
        serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
        go serviceConfig.Run(wait.NeverStop)
    
        endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
        endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
        go endpointsConfig.Run(wait.NeverStop)
    
        // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
        // functions must configure their shared informer event handlers first.
        go informerFactory.Start(wait.NeverStop)
    
        ...
    }
    

    The parts to focus on are ServiceEventHandler and EndpointsEventHandler.

    k8s.io\kubernetes\pkg\proxy\config\config.go

    // ServiceHandler is an abstract interface of objects which receive
    // notifications about service object changes.
    type ServiceHandler interface {
        // OnServiceAdd is called whenever creation of new service object
        // is observed.
        OnServiceAdd(service *v1.Service)
        // OnServiceUpdate is called whenever modification of an existing
        // service object is observed.
        OnServiceUpdate(oldService, service *v1.Service)
        // OnServiceDelete is called whenever deletion of an existing service
        // object is observed.
        OnServiceDelete(service *v1.Service)
        // OnServiceSynced is called once all the initial event handlers were
        // called and the state is fully propagated to local cache.
        OnServiceSynced()
    }
    
    // EndpointsHandler is an abstract interface of objects which receive
    // notifications about endpoints object changes.
    type EndpointsHandler interface {
        // OnEndpointsAdd is called whenever creation of new endpoints object
        // is observed.
        OnEndpointsAdd(endpoints *v1.Endpoints)
        // OnEndpointsUpdate is called whenever modification of an existing
        // endpoints object is observed.
        OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
        // OnEndpointsDelete is called whenever deletion of an existing endpoints
        // object is observed.
        OnEndpointsDelete(endpoints *v1.Endpoints)
        // OnEndpointsSynced is called once all the initial event handlers were
        // called and the state is fully propagated to local cache.
        OnEndpointsSynced()
    }
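A minimal sketch of what implementing such a handler interface looks like. To stay self-contained it replaces `v1.Service` with a hypothetical three-field `Service` struct, and `recordingHandler` is an illustrative type, not something from the Kubernetes code base.

```go
package main

import "fmt"

// Service is a hypothetical stand-in for v1.Service with only the fields used here.
type Service struct {
	Namespace, Name, ClusterIP string
}

// ServiceHandler mirrors the shape of the interface quoted above.
type ServiceHandler interface {
	OnServiceAdd(service *Service)
	OnServiceUpdate(oldService, service *Service)
	OnServiceDelete(service *Service)
	OnServiceSynced()
}

// recordingHandler keeps the latest known state per service and logs each event.
type recordingHandler struct {
	services map[string]*Service
	synced   bool
	events   []string
}

func newRecordingHandler() *recordingHandler {
	return &recordingHandler{services: map[string]*Service{}}
}

func key(s *Service) string { return s.Namespace + "/" + s.Name }

func (h *recordingHandler) OnServiceAdd(s *Service) {
	h.services[key(s)] = s
	h.events = append(h.events, "add "+key(s))
}

func (h *recordingHandler) OnServiceUpdate(old, s *Service) {
	h.services[key(s)] = s
	h.events = append(h.events, "update "+key(s))
}

func (h *recordingHandler) OnServiceDelete(s *Service) {
	delete(h.services, key(s))
	h.events = append(h.events, "delete "+key(s))
}

func (h *recordingHandler) OnServiceSynced() {
	h.synced = true
	h.events = append(h.events, "synced")
}

func main() {
	rec := newRecordingHandler()
	var h ServiceHandler = rec // compile-time check that the interface is satisfied
	svc := &Service{Namespace: "default", Name: "nt-svc", ClusterIP: "169.169.197.95"}
	h.OnServiceAdd(svc)
	h.OnServiceSynced()
	h.OnServiceDelete(svc)
	fmt.Println(rec.events)
}
```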
    

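    These two interfaces wrap the handler functions for Services (svc) and Endpoints (ep). Since iptables mode is used here, their concrete implementation is in k8s.io\kubernetes\pkg\proxy\iptables\proxier.go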

    // Proxier is an iptables based proxy for connections between a localhost:lport
    // and services that provide the actual backends.
    type Proxier struct {
        // endpointsChanges and serviceChanges contains all changes to endpoints and
        // services that happened since iptables was synced. For a single object,
        // changes are accumulated, i.e. previous is state from before all of them,
        // current is state after applying all of those.
        endpointsChanges *proxy.EndpointChangeTracker
        serviceChanges   *proxy.ServiceChangeTracker
    
        mu           sync.Mutex // protects the following fields
        serviceMap   proxy.ServiceMap
        endpointsMap proxy.EndpointsMap
        portsMap     map[utilproxy.LocalPort]utilproxy.Closeable
        // endpointsSynced and servicesSynced are set to true when corresponding
        // objects are synced after startup. This is used to avoid updating iptables
        // with some partial data after kube-proxy restart.
        endpointsSynced bool
        servicesSynced  bool
        initialized     int32
        syncRunner      *async.BoundedFrequencyRunner // governs calls to syncProxyRules
    
        // These are effectively const and do not need the mutex to be held.
        iptables       utiliptables.Interface
        masqueradeAll  bool
        masqueradeMark string
        exec           utilexec.Interface
        clusterCIDR    string
        hostname       string
        nodeIP         net.IP
        portMapper     utilproxy.PortOpener
        recorder       record.EventRecorder
        healthChecker  healthcheck.Server
        healthzServer  healthcheck.HealthzUpdater
    
        // Since converting probabilities (floats) to strings is expensive
        // and we are using only probabilities in the format of 1/n, we are
        // precomputing some number of those and cache for future reuse.
        precomputedProbabilities []string
    
        // The following buffers are used to reuse memory and avoid allocations
        // that are significantly impacting performance.
        iptablesData *bytes.Buffer
        filterChains *bytes.Buffer
        filterRules  *bytes.Buffer
        natChains    *bytes.Buffer
        natRules     *bytes.Buffer
    
        // endpointChainsNumber is the total amount of endpointChains across all
        // services that we will generate (it is computed at the beginning of
        // syncProxyRules method). If that is large enough, comments in some
        // iptable rules are dropped to improve performance.
        endpointChainsNumber int
    
        // Values are as a parameter to select the interfaces where nodeport works.
        nodePortAddresses []string
        // networkInterfacer defines an interface for several net library functions.
        // Inject for test purpose.
        networkInterfacer utilproxy.NetworkInterfacer
    }
    
    func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
        proxier.OnServiceUpdate(nil, service)
    }
    
    func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
        if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
            proxier.syncRunner.Run()
        }
    }
    
    func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
        proxier.OnServiceUpdate(service, nil)
    }
    

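    The method to look at is OnServiceUpdate, which OnServiceAdd and OnServiceDelete both delegate to. serviceChanges records the change to the Service, and once the proxier is initialized the sync is carried out through syncRunner.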

    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
    

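    syncRunner is a periodic task runner: besides being triggered on demand through Run(), it also fires on a timer. Judging by the constructor arguments, runs are bounded by minSyncPeriod and syncPeriod, with burstSyncs allowing short bursts. The task it runs is the syncProxyRules method.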

    // This is where all of the iptables-save/restore calls happen.
    // The only other iptables rules are those that are setup in iptablesInit()
    // This assumes proxier.mu is NOT held
    func (proxier *Proxier) syncProxyRules() {
        ...
    }
    

    The syncProxyRules method is too long (700+ lines) to go through in detail here.
