美文网首页
结合源码理解podip如何分配

结合源码理解podip如何分配

作者: wwq2020 | 来源:发表于2023-09-27 06:29 被阅读0次

背景

本文针对的是cni为flannel,cri为containerd的场景
k8s版本v1.23.7

简单总结

image.png

相关源码

k8s的pkg/kubelet/kuberuntime/kuberuntime_manager.go中

pod创建时会进入到SyncPod
// SyncPod is shown here only as an excerpt: the parts replaced by "..." are
// elided. The visible step creates the pod sandbox through
// m.createPodSandbox, passing the pod and podContainerChanges.Attempt.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
  ...
        // Create the sandbox; the call yields the sandbox ID, a message and an error.
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)

  ...
}

k8s的pkg/kubelet/kuberuntime/kuberuntime_sandbox.go中

创建sandbox
// createPodSandbox (excerpt; "..." marks elided code) asks the runtime service
// to run the pod sandbox described by podSandboxConfig. On failure it logs the
// error and returns an empty ID, a human-readable message and the error.
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
  ...
    // Delegate sandbox creation to the runtime service.
    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
    if err != nil {
        // Build the message returned to the caller and log the same failure.
        message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
        klog.ErrorS(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
        return "", message, err
    }
  ...
}

k8s的pkg/kubelet/cri/remote/remote_runtime.go中

调用cri的RunPodSandbox接口
// RunPodSandbox (excerpt; "..." marks elided code) sends a RunPodSandbox
// request to the runtime client. Depending on r.useV1API() it uses either the
// v1 client or the v1alpha2 client; in both cases a failure is logged and
// returned, and on success the sandbox ID is taken from the response.
func (r *remoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
  ...
    if r.useV1API() {
        // v1 API path: the config is passed through unchanged.
        resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
            Config:         config,
            RuntimeHandler: runtimeHandler,
        })

        if err != nil {
            klog.ErrorS(err, "RunPodSandbox from runtime service failed")
            return "", err
        }
        podSandboxID = resp.PodSandboxId
    } else {
        // v1alpha2 API path: the config is converted with v1alpha2PodSandboxConfig first.
        resp, err := r.runtimeClientV1alpha2.RunPodSandbox(ctx, &runtimeapiV1alpha2.RunPodSandboxRequest{
            Config:         v1alpha2PodSandboxConfig(config),
            RuntimeHandler: runtimeHandler,
        })

        if err != nil {
            klog.ErrorS(err, "RunPodSandbox from runtime service failed")
            return "", err
        }
        podSandboxID = resp.PodSandboxId
    }
  ...
}

containerd的pkg/cri/server/sandbox_run.go中

实际上创建和运行sandbox,其中包含设置pod网络
// RunPodSandbox (excerpt; "..." marks elided code) sets up the pod network for
// the sandbox via c.setupPodNetwork. A failure there is wrapped with the
// sandbox id and returned.
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
  ...
        // Configure networking for the sandbox; abort the request on failure.
        if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
            return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
        }

  ...
}

// setupPodNetwork (excerpt; "..." marks elided code) invokes the network
// plugin for the sandbox. The NetworkPluginSetupSerially config flag selects
// between netPlugin.SetupSerially and netPlugin.Setup; both receive the same
// id, path and options.
func (c *criService) setupPodNetwork(ctx context.Context, sandbox *sandboxstore.Sandbox) error {
  ...
    // Pick the serial or the default setup entry point based on configuration.
    if c.config.CniConfig.NetworkPluginSetupSerially {
        result, err = netPlugin.SetupSerially(ctx, id, path, opts...)
    } else {
        result, err = netPlugin.Setup(ctx, id, path, opts...)
    }
  ...
}


containerd的vendor/github.com/containerd/go-cni/cni.go中

根据id和netns路径构造Namespace对象(newNamespace),然后为其设置pod网络
// Setup attaches every configured network to the namespace identified by id
// and path.
//
// It first checks c.Status(), then builds a Namespace from id, path and opts,
// attaches the networks and finally converts the per-network results with
// c.createResult. The first error encountered at any step is returned as is.
func (c *libcni) Setup(ctx context.Context, id string, path string, opts ...NamespaceOpts) (*Result, error) {
    statusErr := c.Status()
    if statusErr != nil {
        return nil, statusErr
    }

    netns, nsErr := newNamespace(id, path, opts...)
    if nsErr != nil {
        return nil, nsErr
    }

    attached, attachErr := c.attachNetworks(ctx, netns)
    if attachErr != nil {
        return nil, attachErr
    }

    return c.createResult(attached)
}

// asynchAttach attaches network n to ns and reports the outcome on rc, tagged
// with index so the receiver can tell which network it belongs to.
// wg.Done is called when the function returns.
func asynchAttach(ctx context.Context, index int, n *Network, ns *Namespace, wg *sync.WaitGroup, rc chan asynchAttachResult) {
    defer wg.Done()

    res, attachErr := n.Attach(ctx, ns)
    outcome := asynchAttachResult{index: index, res: res, err: attachErr}
    rc <- outcome
}

// attachNetworks attaches all networks to ns concurrently, one goroutine per
// network, and collects the results in network order.
//
// It returns one result slot per network (a slot is whatever Attach returned
// for that network) together with the first error received, if any. All
// goroutines have finished by the time it returns.
func (c *libcni) attachNetworks(ctx context.Context, ns *Namespace) ([]*types100.Result, error) {
    // Take a single snapshot of the network list. Sizing the results slice,
    // spawning the goroutines and counting the receives must all agree on the
    // same list; calling c.Networks() separately for each step would let them
    // diverge if the list changed in between (index out of range on results,
    // or a send/receive that never completes).
    networks := c.Networks()

    var wg sync.WaitGroup
    var firstError error
    results := make([]*types100.Result, len(networks))
    rc := make(chan asynchAttachResult)

    for i, network := range networks {
        wg.Add(1)
        go asynchAttach(ctx, i, network, ns, &wg, rc)
    }

    // Receive exactly one result per spawned goroutine; rc is unbuffered, so
    // every sender is released by a matching receive here.
    for range networks {
        rs := <-rc
        if rs.err != nil && firstError == nil {
            firstError = rs.err
        }
        results[rs.index] = rs.res
    }
    wg.Wait()

    return results, firstError
}

containerd的vendor/github.com/containerd/go-cni/namespace.go中

设置pod命名空间的网络
// Attach adds this network's plugin list to the namespace ns, using the
// runtime configuration that ns builds for the network's interface name, and
// converts the plugin result with types100.NewResultFromResult.
func (n *Network) Attach(ctx context.Context, ns *Namespace) (*types100.Result, error) {
    rtConf := ns.config(n.ifName)

    raw, addErr := n.cni.AddNetworkList(ctx, n.config, rtConf)
    if addErr != nil {
        return nil, addErr
    }

    return types100.NewResultFromResult(raw)
}

containerd的vendor/github.com/containernetworking/cni/libcni/api.go中

准备命令和配置,调用命令执行
// AddNetworkList runs the ADD operation for every plugin in list, in order.
//
// Each plugin receives the result of the previous one (nil for the first).
// A plugin failure stops the chain and is returned wrapped with the plugin
// description. After the last plugin, the final result is stored through
// c.cacheAdd; a failure there is returned as well. On success the last
// plugin's result is returned.
func (c *CNIConfig) AddNetworkList(ctx context.Context, list *NetworkConfigList, rt *RuntimeConf) (types.Result, error) {
    var prev types.Result

    for _, plugin := range list.Plugins {
        next, addErr := c.addNetwork(ctx, list.Name, list.CNIVersion, plugin, prev, rt)
        if addErr != nil {
            return nil, fmt.Errorf("plugin %s failed (add): %w", pluginDescription(plugin.Network), addErr)
        }
        prev = next
    }

    if cacheErr := c.cacheAdd(prev, list.Bytes, list.Name, rt); cacheErr != nil {
        return nil, fmt.Errorf("failed to set network %q cached result: %w", list.Name, cacheErr)
    }

    return prev, nil
}

// addNetwork executes the ADD command of a single plugin.
//
// It locates the plugin binary named by net.Network.Type in c.Path, validates
// the container ID, network name and interface name, builds the plugin's
// configuration from the list-level name/version and the previous result, and
// then executes the plugin. Any failure along the way is returned unchanged.
func (c *CNIConfig) addNetwork(ctx context.Context, name, cniVersion string, net *NetworkConfig, prevResult types.Result, rt *RuntimeConf) (types.Result, error) {
    c.ensureExec()

    // Resolve the plugin executable before validating anything else.
    binPath, findErr := c.exec.FindInPath(net.Network.Type, c.Path)
    if findErr != nil {
        return nil, findErr
    }

    // Input validation, in the same order as the checks are reported.
    if err := utils.ValidateContainerID(rt.ContainerID); err != nil {
        return nil, err
    }
    if err := utils.ValidateNetworkName(name); err != nil {
        return nil, err
    }
    if err := utils.ValidateInterfaceName(rt.IfName); err != nil {
        return nil, err
    }

    pluginConf, buildErr := buildOneConfig(name, cniVersion, net, prevResult, rt)
    if buildErr != nil {
        return nil, buildErr
    }

    return invoke.ExecPluginWithResult(ctx, binPath, pluginConf.Bytes, c.args("ADD", rt), c.exec)
}

containerd的vendor/github.com/containernetworking/cni/pkg/invoke/exec.go中

执行命令
// ExecPluginWithResult runs the plugin at pluginPath with netconf as its
// configuration and args converted to environment variables, then parses the
// plugin's stdout into a types.Result.
//
// When exec is nil, defaultExec is used. The result version and the bytes to
// parse are obtained from fixupResultVersion before the result is created.
func ExecPluginWithResult(ctx context.Context, pluginPath string, netconf []byte, args CNIArgs, exec Exec) (types.Result, error) {
    runner := exec
    if runner == nil {
        runner = defaultExec
    }

    env := args.AsEnv()
    output, runErr := runner.ExecPlugin(ctx, pluginPath, netconf, env)
    if runErr != nil {
        return nil, runErr
    }

    version, normalized, fixErr := fixupResultVersion(netconf, output)
    if fixErr != nil {
        return nil, fixErr
    }

    return create.Create(version, normalized)
}

flannel-cni的flannel.go中

cni入口
// main is the plugin entry point: it registers the add/check/del handlers
// with skel.PluginMain, together with the supported versions (cni.All) and a
// descriptive version string.
func main() {
    const aboutFormat = "CNI Plugin %s version %s (%s/%s) commit %s built on %s"

    about := fmt.Sprintf(aboutFormat, Program, Version, runtime.GOOS, runtime.GOARCH, Commit, buildDate)
    skel.PluginMain(cmdAdd, cmdCheck, cmdDel, cni.All, about)
}

配置pod网络
// cmdAdd handles the ADD command.
//
// It loads the flannel network configuration from stdin and the subnet
// environment from the configured subnet file, validates the optional
// "delegate" dictionary (a "type" entry must be a string; "name" and "ipam"
// must not be present), copies the runtime config into the delegate when set,
// and hands over to doCmdAdd.
func cmdAdd(args *skel.CmdArgs) error {
    conf, confErr := loadFlannelNetConf(args.StdinData)
    if confErr != nil {
        return fmt.Errorf("loadFlannelNetConf failed: %w", confErr)
    }

    subnetEnv, envErr := loadFlannelSubnetEnv(conf.SubnetFile)
    if envErr != nil {
        return fmt.Errorf("loadFlannelSubnetEnv failed: %w", envErr)
    }

    // Either start from an empty delegate, or reject the first invalid entry
    // of a user-supplied one.
    switch {
    case conf.Delegate == nil:
        conf.Delegate = make(map[string]interface{})
    case hasKey(conf.Delegate, "type") && !isString(conf.Delegate["type"]):
        return fmt.Errorf("'delegate' dictionary, if present, must have (string) 'type' field")
    case hasKey(conf.Delegate, "name"):
        return fmt.Errorf("'delegate' dictionary must not have 'name' field, it'll be set by flannel")
    case hasKey(conf.Delegate, "ipam"):
        return fmt.Errorf("'delegate' dictionary must not have 'ipam' field, it'll be set by flannel")
    }

    if conf.RuntimeConfig != nil {
        conf.Delegate["runtimeConfig"] = conf.RuntimeConfig
    }

    return doCmdAdd(args, conf, subnetEnv)
}

flannel-cni的flannel_linux.go中

linux下配置pod网络入口
// doCmdAdd completes the delegate configuration and invokes the delegate.
//
// The delegate always gets the network name. Missing entries are defaulted:
// "type" to "bridge", "ipMasq" to the negation of flannel's own ipmasq
// setting, "mtu" to the subnet environment's MTU and, for bridge delegates,
// "isGateway" to true. A non-empty CNI version is propagated. The IPAM section
// is produced by getDelegateIPAM, the resulting delegate is dumped to stderr,
// and delegateAdd is called with it.
func doCmdAdd(args *skel.CmdArgs, n *NetConf, fenv *subnetEnv) error {
    // delegate aliases n.Delegate; writes below are visible through n.
    delegate := n.Delegate
    delegate["name"] = n.Name

    if !hasKey(delegate, "type") {
        delegate["type"] = "bridge"
    }

    if !hasKey(delegate, "ipMasq") {
        // if flannel is not doing ipmasq, we should
        delegate["ipMasq"] = !*fenv.ipmasq
    }

    if !hasKey(delegate, "mtu") {
        delegate["mtu"] = fenv.mtu
    }

    if delegate["type"].(string) == "bridge" && !hasKey(delegate, "isGateway") {
        delegate["isGateway"] = true
    }

    if n.CNIVersion != "" {
        delegate["cniVersion"] = n.CNIVersion
    }

    ipamConf, ipamErr := getDelegateIPAM(n, fenv)
    if ipamErr != nil {
        return fmt.Errorf("failed to assemble Delegate IPAM: %w", ipamErr)
    }
    delegate["ipam"] = ipamConf
    fmt.Fprintf(os.Stderr, "\n%#v\n", n.Delegate)

    return delegateAdd(args.ContainerID, n.DataDir, n.Delegate)
}

获取ipam配置
// getDelegateIPAM assembles the IPAM section handed to the delegate plugin.
//
// It starts from n.IPAM (or an empty map when that is nil), defaults "type"
// to "host-local", sets "ranges" to one range per non-empty subnet found in
// fenv (sn first, then ip6Sn) and sets "routes" to the routes returned by
// getIPAMRoutes followed by one route per non-nil network in fenv.nws and
// fenv.ip6Nws. Note that a non-nil n.IPAM map is modified in place.
func getDelegateIPAM(n *NetConf, fenv *subnetEnv) (map[string]interface{}, error) {
    conf := n.IPAM
    if conf == nil {
        conf = map[string]interface{}{}
    }

    if !hasKey(conf, "type") {
        conf["type"] = "host-local"
    }

    // ranges stays nil when neither subnet is available.
    var ranges [][]map[string]interface{}
    addRange := func(cidr string) {
        ranges = append(ranges, []map[string]interface{}{
            {"subnet": cidr},
        })
    }

    if fenv.sn != nil {
        if cidr := fenv.sn.String(); cidr != "" {
            addRange(cidr)
        }
    }
    if fenv.ip6Sn != nil {
        if cidr := fenv.ip6Sn.String(); cidr != "" {
            addRange(cidr)
        }
    }

    conf["ranges"] = ranges

    routes, routesErr := getIPAMRoutes(n)
    if routesErr != nil {
        return nil, fmt.Errorf("failed to read IPAM routes: %w", routesErr)
    }

    for _, dst := range fenv.nws {
        if dst == nil {
            continue
        }
        routes = append(routes, types.Route{Dst: *dst})
    }

    for _, dst := range fenv.ip6Nws {
        if dst == nil {
            continue
        }
        routes = append(routes, types.Route{Dst: *dst})
    }

    conf["routes"] = routes

    return conf, nil
}


flannel-cni的flannel.go中

最终委托给下一个cni插件执行:被调用的插件由netconf["type"]决定(这个场景下默认是bridge),ip则由其ipam配置中指定的host-local插件分配
// delegateAdd serializes netconf, saves it for the later DEL command and
// invokes the delegate plugin named by netconf["type"] with it.
//
// cid and dataDir identify where the rendered configuration is stored.
// It returns an error when netconf cannot be serialized, when "type" is not a
// string, when the configuration cannot be saved, or when the delegate fails;
// otherwise it returns the outcome of printing the delegate's result.
func delegateAdd(cid, dataDir string, netconf map[string]interface{}) error {
    netconfBytes, err := json.Marshal(netconf)
    // Check the marshal error first so that nothing is logged or saved for a
    // configuration that could not be serialized.
    if err != nil {
        return fmt.Errorf("error serializing delegate netconf: %w", err)
    }

    // Debug output only; a failed write to stderr must not fail the ADD.
    fmt.Fprintf(os.Stderr, "delegateAdd: netconf sent to delegate plugin:\n")
    _, _ = os.Stderr.Write(netconfBytes)

    // Validate the plugin type before touching disk: an unchecked type
    // assertion would panic after the scratch file had already been written.
    pluginType, ok := netconf["type"].(string)
    if !ok {
        return fmt.Errorf("delegate netconf 'type' field must be a string, got %T", netconf["type"])
    }

    // save the rendered netconf for cmdDel
    if err = saveScratchNetConf(cid, dataDir, netconfBytes); err != nil {
        return err
    }

    result, err := invoke.DelegateAdd(context.TODO(), pluginType, netconfBytes, nil)
    if err != nil {
        return fmt.Errorf("failed to delegate add: %w", err)
    }
    return result.Print()
}

相关文章

网友评论

      本文标题:结合源码理解podip如何分配

      本文链接:https://www.haomeiwen.com/subject/xjjhbdtx.html