美文网首页
k8s flannel host-gw

k8s flannel host-gw

作者: shoyu666 | 来源:发表于2022-03-18 14:52 被阅读0次

    k8s flannel cni plugin

    背景

    前面提到 k8s flannel cni插件通过 flannel 生成的 /run/flannel/subnet.env 文件,转换为cni plugin插件配置后调用插件配置网络。
    /run/flannel/subnet.env 文件由flannel运行时生成.内容如下

    FLANNEL_NETWORK=10.244.0.0/16
    //可用于bridge插件的网关ip
    FLANNEL_SUBNET=10.244.0.1/24 
    FLANNEL_MTU=1450
    FLANNEL_IPMASQ=true
    

    那么subnet.env配置是如何生成的,以及 flannel是如何管理网络的。(以host-gw为例)。

    flannel 配置文件

    先看一个配置文件 /etc/kube-flannel/net-conf.json 文件说明

       {
         //Network (string): IPv4 network in CIDR format to use for the entire flannel network. (This is the only mandatory key.)
          "Network": "10.244.0.0/16",
         //(dictionary): Type of backend to use and specific configurations for that backend. The list of available backends and the keys that can be put into the this dictionary are listed below. Defaults to udp backend.
          "Backend": {
            "Type": "host-gw"
          }
        }
    

    该配置是flannel启动的网络配置文件,指定了用于整个flannel网络的网段和后端的类型,可以通过 --net-config-path=/etc/kube-flannel/net-conf.json: path to the network configuration file to use 覆盖,默认是在 /etc/kube-flannel/net-conf.json。

    flannel 启动

    //main.go
    func main() {
            config, err := getConfig(ctx, sm)
            //创建 subnet.Manager
        sm, err := newSubnetManager(ctx)
             //确定网卡
            extIface, err = ipmatch.LookupExtIface(iface, "", ipStack, optsPublicIP)
            //创建 backend.Manager  
            bm := backend.NewManager(ctx, sm, extIface)
           //创建 backend.Backend
            be, err := bm.GetBackend(config.BackendType)
            //创建 backend.Network
            bn, err := be.RegisterNetwork(ctx, &wg, config)
           //subnet.env 配置文件生成
           if err := WriteSubnetFile(opts.subnetFile, config, opts.ipMasq, bn); err != nil {
            wg.Add(1)
        go func() {
                    //运行 backed.Network
            bn.Run(ctx)
            wg.Done()
        }()
            //lease续期
        err = MonitorLease(ctx, sm, bn, &wg)
    }
    

    subnet.Manager有2个实现 LocalManager 和 kubeSubnetManager ,区别是一个基于etcd的存储,一个是基于 k8s api的存储。 backend.Backend 后端实现 有 host-gw等。 host-gw 下 backend.Network 的实现为 RouteNetwork。
    关系如下:


    image.png

    生成配置

    config,err:=getConfig(ctx,sm)
    ....
    ....
     if err := WriteSubnetFile(opts.subnetFile, config, opts.ipMasq, bn); err != nil {
    

    根据 /etc/kube-flannel/net-conf.json 配置生成 config 配置,WriteSubnetFile将 生成的config写入/run/flannel/subnet.env

    确定flannel绑定的网卡

     extIface, err = ipmatch.LookupExtIface(iface, "", ipStack, optsPublicIP)
    

    extIface 是 flannel 绑定的网卡,如果没有指定就取默认的(ip.GetDefaultGatewayInterface())

    注册网络(host-gw)

    //backend/hostgw/hostgw.go
    
    func (be *HostgwBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, config *subnet.Config) (backend.Network, error) {
        n := &backend.RouteNetwork{
            SimpleNetwork: backend.SimpleNetwork{
                ExtIface: be.extIface,
            },
            SM:          be.sm,
            BackendType: "host-gw",
            Mtu:         be.extIface.Iface.MTU,
            LinkIndex:   be.extIface.Iface.Index,
        }
    //申请 lease
    l, err := be.sm.AcquireLease(ctx, &attrs)
    

    创建RouteNetwork 和 申请 Lease,那么lease是什么?

    type Lease struct {
        EnableIPv4 bool
        EnableIPv6 bool
        Subnet     ip.IP4Net
        IPv6Subnet ip.IP6Net
        Attrs      LeaseAttrs
        Expiration time.Time
    
        Asof uint64
    }
    

    Lease 实际就是flannel各个host的子网配置,当前host和其它host各有自己的lease,watch就是当前host监听其它host的lease变化。Lease 又表示租期的意思,表明给具体某个host分配的子网是有时效性的,host要及时上报续期,如果没有及时续期,Lease 租期失效后,该子网会被回收,重新分配给其它host.

    bn.Run(ctx)

    func (n *RouteNetwork) Run(ctx context.Context) {
        wg := sync.WaitGroup{}
    
        log.Info("Watching for new subnet leases")
        evts := make(chan []subnet.Event)
        wg.Add(1)
        go func() {
            subnet.WatchLeases(ctx, n.SM, n.SubnetLease, evts)
            wg.Done()
        }()
    
        n.routes = make([]netlink.Route, 0, 10)
        wg.Add(1)
        go func() {
            n.routeCheck(ctx)
            wg.Done()
        }()
    
        defer wg.Wait()
    
        for {
            evtBatch, ok := <-evts
            if !ok {
                log.Infof("evts chan closed")
                return
            }
            n.handleSubnetEvents(evtBatch)
        }
    }
    

    run方法主要步骤:
    开启协程用于监听lease (subnet.WatchLeases)
    开启协程用于路由检查 (n.routeCheck(ctx))
    for循环用于处理 event事件

    监听lease

    func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan []Event) {
        lw := &leaseWatcher{
            ownLease: ownLease,
        }
        var cursor interface{}
    
        for {
            res, err := sm.WatchLeases(ctx, cursor)
            if err != nil {
                if err == context.Canceled || err == context.DeadlineExceeded {
                    log.Infof("%v, close receiver chan", err)
                    close(receiver)
                    return
                }
    
                if res.Cursor != nil {
                    cursor = res.Cursor
                }
    
                log.Errorf("Watch subnets: %v", err)
                time.Sleep(time.Second)
                continue
            }
    
            cursor = res.Cursor
    
            var batch []Event
    
            if len(res.Events) > 0 {
                batch = lw.update(res.Events)
            } else {
                batch = lw.reset(res.Snapshot)
            }
    
            if len(batch) > 0 {
                receiver <- batch
            }
        }
    }
    
    type leaseWatcher struct {
        ownLease *Lease
        leases   []Lease
    }
    

    leaseWatcher 存储了host自身的lease和其它host的lease

    if len(res.Events) > 0 {
        batch = lw.update(res.Events)
            } else {
        batch = lw.reset(res.Snapshot)
    }
    

    Events表示增量的数据,Snapshot表示全量的数据。
    增量的数据分2类:新增(EventAdded)和删除(EventRemoved)。
    update 处理增量数据,reset处理全量数据。
    update 和 reset 实际是修改 leaseWatcher 的 leases

    路由检查

    n.routeCheck(ctx)
    
    
    func (n *RouteNetwork) routeCheck(ctx context.Context) {
       for {
           select {
           case <-ctx.Done():
               return
           case <-time.After(routeCheckRetries * time.Second):
               n.checkSubnetExistInV4Routes()
               n.checkSubnetExistInV6Routes()
           }
       }
    }
    
    

    定期检查路由,没有就添加,那么路由是从哪里来的?来源于下面的处理子网事件

    处理子网事件

       n.handleSubnetEvents(evtBatch)
    
    func (n *RouteNetwork) handleSubnetEvents(batch []subnet.Event) {
        for _, evt := range batch {
            switch evt.Type {
            case subnet.EventAdded:
                if evt.Lease.Attrs.BackendType != n.BackendType {
                    log.Warningf("Ignoring non-%v subnet: type=%v", n.BackendType, evt.Lease.Attrs.BackendType)
                    continue
                }
    
                if evt.Lease.EnableIPv4 {
                    log.Infof("Subnet added: %v via %v", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP)
    
                    route := n.GetRoute(&evt.Lease)
                    routeAdd(route, netlink.FAMILY_V4, n.addToRouteList, n.removeFromV4RouteList)
                }
    
                if evt.Lease.EnableIPv6 {
                    log.Infof("Subnet added: %v via %v", evt.Lease.IPv6Subnet, evt.Lease.Attrs.PublicIPv6)
    
                    route := n.GetV6Route(&evt.Lease)
                    routeAdd(route, netlink.FAMILY_V6, n.addToV6RouteList, n.removeFromV6RouteList)
                }
    
            case subnet.EventRemoved:
                if evt.Lease.Attrs.BackendType != n.BackendType {
                    log.Warningf("Ignoring non-%v subnet: type=%v", n.BackendType, evt.Lease.Attrs.BackendType)
                    continue
                }
    
                if evt.Lease.EnableIPv4 {
                    log.Info("Subnet removed: ", evt.Lease.Subnet)
    
                    route := n.GetRoute(&evt.Lease)
                    // Always remove the route from the route list.
                    n.removeFromV4RouteList(*route)
    
                    if err := netlink.RouteDel(route); err != nil {
                        log.Errorf("Error deleting route to %v: %v", evt.Lease.Subnet, err)
                    }
                }
    
                if evt.Lease.EnableIPv6 {
                    log.Info("Subnet removed: ", evt.Lease.IPv6Subnet)
    
                    route := n.GetV6Route(&evt.Lease)
                    // Always remove the route from the route list.
                    n.removeFromV6RouteList(*route)
    
                    if err := netlink.RouteDel(route); err != nil {
                        log.Errorf("Error deleting route to %v: %v", evt.Lease.IPv6Subnet, err)
                    }
                }
    
            default:
                log.Error("Internal error: unknown event type: ", int(evt.Type))
            }
        }
    }
    

    监听到子网(lease) add事件对应为添加路由(routeAdd),remove事件对应为 删除路由(removeFromV4RouteList)。
    lease转换为route

    backend/hostgw/hostgw.go
            n.GetRoute = func(lease *subnet.Lease) *netlink.Route {
                return &netlink.Route{
                    Dst:        lease.Subnet.ToIPNet(),
                    Gw:        lease.Attrs.PublicIP.ToIP(),
                    LinkIndex: n.LinkIndex,
                }
    }
    

    总结:
    hostgw 主要就是监听lease,然后根据lease维护route。

    相关文章

      网友评论

          本文标题:k8s flannel host-gw

          本文链接:https://www.haomeiwen.com/subject/esmbdrtx.html