k8s Networking Series Study Notes, Part 3: How kube-proxy Works

Author: 何约什 | Published 2019-03-06 09:24

    This post analyzes, from the code's point of view, how kube-proxy's IPVS mode is implemented.

    Creating the ProxyServer

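    The command line is built on Cobra; that early setup logic is straightforward, so it is not analyzed here. The key object in kube-proxy is ProxyServer: initialization consists of building a ProxyServer object and then calling ProxyServer.Run().
    Let's first look at the definition of ProxyServer: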

    type ProxyServer struct {
        Client                 clientset.Interface
        EventClient            v1core.EventsGetter
        IptInterface           utiliptables.Interface
        IpvsInterface          utilipvs.Interface
        IpsetInterface         utilipset.Interface
        execer                 exec.Interface
        Proxier                proxy.ProxyProvider
        Broadcaster            record.EventBroadcaster
        Recorder               record.EventRecorder
        ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration
        Conntracker            Conntracker // if nil, ignored
        ProxyMode              string
        NodeRef                *v1.ObjectReference
        CleanupAndExit         bool
        CleanupIPVS            bool
        MetricsBindAddress     string
        EnableProfiling        bool
        OOMScoreAdj            *int32
        ResourceContainer      string
        ConfigSyncPeriod       time.Duration
        ServiceEventHandler    config.ServiceHandler
        EndpointsEventHandler  config.EndpointsHandler
        HealthzServer          *healthcheck.HealthzServer
    }
    

    Low-level command interfaces

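    Now for the struct's members. Client and EventClient need no explanation. IptInterface, IpvsInterface, IpsetInterface and execer correspond to the underlying tools iptables, ipvsadm and ipset (execer is what actually runs the commands):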

        iptInterface = utiliptables.New(execer, dbus, protocol) // modifies iptables rules
        ipvsInterface = utilipvs.New(execer)                     // manipulates the IPVS configuration
        kernelHandler = ipvs.NewLinuxKernelHandler()             // queries IPVS-related kernel modules; if they are missing, kube-proxy falls back to iptables mode
        ipsetInterface = utilipset.New(execer)                   // runs ipset commands to maintain IP address sets
    

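    The roles of these members are as described in the comments. The key member is Proxier: kube-proxy supports three proxy modes (userspace, iptables and ipvs), and each mode uses a different Proxier implementation.
    The mode kube-proxy actually uses is determined as follows: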

    proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
    

    config.Mode is the mode configured when the kube-proxy process is started. Below is an example kube-proxy configuration file, in which mode: "ipvs" requests IPVS mode.

        apiVersion: kubeproxy.config.k8s.io/v1alpha1
        bindAddress: 0.0.0.0
        clientConnection:
          acceptContentTypes: ""
          burst: 10
          contentType: application/vnd.kubernetes.protobuf
          kubeconfig: /var/lib/kube-proxy/kubeconfig.conf
          qps: 5
        clusterCIDR: 10.244.0.0/16
        configSyncPeriod: 15m0s
        conntrack:
          max: null
          maxPerCore: 32768
          min: 131072
          tcpCloseWaitTimeout: 1h0m0s
          tcpEstablishedTimeout: 24h0m0s
        enableProfiling: false
        healthzBindAddress: 0.0.0.0:10256
        hostnameOverride: ""
        iptables:
          masqueradeAll: false
          masqueradeBit: 14
          minSyncPeriod: 0s
          syncPeriod: 30s
        ipvs:
          excludeCIDRs: null
          minSyncPeriod: 0s
          scheduler: ""
          syncPeriod: 30s
        kind: KubeProxyConfiguration
        metricsBindAddress: 127.0.0.1:10249
        mode: "ipvs"
        nodePortAddresses: null
        oomScoreAdj: -999
        portRange: ""
        resourceContainer: /kube-proxy
        udpIdleTimeout: 250ms
    

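    Although IPVS mode is configured here, whether IPVS is actually used depends on whether the node meets its requirements (see Part 2 of these notes); that code is not shown. If IPVS mode is finally selected, the Proxier member is of type ipvs.Proxier.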
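    The fallback described above can be modeled as a small decision function. This is a simplified sketch, not kube-proxy's getProxyMode: the boolean inputs ipvsUsable and iptablesUsable are hypothetical stand-ins for the real checks (IPVS kernel modules, ipset version, iptables version), and the order shown (ipvs, then iptables, then userspace) is the fallback order as I understand it for kube-proxy of this era.

```go
package main

import "fmt"

// Mode names mirroring kube-proxy's "userspace", "iptables" and "ipvs".
const (
	modeUserspace = "userspace"
	modeIPTables  = "iptables"
	modeIPVS      = "ipvs"
)

// chooseProxyMode sketches the fallback order. ipvsUsable and iptablesUsable
// are assumed inputs standing in for the real capability checks.
func chooseProxyMode(requested string, ipvsUsable, iptablesUsable bool) string {
	switch requested {
	case modeUserspace:
		return modeUserspace
	case modeIPVS:
		if ipvsUsable {
			return modeIPVS
		}
		// IPVS was requested but is not usable: continue with the iptables check.
	}
	// "iptables", an unknown mode, or a failed IPVS request all try iptables next.
	if iptablesUsable {
		return modeIPTables
	}
	return modeUserspace
}

func main() {
	fmt.Println(chooseProxyMode(modeIPVS, true, true))   // ipvs
	fmt.Println(chooseProxyMode(modeIPVS, false, true))  // iptables
	fmt.Println(chooseProxyMode(modeIPVS, false, false)) // userspace
}
```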

    Starting the ProxyServer

      1. Start the EventBroadcaster, which syncs events to the API server
        // Create the eventBroadcaster (done while the ProxyServer is being constructed)
        hostname := utilnode.GetHostname(config.HostnameOverride)
        eventBroadcaster := record.NewBroadcaster()
        recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})

        // In ProxyServer.Run(): start sending recorded events to the API server
        s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
    
      2. Start the health-check (healthz) endpoint

    The code is straightforward, so it is not listed here. The health status can be checked with curl:

    root@bogon2:~/k8s-yml-tests# curl http://127.0.0.1:10256/healthz
    {"lastUpdated": "2019-02-22 07:10:11.258317649 +0000 UTC m=+109458.567293187","currentTime": "2019-02-22 07:10:37.209843398 +0000 UTC m=+109484.518818898"}
    
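      3. Start the metrics endpoint
        Once it is up, the metrics can be viewed as shown below. The data is exposed in Prometheus format, so Prometheus can scrape it and then aggregate and display it.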
    root@bogon2:~/k8s-yml-tests# curl http://127.0.0.1:10249/metrics
    # HELP apiserver_audit_event_total Counter of audit events generated and sent to the audit backend.
    # TYPE apiserver_audit_event_total counter
    apiserver_audit_event_total 0
    # HELP apiserver_audit_requests_rejected_total Counter of apiserver requests rejected due to an error in audit logging backend.
    # TYPE apiserver_audit_requests_rejected_total counter
    apiserver_audit_requests_rejected_total 0
    ......
    rest_client_request_latency_seconds_sum{url="https://172.25.39.6:6443/%7Bprefix%7D",verb="POST"} 0.007607341
    rest_client_request_latency_seconds_count{url="https://172.25.39.6:6443/%7Bprefix%7D",verb="POST"} 1
    # HELP rest_client_requests_total Number of HTTP requests, partitioned by status code, method, and host.
    # TYPE rest_client_requests_total counter
    rest_client_requests_total{code="200",host="172.25.39.6:6443",method="GET"} 490
    rest_client_requests_total{code="201",host="172.25.39.6:6443",method="POST"} 1
    
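      4. Configure conntrack parameters
        Connection tracking (conntrack) follows and records the state of connections. For the first packet of each new connection passing through the network stack, Linux creates a connection entry; from then on, every packet belonging to that connection is associated with that entry, which records the connection's state. Connection tracking is the basis of the firewall's stateful inspection and a prerequisite for SNAT and DNAT in address translation.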

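    So how does Netfilter create connection entries? Every packet has a "source" host and a "destination" host: the host that initiates the connection is the source, and the host that responds to the source's request is the destination. Creating an entry means tracking each such connection from its establishment through data transfer to its termination. The table made up of all these entries is called the connection tracking table.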
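    To make the idea concrete, here is a toy model of a connection tracking table. It is purely illustrative; the types and states are simplified assumptions, not netfilter's data structures. The first packet of a flow creates an entry that is indexed by both its original tuple and the reversed (reply) tuple, so reply packets find the same entry.

```go
package main

import "fmt"

// tuple identifies one direction of a flow.
type tuple struct {
	Proto   string
	SrcIP   string
	SrcPort int
	DstIP   string
	DstPort int
}

// reverse returns the tuple that a reply packet would carry.
func (t tuple) reverse() tuple {
	return tuple{t.Proto, t.DstIP, t.DstPort, t.SrcIP, t.SrcPort}
}

// entry is a simplified connection record.
type entry struct {
	Original tuple
	State    string
	Packets  int
}

// table is a toy connection tracking table keyed by both directions.
type table struct {
	byTuple map[tuple]*entry
}

func newTable() *table { return &table{byTuple: map[tuple]*entry{}} }

// track looks up the packet's tuple. The first packet of a flow creates an
// entry in state NEW; a packet in the reply direction marks it ESTABLISHED.
func (t *table) track(p tuple) *entry {
	if e, ok := t.byTuple[p]; ok {
		e.Packets++
		if p != e.Original {
			e.State = "ESTABLISHED"
		}
		return e
	}
	e := &entry{Original: p, State: "NEW", Packets: 1}
	t.byTuple[p] = e
	t.byTuple[p.reverse()] = e
	return e
}

func main() {
	ct := newTable()
	req := tuple{"tcp", "10.244.1.5", 40000, "10.96.0.10", 53}
	ct.track(req)                  // request creates the entry
	e := ct.track(req.reverse())   // reply finds the same entry
	fmt.Println(e.State, e.Packets, len(ct.byTuple)) // ESTABLISHED 2 2
}
```

    Real conntrack also runs a per-protocol state machine, expires entries using timeouts like the ones configured below, and stores the NAT mapping for the connection, which is why SNAT and DNAT depend on it.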

    kube-proxy sets the conntrack parameters through the KubeProxyConntrackConfiguration struct, shown below:

    
    // KubeProxyConntrackConfiguration contains conntrack settings for
    // the Kubernetes proxy server.
    type KubeProxyConntrackConfiguration struct {
        // max is the maximum number of NAT connections to track (0 to
        // leave as-is).  This takes precedence over maxPerCore and min.
        Max *int32
        // maxPerCore is the maximum number of NAT connections to track
        // per CPU core (0 to leave the limit as-is and ignore min).
        MaxPerCore *int32
        // min is the minimum value of connect-tracking records to allocate,
        // regardless of maxPerCore (set maxPerCore=0 to leave the limit as-is).
        Min *int32
        // tcpEstablishedTimeout is how long an idle TCP connection will be kept open
        // (e.g. '2s').  Must be greater than 0 to set.
        TCPEstablishedTimeout *metav1.Duration
        // tcpCloseWaitTimeout is how long an idle conntrack entry
        // in CLOSE_WAIT state will remain in the conntrack
        // table. (e.g. '60s'). Must be greater than 0 to set.
        TCPCloseWaitTimeout *metav1.Duration
    }
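    The field comments above imply a precedence order for the table size. The sketch below turns that into a function; it follows the comments rather than kube-proxy's source, and numCPU is passed in instead of read from the runtime so the result is easy to check. With the sample configuration (maxPerCore 32768, min 131072, max unset), a 2-CPU node gets 131072 and an 8-CPU node gets 262144.

```go
package main

import "fmt"

// conntrackConfig mirrors the three sizing fields above (nil means unset).
type conntrackConfig struct {
	Max        *int32
	MaxPerCore *int32
	Min        *int32
}

// effectiveConntrackMax follows the field comments: Max wins if set (> 0);
// otherwise MaxPerCore*numCPU, but never below Min; 0 means "leave as-is".
func effectiveConntrackMax(c conntrackConfig, numCPU int) int {
	if c.Max != nil && *c.Max > 0 {
		return int(*c.Max)
	}
	if c.MaxPerCore != nil && *c.MaxPerCore > 0 {
		floor := 0
		if c.Min != nil {
			floor = int(*c.Min)
		}
		scaled := int(*c.MaxPerCore) * numCPU
		if scaled > floor {
			return scaled
		}
		return floor
	}
	return 0
}

func i32(v int32) *int32 { return &v }

func main() {
	// Values from the sample config: maxPerCore 32768, min 131072, max unset.
	cfg := conntrackConfig{MaxPerCore: i32(32768), Min: i32(131072)}
	fmt.Println(effectiveConntrackMax(cfg, 2)) // 131072 (min wins)
	fmt.Println(effectiveConntrackMax(cfg, 8)) // 262144
}
```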
    

    These settings are applied by the Conntracker member of ProxyServer; on Linux its concrete type is type realConntracker struct{}. Here is the code that sets the Max parameter:

    func (rct realConntracker) SetMax(max int) error {
        if err := rct.setIntSysCtl("nf_conntrack_max", max); err != nil {
            return err
        }
        glog.Infof("Setting nf_conntrack_max to %d", max)
    
        // Linux does not support writing to /sys/module/nf_conntrack/parameters/hashsize
        // when the writer process is not in the initial network namespace
        // (https://github.com/torvalds/linux/blob/v4.10/net/netfilter/nf_conntrack_core.c#L1795-L1796).
        // Usually that's fine. But in some configurations such as with github.com/kinvolk/kubeadm-nspawn,
        // kube-proxy is in another netns.
        // Therefore, check if writing in hashsize is necessary and skip the writing if not.
        hashsize, err := readIntStringFile("/sys/module/nf_conntrack/parameters/hashsize")
        if err != nil {
            return err
        }
        if hashsize >= (max / 4) {
            return nil
        }
    
        // sysfs is expected to be mounted as 'rw'. However, it may be
        // unexpectedly mounted as 'ro' by docker because of a known docker
        // issue (https://github.com/docker/docker/issues/24000). Setting
        // conntrack will fail when sysfs is readonly. When that happens, we
        // don't set conntrack hashsize and return a special error
        // readOnlySysFSError here. The caller should deal with
        // readOnlySysFSError differently.
        writable, err := isSysFSWritable()
        if err != nil {
            return err
        }
        if !writable {
            return readOnlySysFSError
        }
        // TODO: generify this and sysctl to a new sysfs.WriteInt()
        glog.Infof("Setting conntrack hashsize to %d", max/4)
        return writeIntStringFile("/sys/module/nf_conntrack/parameters/hashsize", max/4)
    }
    

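    As the code shows, the value in /proc/sys/net/netfilter/nf_conntrack_max is set via sysctl; then, if the current /sys/module/nf_conntrack/parameters/hashsize is smaller than max/4 and sysfs is writable, the code writes max/4 directly into hashsize. The --conntrack-tcp-timeout-established and --conntrack-tcp-timeout-close-wait parameters are set the same way; the corresponding files are /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established and /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_close_wait.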

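    These parameters affect all subsequent network connections, so set them carefully, ideally consistent with the node's capacity planning. If the table is too small, the kernel may report "nf_conntrack: table full, dropping packet".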

      5. Watch for Service and Endpoints changes

    Here serviceConfig and endpointsConfig are created and started:

        serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
        serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
        go serviceConfig.Run(wait.NeverStop)
    
        endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
        endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
        go endpointsConfig.Run(wait.NeverStop)
    
        // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
        // functions must configure their shared informer event handlers first.
        go informerFactory.Start(wait.NeverStop)
    

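    The event handler registered with both serviceConfig and endpointsConfig is in fact the proxier, i.e. the ipvs.Proxier instance created earlier. The Run methods of serviceConfig and endpointsConfig wait until the corresponding informer has finished syncing and then call the Proxier's OnServiceSynced and OnEndpointsSynced methods.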

    The per-object event callbacks are registered when the ServiceConfig and EndpointsConfig objects are constructed:

    func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
        result := &ServiceConfig{
            lister:       serviceInformer.Lister(),
            listerSynced: serviceInformer.Informer().HasSynced,
        }
        // Register the event callbacks here
        serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
            cache.ResourceEventHandlerFuncs{
                AddFunc:    result.handleAddService,
                UpdateFunc: result.handleUpdateService,
                DeleteFunc: result.handleDeleteService,
            },
            resyncPeriod,
        )
    
        return result
    }
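    The constructor wires the informer's AddFunc to handleAddService, which forwards the object to every registered handler (here, the Proxier). A minimal model of that fan-out follows; the Service type is a stand-in, recordingProxier is a hypothetical substitute for ipvs.Proxier, and only the Add event is modeled.

```go
package main

import "fmt"

// Service is a stand-in for api.Service with just the fields used here.
type Service struct {
	Namespace, Name string
}

// ServiceHandler is a reduced version of the handler interface the Proxier
// implements; only OnServiceAdd is modeled.
type ServiceHandler interface {
	OnServiceAdd(svc *Service)
}

// ServiceConfig fans informer callbacks out to every registered handler.
type ServiceConfig struct {
	eventHandlers []ServiceHandler
}

func (c *ServiceConfig) RegisterEventHandler(h ServiceHandler) {
	c.eventHandlers = append(c.eventHandlers, h)
}

// handleAddService plays the role of the informer's AddFunc: it receives an
// interface{}, type-asserts it and forwards it to every handler.
func (c *ServiceConfig) handleAddService(obj interface{}) {
	svc, ok := obj.(*Service)
	if !ok {
		return // unexpected object type; ignored in this sketch
	}
	for _, h := range c.eventHandlers {
		h.OnServiceAdd(svc)
	}
}

// recordingProxier stands in for ipvs.Proxier and records what it saw.
type recordingProxier struct{ seen []string }

func (p *recordingProxier) OnServiceAdd(svc *Service) {
	p.seen = append(p.seen, svc.Namespace+"/"+svc.Name)
}

func main() {
	cfg := &ServiceConfig{}
	p := &recordingProxier{}
	cfg.RegisterEventHandler(p)
	cfg.handleAddService(&Service{Namespace: "default", Name: "nginx"})
	fmt.Println(p.seen) // [default/nginx]
}
```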
    

    The core logic is the watching and handling of these events; it is analyzed in detail later.

      6. Start the Proxier's processing loop

    The processing loop is driven both by events and by a timer. The code is:

    // SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
    func (proxier *Proxier) SyncLoop() {
        // Update healthz timestamp at beginning in case Sync() never succeeds.
        if proxier.healthzServer != nil {
            proxier.healthzServer.UpdateTimestamp()
        }
        proxier.syncRunner.Loop(wait.NeverStop)
    }
    
    
    // Loop handles the periodic timer and run requests.  This is expected to be
    // called as a goroutine.
    func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
        glog.V(3).Infof("%s Loop running", bfr.name)
        bfr.timer.Reset(bfr.maxInterval)
        for {
            select {
            case <-stop:
                bfr.stop()
                glog.V(3).Infof("%s Loop stopping", bfr.name)
                return
            case <-bfr.timer.C():
                bfr.tryRun()
            case <-bfr.run:
                bfr.tryRun()
            }
        }
    }
    

    Whenever a Service or Endpoints event arrives, or the timer fires, BoundedFrequencyRunner.tryRun is called. The BoundedFrequencyRunner struct is:

    // BoundedFrequencyRunner manages runs of a user-provided function.
    // See NewBoundedFrequencyRunner for examples.
    type BoundedFrequencyRunner struct {
        name        string        // the name of this instance
        minInterval time.Duration // the min time between runs, modulo bursts
        maxInterval time.Duration // the max time between runs
    
        run chan struct{} // try an async run
    
        mu      sync.Mutex  // guards runs of fn and all mutations
        fn      func()      // function to run
        lastRun time.Time   // time of last run
        timer   timer       // timer for deferred runs
        limiter rateLimiter // rate limiter for on-demand runs
    }
    

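    tryRun calls the fn member, which is proxier.syncProxyRules, when the minimum interval and rate limiter allow a run right now; otherwise it schedules a later run. syncProxyRules is analyzed separately below.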

    Core logic: updating the proxy rules

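    Of the steps in the previous section, the last two carry the core logic: step 5 registers the event callbacks, and those callbacks drive the logic of step 6 (the syncProxyRules method).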

    Event callbacks

    Before looking at the event callbacks, let's look at the relevant Proxier members, in particular the two change-tracking ones:

    type Proxier struct {
        // endpointsChanges and serviceChanges contains all changes to endpoints and
        // services that happened since last syncProxyRules call. For a single object,
        // changes are accumulated, i.e. previous is state from before all of them,
        // current is state after applying all of those.
        endpointsChanges endpointsChangeMap // pending Endpoints changes
        serviceChanges   serviceChangeMap   // pending Service changes

        serviceMap   proxyServiceMap   // current Services
        endpointsMap proxyEndpointsMap // current Endpoints
        portsMap     map[utilproxy.LocalPort]utilproxy.Closeable
        ......
    }
    

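    endpointsChanges and serviceChanges record the pending changes to Endpoints and Services respectively, while serviceMap, endpointsMap and portsMap hold the actual Service, Endpoints and port state; every change is eventually applied to these members.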
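    The comment on these fields says that changes to one object are accumulated: previous is the state before all of them and current is the state after. A sketch of that bookkeeping follows, with states simplified to strings (the real code stores per-port Service information and compares it structurally); changeTracker is a hypothetical name. If a burst of changes nets out to nothing, the entry is dropped, which is one way no-op changes are filtered out before syncProxyRules runs.

```go
package main

import "fmt"

// change accumulates everything seen for one object since the last sync.
// An empty string stands for "object absent" in this sketch.
type change struct {
	previous, current string
}

// changeTracker is a simplified stand-in for serviceChangeMap.
type changeTracker struct {
	items map[string]*change
}

func newChangeTracker() *changeTracker { return &changeTracker{items: map[string]*change{}} }

// update records one transition, keeping the oldest previous state and the
// newest current state. It drops the entry when they are equal (net no-op)
// and reports whether any pending changes remain.
func (t *changeTracker) update(name, previous, current string) bool {
	c, ok := t.items[name]
	if !ok {
		c = &change{previous: previous}
		t.items[name] = c
	}
	c.current = current
	if c.previous == c.current {
		delete(t.items, name)
	}
	return len(t.items) > 0
}

func main() {
	tr := newChangeTracker()
	tr.update("default/web", "", "10.96.0.20:80")                // Service added
	tr.update("default/web", "10.96.0.20:80", "10.96.0.20:8080") // then updated
	c := tr.items["default/web"]
	fmt.Printf("previous=%q current=%q\n", c.previous, c.current) // previous="" current="10.96.0.20:8080"
	tr.update("default/web", "10.96.0.20:8080", "") // then deleted before any sync
	fmt.Println(len(tr.items))                      // 0
}
```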

    The OnServiceAdd event callback is:

    // OnServiceAdd is called whenever creation of new service object is observed.
    func (proxier *Proxier) OnServiceAdd(service *api.Service) {
        namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
        if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
            proxier.syncRunner.Run()
        }
    }
    

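    An event callback does two things:

    • records the change;
    • triggers the runner to do the actual rule update.
      This asynchronous design lets a burst of simultaneous changes be merged and processed as one batch.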
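    As I understand the async package, the merging works because the runner's run channel (see the BoundedFrequencyRunner struct below) has room for a single pending request and Run() sends on it without blocking, so extra requests are simply dropped while one is already pending. The sketch below models only that request side; the runner type is a simplified reconstruction, not the actual implementation.

```go
package main

import "fmt"

// runner models only the request side of BoundedFrequencyRunner.
type runner struct {
	run chan struct{} // buffer of one: at most one pending request
}

func newRunner() *runner { return &runner{run: make(chan struct{}, 1)} }

// Run requests a sync without blocking. If a request is already pending,
// the new one is dropped, so a burst of events collapses into one run.
func (r *runner) Run() {
	select {
	case r.run <- struct{}{}:
	default:
	}
}

func main() {
	r := newRunner()
	for i := 0; i < 100; i++ { // e.g. 100 Service/Endpoints events at once
		r.Run()
	}
	fmt.Println(len(r.run)) // 1
	<-r.run                 // the loop takes the request and would call tryRun()
	fmt.Println(len(r.run)) // 0
}
```

    Because each change is already recorded in serviceChanges or endpointsChanges, the single sync that eventually runs still sees all of them.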

    Updating the proxy rules

    This part focuses on the rule handling for the most common Service type, ClusterIP; NodePort handling is covered briefly at the end.

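    As shown by the Proxier fields above, the Service and Endpoints watchers set up earlier observe Service and Endpoints (Pod) changes in real time, store them in the Proxier's serviceChanges and endpointsChanges members, and then trigger a proxy-rule update.
    That update is done in Proxier.syncProxyRules; its steps are described below: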

      1. Work out which changes really need to be applied

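    Handling Kubernetes change notifications must be idempotent. A Service or Endpoints change may touch only peripheral fields that do not affect the proxy rules, and the same event may be delivered more than once. So these cases are filtered out first: the accumulated changes are compared with the data held in memory to find the changes that really matter, so that later steps only touch rules that genuinely need to change.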

    This check is done by updateServiceMap and updateEndpointsMap:

    func (proxier *Proxier) syncProxyRules() {
        proxier.mu.Lock()
        defer proxier.mu.Unlock()
        ......
        serviceUpdateResult := updateServiceMap(
            proxier.serviceMap, &proxier.serviceChanges)
        endpointUpdateResult := updateEndpointsMap(
            proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
        ......
    }
    
      2. Prepare the iptables rules

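    First, a quick introduction to iptables' built-in chains (INPUT, FORWARD and so on) and rules.
    Depending on the point in packet processing at which a rule should take effect, iptables has five built-in chains. Viewed by when they are applied: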

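    INPUT chain: applied to packets addressed to the firewall host itself (inbound).
    OUTPUT chain: applied to packets the firewall host sends out (outbound).
    FORWARD chain: applied to received packets that must be forwarded to another address.
    PREROUTING chain: applied before the routing decision is made, e.g. DNAT.
    POSTROUTING chain: applied after the routing decision is made, e.g. SNAT.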

    -->PREROUTING-->[ROUTE]-->FORWARD-->POSTROUTING-->
         mangle        |       mangle        ^ mangle
          nat          |       filter        |  nat
                       |                     |
                       |                     |
                       v                     |
                     INPUT                 OUTPUT
                       | mangle              ^ mangle
                       | filter              |  nat
                       v ------>local------->| filter
    

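    iptables rules can be dumped (backed up) with the iptables-save command. Here is a simple example in iptables-save format, with my annotations added on the lines that start with dashes: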

    root@ubuntu:/home/yuxianbing# iptables-save -t nat
    # Generated by iptables-save v1.6.0 on Tue Mar  5 09:26:02 2019 
    -- (the line above is a comment)
    *nat
    :PREROUTING ACCEPT [14:12354]
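    -- :PREROUTING ACCEPT means the default policy of the PREROUTING chain in the nat table is ACCEPT (packets that match no rule continue on).
    -- [14:12354] is [packets:bytes]: so far 14 packets (12354 bytes) have passed through the nat table's PREROUTING chain.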
    :INPUT ACCEPT [0:0]
    :OUTPUT ACCEPT [4:222]
    :POSTROUTING ACCEPT [3:149]
    :CNI-DN-603508538baa710bd2110 - [0:0]
    :CNI-HOSTPORT-DNAT - [0:0]
    :CNI-HOSTPORT-SNAT - [0:0]
    :CNI-SN-603508538baa710bd2110 - [0:0]
    :KUBE-FIRE-WALL - [0:0]
    :KUBE-MARK-DROP - [0:0]
    :KUBE-MARK-MASQ - [0:0]
    :KUBE-POSTROUTING - [0:0]
    :KUBE-SERVICES - [0:0]
    -- Same as above (these are custom chains; "-" means they have no default policy)
    
    ---------- The rules follow, one per line ----------
    -A PREROUTING -m addrtype --dst-type LOCAL -j CNI-HOSTPORT-DNAT
    ---- Each rule is written as the iptables arguments that would create it (see the iptables man page for details on the options).
    -A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
    -A OUTPUT -m addrtype --dst-type LOCAL -j CNI-HOSTPORT-DNAT
    -A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
    -A POSTROUTING -s 127.0.0.1/32 ! -d 127.0.0.1/32 -j CNI-HOSTPORT-SNAT
    -A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
    -A POSTROUTING -s 10.0.0.0/8 ! -d 10.0.0.0/8 -j MASQUERADE
    -A CNI-DN-603508538baa710bd2110 -p tcp -m tcp --dport 8080 -j DNAT --to-destination 10.221.2.42:80
    -A CNI-HOSTPORT-DNAT -m comment --comment "dnat name: \"cni0\" id: \"42ccbebb916d082a6d872aaa48efea33c4cc33267d14779046f687f4dcddda8d\"" -j CNI-DN-603508538baa710bd2110
    -A CNI-HOSTPORT-SNAT -m comment --comment "snat name: \"cni0\" id: \"42ccbebb916d082a6d872aaa48efea33c4cc33267d14779046f687f4dcddda8d\"" -j CNI-SN-603508538baa710bd2110
    -A CNI-SN-603508538baa710bd2110 -s 127.0.0.1/32 -d 10.221.2.42/32 -p tcp -m tcp --dport 80 -j MASQUERADE
    -A KUBE-MARK-DROP -j MARK --set-xmark 0x8000/0x8000
    -A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
    -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE
    -A KUBE-POSTROUTING -m set --match-set KUBE-LOOP-BACK dst,dst,src -j MASQUERADE
    -A KUBE-SERVICES -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ
    -A KUBE-SERVICES -p tcp -m tcp -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-MARK-MASQ
    COMMIT
    -- Commit, i.e. apply the configuration above
    # Completed on Tue Mar  5 09:26:02 2019
    

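    As shown, the iptables nat table consists of two parts, chains and rules; each chain line also records how many packets and bytes have passed through that chain.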
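    kube-proxy reads these chain lines back (utiliptables.GetChainLines in the code below) so it can reuse an existing chain's line, counters included, when it rewrites the table. The following is a simplified parser written from scratch for illustration, not a copy of that utility: it finds the "*nat" section and maps each chain name to its full ":CHAIN POLICY [packets:bytes]" line.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// chainLines maps chain name -> chain line for one table of iptables-save output.
func chainLines(table, save string) map[string]string {
	chains := map[string]string{}
	inTable := false
	sc := bufio.NewScanner(strings.NewReader(save))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		switch {
		case line == "*"+table:
			inTable = true
		case !inTable || line == "" || strings.HasPrefix(line, "#"):
			continue // outside the table, blank, or a comment
		case line == "COMMIT" || strings.HasPrefix(line, "*"):
			return chains // end of this table
		case strings.HasPrefix(line, ":"):
			if i := strings.IndexByte(line, ' '); i > 1 {
				chains[line[1:i]] = line
			}
		}
	}
	return chains
}

func main() {
	save := `*nat
:PREROUTING ACCEPT [14:12354]
:KUBE-SERVICES - [0:0]
-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
COMMIT`
	chains := chainLines("nat", save)
	fmt.Println(len(chains), chains["PREROUTING"]) // 2 :PREROUTING ACCEPT [14:12354]
}
```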

    The processing logic for the iptables and IPVS rules is:

        // 1. Run iptables-save to dump the current nat table
        err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
        if err != nil { // if we failed to get any rules
            glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
        } else { // otherwise parse the output
            // 2. Read all existing chain lines
            existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
        }
        // 3. Reset the iptables rule buffers
        proxier.natChains.Reset()
        proxier.natRules.Reset()
        // Write table headers.
        writeLine(proxier.natChains, "*nat")
        // 4. Write the KUBE-POSTROUTING chain line (reuse the existing line if the chain already exists)
        if chain, ok := existingNATChains[kubePostroutingChain]; ok {
            writeLine(proxier.natChains, chain)
        } else {
            writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
        }

        // Write the KUBE-POSTROUTING rule: packets carrying the masquerade mark
        // get MASQUERADE, i.e. SNAT, as they leave the host.
        writeLine(proxier.natRules, []string{
            "-A", string(kubePostroutingChain),
            "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
            "-m", "mark", "--mark", proxier.masqueradeMark,
            "-j", "MASQUERADE",
        }...)

        // 5. Write the KUBE-MARK-MASQ chain line
        if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
            writeLine(proxier.natChains, chain)
        } else {
            writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
        }
        // KUBE-MARK-MASQ rule: MARK the packet with the masquerade mark
        writeLine(proxier.natRules, []string{
            "-A", string(KubeMarkMasqChain),
            "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
        }...)

        // 6. Ensure the dummy interface kube-ipvs0 exists;
        // every Service ClusterIP is bound to this interface
        _, err = proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
        if err != nil {
            glog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
            return
        }
        // 7. Initialize the ipsets: make sure every ipset exists in the system,
        // then reset each set's list of active entries for this sync
        ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP,
            proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet}
        if err := ensureIPSets(ipSets...); err != nil {
            return
        }
        for i := range ipSets {
            ipSets[i].resetEntries()
        }
        ......
            
    // linkKubeServiceChain will Create chain KUBE-SERVICES and link the chain in PREROUTING and OUTPUT

    // Chain PREROUTING (policy ACCEPT)
    // target            prot opt source               destination
    // KUBE-SERVICES     all  --  0.0.0.0/0            0.0.0.0/0

    // Chain OUTPUT (policy ACCEPT)
    // target            prot opt source               destination
    // KUBE-SERVICES     all  --  0.0.0.0/0            0.0.0.0/0

    // Chain KUBE-SERVICES (2 references)
        // 8. Jump from PREROUTING and OUTPUT to KUBE-SERVICES for every protocol, source IP and
        //    destination IP, so all traffic is matched and handled in KUBE-SERVICES.
        //    The KUBE-SERVICES chain itself is created here (roughly: iptables -t nat -N KUBE-SERVICES);
        //    its rules are added later, as needed, for ClusterIP and NodePort Services.
        if err := proxier.linkKubeServiceChain(existingNATChains, proxier.natChains); err != nil {
            glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
            return
        }

        // 9. Create the KUBE-FIRE-WALL chain. Our system has no rules in it, so its purpose is unclear to me for now.
        if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil {
            glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
            return
        }
            
        // 10. The key step: build the IPVS rules for every Service
        for svcName, svcInfo := range proxier.serviceMap {
            protocol := strings.ToLower(string(svcInfo.protocol))
            // Precompute svcNameString; with many services the many calls
            // to ServicePortName.String() show up in CPU profiles.
            svcNameString := svcName.String()

            // Handle traffic that loops back to the originator with SNAT.
            // 10.1 Hairpin handling: matches entries of the form ip,port,ip. Not analyzed here; it is rare in practice.
            ......
            // 10.2 ClusterIP handling
            // Prepare the ipset entry holding the ClusterIP and its port
            entry := &utilipset.Entry{
                IP:       svcInfo.clusterIP.String(),
                Port:     svcInfo.port,
                Protocol: protocol,
                SetType:  utilipset.HashIPPort,
            }
            // If kube-proxy was started with masqueradeAll or a clusterCIDR, add the entry
            // so that matching traffic is masqueraded (SNAT)
            if proxier.masqueradeAll || len(proxier.clusterCIDR) > 0 {
                ......
                proxier.clusterIPSet.activeEntries.Insert(entry.String())
            }
            // Prepare the IPVS virtual server
            serv := &utilipvs.VirtualServer{
                Address:   svcInfo.clusterIP,
                Port:      uint16(svcInfo.port),
                Protocol:  string(svcInfo.protocol),
                Scheduler: proxier.ipvsScheduler,
            }
            // Set session affinity flag and timeout for IPVS service
            if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
                serv.Flags |= utilipvs.FlagPersistent
                serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds)
            }
            // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
            // 10.3 Create or update the IPVS virtual server and bind the ClusterIP
            //      to kube-ipvs0 (the last argument, bindAddr, is true)
            if err := proxier.syncService(svcNameString, serv, true); err == nil {
                activeIPVSServices[serv.String()] = true
                // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
                // So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
                // Note the second argument: onlyNodeLocalEndpoints is passed as false, so every
                // Endpoint is added as a real server of this virtual server.
                // onlyNodeLocalEndpoints only takes effect for NodePort and LB traffic,
                // where it restricts the backends to Endpoints on the local node.
                if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
                    glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
                }
            } else {
                glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
            }

            // 10.4 Capture externalIPs. Skipped: rarely used.
            // 10.5 Capture load-balancer ingress. Skipped: rarely used and requires cloud-provider support.
            ......
            // 10.6 Handle NodePort Services
            if svcInfo.nodePort != 0 {
                lp := utilproxy.LocalPort{
                    Description: "nodePort for " + svcNameString,
                    IP:          "",
                    Port:        svcInfo.nodePort,
                    Protocol:    protocol,
                }
                if proxier.portsMap[lp] != nil {
                    glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                    replacementPortsMap[lp] = proxier.portsMap[lp]
                } else {
                    // Open (hold) the TCP/UDP port on the host so the IPVS rules can be installed safely
                    socket, err := proxier.portMapper.OpenLocalPort(&lp)
                    if err != nil {
                        glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
                        continue
                    }
                    if lp.Protocol == "udp" {
                        isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP)
                        utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6)
                    }
                    replacementPortsMap[lp] = socket
                } // We're holding the port, so it's OK to install ipvs rules.
                // Unless onlyNodeLocalEndpoints is set, NodePort traffic needs SNAT: add the port to the
                // ipset so it matches the KUBE-SERVICES rule and its source address gets masqueraded.
                // Nodeports need SNAT, unless they're local.
                // ipset call
                if !svcInfo.onlyNodeLocalEndpoints {
                    entry = &utilipset.Entry{
                        // No need to provide ip info
                        Port:     svcInfo.nodePort,
                        Protocol: protocol,
                        SetType:  utilipset.BitmapPort,
                    }
                    var nodePortSet *IPSet
                    switch protocol {
                    case "tcp":
                        nodePortSet = proxier.nodePortSetTCP
                    case "udp":
                        nodePortSet = proxier.nodePortSetUDP
                    default:
                        // It should never hit
                        glog.Errorf("Unsupported protocol type: %s", protocol)
                    }
                    if nodePortSet != nil {
                        if valid := nodePortSet.validateEntry(entry); !valid {
                            glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
                            continue
                        }
                        nodePortSet.activeEntries.Insert(entry.String())
                    }
                }
                // Create an IPVS virtual server, with the Endpoints as its real servers, for every IP address of the node
                // Build ipvs kernel routes for each node ip address
                nodeIPs, err := proxier.ipGetter.NodeIPs()
                if err != nil {
                    glog.Errorf("Failed to get node IP, err: %v", err)
                } else {
                    for _, nodeIP := range nodeIPs {
                        // ipvs call
                        serv := &utilipvs.VirtualServer{
                            Address:   nodeIP,
                            Port:      uint16(svcInfo.nodePort),
                            Protocol:  string(svcInfo.protocol),
                            Scheduler: proxier.ipvsScheduler,
                        }
                        if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
                            serv.Flags |= utilipvs.FlagPersistent
                            serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds)
                        }
                        // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
                        if err := proxier.syncService(svcNameString, serv, false); err == nil {
                            activeIPVSServices[serv.String()] = true
                            if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
                                glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
                            }
                        } else {
                            glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
                        }
                    }
                }
            }
        }
    

    Summary

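    kube-proxy's IPVS Proxier implements proxying with both iptables and IPVS. The iptables part is essentially a handful of fixed chains and rules, while the large volume of data such as Service ClusterIPs and Endpoint IPs is stored in ipsets and matched from iptables with match-set. This greatly reduces the number of iptables rules and improves both rule-maintenance and packet-matching performance.
    The iptables rules mainly handle SNAT and similar operations, working alongside IPVS to provide the proxy service.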
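    The scaling argument can be illustrated with a short sketch. It builds ipset entries in ipset's text form (hash:ip,port entries such as 10.96.0.20,tcp:80, bitmap:port entries as a bare port) for a list of hypothetical Services, while the iptables side stays the two KUBE-SERVICES rules from the dump above. It assumes clusterCIDR or masqueradeAll is set, since otherwise the ClusterIP set is not populated (see step 10.2), and for brevity it only handles TCP NodePorts.

```go
package main

import (
	"fmt"
	"strconv"
)

// svc is a hypothetical, minimal description of one Service port.
type svc struct {
	ClusterIP string
	Port      int
	Protocol  string // lower-case, as in the code above
	NodePort  int    // 0 if not a NodePort Service
}

// ipsetEntries returns entries for the ClusterIP set (hash:ip,port) and the
// TCP NodePort set (bitmap:port).
func ipsetEntries(services []svc) (clusterIP, nodePortTCP []string) {
	for _, s := range services {
		clusterIP = append(clusterIP, s.ClusterIP+","+s.Protocol+":"+strconv.Itoa(s.Port))
		if s.NodePort != 0 && s.Protocol == "tcp" {
			nodePortTCP = append(nodePortTCP, strconv.Itoa(s.NodePort))
		}
	}
	return clusterIP, nodePortTCP
}

// natRules are the fixed KUBE-SERVICES rules from the iptables-save dump;
// they do not depend on how many Services exist.
var natRules = []string{
	"-A KUBE-SERVICES -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ",
	"-A KUBE-SERVICES -p tcp -m tcp -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-MARK-MASQ",
}

func main() {
	services := []svc{
		{"10.96.0.1", 443, "tcp", 0},
		{"10.96.0.20", 80, "tcp", 30080},
	}
	cip, np := ipsetEntries(services)
	fmt.Println(len(natRules), cip, np)
}
```

    Adding a thousand Services adds a thousand entries to each set but no iptables rules; the per-Service load balancing itself is done by IPVS.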


        Original article: https://www.haomeiwen.com/subject/cwydyqtx.html