背景
本文针对的是cni为flannel,cri为containerd的场景
k8s版本v1.23.7
简单总结
![](https://img.haomeiwen.com/i11851608/a828fdd81c6309fd.png)
相关源码
k8s的pkg/kubelet/kuberuntime/kuberuntime_manager.go中
pod创建时会进入到SyncPod
// SyncPod (excerpt) is the runtime-manager function the article follows for
// pod creation. Only the sandbox-creation step is shown; the parts marked
// "..." are elided and not described here.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
...
// Create the pod sandbox for the current attempt; yields the sandbox ID, a
// message and an error.
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
...
}
k8s的pkg/kubelet/kuberuntime/kuberuntime_sandbox.go中
创建sandbox
// createPodSandbox (excerpt) asks the runtime service to run a sandbox for the
// pod. It returns the sandbox ID, a message and an error; on failure the ID is
// empty and the message describes the failure.
// The preparation of podSandboxConfig and runtimeHandler is elided ("...").
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
...
// Delegate the actual sandbox creation to the runtime service.
podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
if err != nil {
// Build the message returned to the caller and also log the failure.
message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
klog.ErrorS(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
return "", message, err
}
...
}
k8s的pkg/kubelet/cri/remote/remote_runtime.go中
调用cri的RunPodSandbox接口
// RunPodSandbox (excerpt) forwards the sandbox request to the runtime client
// and returns the resulting sandbox ID.
// Depending on r.useV1API() it uses either r.runtimeClient or
// r.runtimeClientV1alpha2; for the latter the config is first converted with
// v1alpha2PodSandboxConfig. Any client error is logged and returned with an
// empty ID. ctx and podSandboxID are declared in the elided ("...") part.
func (r *remoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
...
if r.useV1API() {
// v1 API path.
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
Config: config,
RuntimeHandler: runtimeHandler,
})
if err != nil {
klog.ErrorS(err, "RunPodSandbox from runtime service failed")
return "", err
}
podSandboxID = resp.PodSandboxId
} else {
// v1alpha2 API path: same request, with the config converted to the
// v1alpha2 type.
resp, err := r.runtimeClientV1alpha2.RunPodSandbox(ctx, &runtimeapiV1alpha2.RunPodSandboxRequest{
Config: v1alpha2PodSandboxConfig(config),
RuntimeHandler: runtimeHandler,
})
if err != nil {
klog.ErrorS(err, "RunPodSandbox from runtime service failed")
return "", err
}
podSandboxID = resp.PodSandboxId
}
...
}
containerd的pkg/cri/server/sandbox_run.go中
实际上创建和运行sandbox,其中包含设置pod网络
// RunPodSandbox (excerpt) is the containerd CRI handler for running a pod
// sandbox. Only the network step is shown: it calls c.setupPodNetwork for the
// sandbox and, on failure, returns an error wrapping the cause and naming the
// sandbox id. The rest of the function is elided ("...").
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
...
// Configure networking for the sandbox; abort the request if it fails.
if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
}
...
}
// setupPodNetwork (excerpt) invokes the network plugin for the sandbox.
// When c.config.CniConfig.NetworkPluginSetupSerially is set it calls
// netPlugin.SetupSerially, otherwise netPlugin.Setup, with the same arguments.
// How netPlugin, id, path and opts are obtained, and how result/err are
// handled afterwards, is elided ("...").
func (c *criService) setupPodNetwork(ctx context.Context, sandbox *sandboxstore.Sandbox) error {
...
if c.config.CniConfig.NetworkPluginSetupSerially {
result, err = netPlugin.SetupSerially(ctx, id, path, opts...)
} else {
result, err = netPlugin.Setup(ctx, id, path, opts...)
}
...
}
containerd的vendor/github.com/containerd/go-cni/cni.go中
根据传入的网络命名空间路径构造Namespace对象,并设置pod网络
// Setup builds a Namespace from (id, path, opts), attaches it to the
// configured networks and converts the attachment results into a *Result.
//
// It returns early with the error when c.Status() reports one, when the
// Namespace cannot be built, or when attaching the networks fails.
func (c *libcni) Setup(ctx context.Context, id string, path string, opts ...NamespaceOpts) (*Result, error) {
	if statusErr := c.Status(); statusErr != nil {
		return nil, statusErr
	}

	namespace, nsErr := newNamespace(id, path, opts...)
	if nsErr != nil {
		return nil, nsErr
	}

	attached, attachErr := c.attachNetworks(ctx, namespace)
	if attachErr != nil {
		return nil, attachErr
	}

	return c.createResult(attached)
}
// asynchAttach attaches ns to network n and reports the outcome on rc, tagged
// with index so the receiver can place it in the right slot.
//
// wg.Done is deferred, so it runs only after the send on rc has completed.
func asynchAttach(ctx context.Context, index int, n *Network, ns *Namespace, wg *sync.WaitGroup, rc chan asynchAttachResult) {
	defer wg.Done()

	attached, attachErr := n.Attach(ctx, ns)
	outcome := asynchAttachResult{
		index: index,
		res:   attached,
		err:   attachErr,
	}
	rc <- outcome
}
// attachNetworks attaches ns to every network returned by c.Networks(),
// running one goroutine per network, and returns the results indexed in the
// same order as that network list.
//
// The network list is read exactly once. The result slice size, the number of
// goroutines started and the number of results awaited are all derived from
// that single snapshot, so they cannot disagree even if a later call to
// c.Networks() would return a different list.
//
// Every attachment is awaited before returning. If any attachment fails, the
// first error received is returned alongside the results slice.
func (c *libcni) attachNetworks(ctx context.Context, ns *Namespace) ([]*types100.Result, error) {
	networks := c.Networks()

	var wg sync.WaitGroup
	var firstError error
	results := make([]*types100.Result, len(networks))
	// Unbuffered: each worker blocks on its send until it is received below.
	rc := make(chan asynchAttachResult)

	for i, network := range networks {
		wg.Add(1)
		go asynchAttach(ctx, i, network, ns, &wg, rc)
	}

	// Collect exactly one outcome per started goroutine.
	for range networks {
		rs := <-rc
		if rs.err != nil && firstError == nil {
			firstError = rs.err
		}
		results[rs.index] = rs.res
	}
	wg.Wait()

	return results, firstError
}
containerd的vendor/github.com/containerd/go-cni/namespace.go中
设置pod命名空间的网络
// Attach adds the namespace ns to this network by running the network's
// plugin list with the runtime configuration derived from ns and the
// network's interface name. The plugin result is converted with
// types100.NewResultFromResult; any error from the plugin list is returned
// unchanged with a nil result.
func (n *Network) Attach(ctx context.Context, ns *Namespace) (*types100.Result, error) {
	runtimeConf := ns.config(n.ifName)

	raw, err := n.cni.AddNetworkList(ctx, n.config, runtimeConf)
	if err != nil {
		return nil, err
	}

	return types100.NewResultFromResult(raw)
}
containerd的vendor/github.com/containernetworking/cni/libcni/api.go中
准备命令和配置,调用命令执行
// AddNetworkList runs the ADD operation for each plugin in list.Plugins, in
// order, passing each plugin the result produced by the previous one (nil for
// the first). The final result is stored via c.cacheAdd and returned.
//
// The first plugin failure stops the chain and is returned wrapped with the
// plugin description; a cacheAdd failure is returned wrapped with the list name.
func (c *CNIConfig) AddNetworkList(ctx context.Context, list *NetworkConfigList, rt *RuntimeConf) (types.Result, error) {
	var chained types.Result

	for _, plugin := range list.Plugins {
		next, addErr := c.addNetwork(ctx, list.Name, list.CNIVersion, plugin, chained, rt)
		if addErr != nil {
			return nil, fmt.Errorf("plugin %s failed (add): %w", pluginDescription(plugin.Network), addErr)
		}
		chained = next
	}

	if cacheErr := c.cacheAdd(chained, list.Bytes, list.Name, rt); cacheErr != nil {
		return nil, fmt.Errorf("failed to set network %q cached result: %w", list.Name, cacheErr)
	}

	return chained, nil
}
// addNetwork runs a single plugin's ADD operation.
//
// Steps, in order: make sure an executor is set, locate the plugin binary for
// net.Network.Type in c.Path, validate the container ID, network name and
// interface name, build the per-plugin configuration (including prevResult),
// then execute the plugin and return its result. Any failing step returns a
// nil result and that step's error.
func (c *CNIConfig) addNetwork(ctx context.Context, name, cniVersion string, net *NetworkConfig, prevResult types.Result, rt *RuntimeConf) (types.Result, error) {
	c.ensureExec()

	binPath, findErr := c.exec.FindInPath(net.Network.Type, c.Path)
	if findErr != nil {
		return nil, findErr
	}

	// Input validation; the checks stay as separate statements so each
	// validator's own return value is compared against nil directly.
	if idErr := utils.ValidateContainerID(rt.ContainerID); idErr != nil {
		return nil, idErr
	}
	if nameErr := utils.ValidateNetworkName(name); nameErr != nil {
		return nil, nameErr
	}
	if ifErr := utils.ValidateInterfaceName(rt.IfName); ifErr != nil {
		return nil, ifErr
	}

	pluginConf, buildErr := buildOneConfig(name, cniVersion, net, prevResult, rt)
	if buildErr != nil {
		return nil, buildErr
	}

	return invoke.ExecPluginWithResult(ctx, binPath, pluginConf.Bytes, c.args("ADD", rt), c.exec)
}
containerd的vendor/github.com/containernetworking/cni/pkg/invoke/exec.go中
执行命令
// ExecPluginWithResult executes the plugin at pluginPath with netconf as its
// configuration and args as its environment, then turns the plugin's stdout
// into a types.Result.
//
// A nil exec falls back to defaultExec. The raw output is passed through
// fixupResultVersion together with netconf before the result is created.
func ExecPluginWithResult(ctx context.Context, pluginPath string, netconf []byte, args CNIArgs, exec Exec) (types.Result, error) {
	runner := exec
	if runner == nil {
		runner = defaultExec
	}

	rawOutput, runErr := runner.ExecPlugin(ctx, pluginPath, netconf, args.AsEnv())
	if runErr != nil {
		return nil, runErr
	}

	version, payload, fixErr := fixupResultVersion(netconf, rawOutput)
	if fixErr != nil {
		return nil, fixErr
	}

	return create.Create(version, payload)
}
flannel-cni的flannel.go中
cni入口
// main is the plugin entry point: it builds the version banner and hands the
// ADD/CHECK/DEL handlers to skel.PluginMain.
func main() {
	const bannerFormat = "CNI Plugin %s version %s (%s/%s) commit %s built on %s"

	banner := fmt.Sprintf(bannerFormat, Program, Version, runtime.GOOS, runtime.GOARCH, Commit, buildDate)
	skel.PluginMain(cmdAdd, cmdCheck, cmdDel, cni.All, banner)
}
配置pod网络
// cmdAdd handles the CNI ADD command.
//
// It loads the flannel network configuration from stdin and the subnet
// environment from the configured subnet file, validates the optional
// 'delegate' dictionary, copies the runtime configuration into it when
// present, and then continues in doCmdAdd.
//
// A user-supplied delegate may carry a 'type' only if it is a string, and
// must not carry 'name' or 'ipam'.
func cmdAdd(args *skel.CmdArgs) error {
	conf, err := loadFlannelNetConf(args.StdinData)
	if err != nil {
		return fmt.Errorf("loadFlannelNetConf failed: %w", err)
	}

	env, err := loadFlannelSubnetEnv(conf.SubnetFile)
	if err != nil {
		return fmt.Errorf("loadFlannelSubnetEnv failed: %w", err)
	}

	// Cases are evaluated top to bottom; the first violated rule wins.
	switch {
	case conf.Delegate == nil:
		conf.Delegate = make(map[string]interface{})
	case hasKey(conf.Delegate, "type") && !isString(conf.Delegate["type"]):
		return fmt.Errorf("'delegate' dictionary, if present, must have (string) 'type' field")
	case hasKey(conf.Delegate, "name"):
		return fmt.Errorf("'delegate' dictionary must not have 'name' field, it'll be set by flannel")
	case hasKey(conf.Delegate, "ipam"):
		return fmt.Errorf("'delegate' dictionary must not have 'ipam' field, it'll be set by flannel")
	}

	if conf.RuntimeConfig != nil {
		conf.Delegate["runtimeConfig"] = conf.RuntimeConfig
	}

	return doCmdAdd(args, conf, env)
}
flannel-cni的flannel_linux.go中
linux下配置pod网络入口
// doCmdAdd completes the delegate configuration and passes it to delegateAdd.
//
// Values written into n.Delegate:
//   - "name":       always n.Name
//   - "type":       "bridge" unless already set
//   - "ipMasq":     the negation of *fenv.ipmasq unless already set
//   - "mtu":        fenv.mtu unless already set
//   - "isGateway":  true for a bridge delegate unless already set
//   - "cniVersion": n.CNIVersion when it is non-empty
//   - "ipam":       the result of getDelegateIPAM
//
// The finished delegate map is also dumped to stderr.
func doCmdAdd(args *skel.CmdArgs, n *NetConf, fenv *subnetEnv) error {
	delegate := n.Delegate
	delegate["name"] = n.Name

	if !hasKey(delegate, "type") {
		delegate["type"] = "bridge"
	}

	if !hasKey(delegate, "ipMasq") {
		// if flannel is not doing ipmasq, we should
		delegate["ipMasq"] = !*fenv.ipmasq
	}

	if !hasKey(delegate, "mtu") {
		delegate["mtu"] = fenv.mtu
	}

	if delegate["type"].(string) == "bridge" && !hasKey(delegate, "isGateway") {
		delegate["isGateway"] = true
	}

	if n.CNIVersion != "" {
		delegate["cniVersion"] = n.CNIVersion
	}

	ipamConf, err := getDelegateIPAM(n, fenv)
	if err != nil {
		return fmt.Errorf("failed to assemble Delegate IPAM: %w", err)
	}
	delegate["ipam"] = ipamConf

	fmt.Fprintf(os.Stderr, "\n%#v\n", n.Delegate)

	return delegateAdd(args.ContainerID, n.DataDir, n.Delegate)
}
获取ipam配置
// getDelegateIPAM assembles the IPAM section for the delegate plugin.
//
// It starts from n.IPAM (or an empty map when that is nil), defaults "type"
// to "host-local", sets "ranges" to one range per non-empty subnet found in
// fenv (sn first, then ip6Sn) and sets "routes" to the routes returned by
// getIPAMRoutes followed by one route per non-nil network in fenv.nws and
// fenv.ip6Nws.
//
// When n.IPAM is non-nil the same map is modified in place and returned.
func getDelegateIPAM(n *NetConf, fenv *subnetEnv) (map[string]interface{}, error) {
	conf := n.IPAM
	if conf == nil {
		conf = map[string]interface{}{}
	}

	if !hasKey(conf, "type") {
		conf["type"] = "host-local"
	}

	// Left nil when neither subnet is available.
	var ranges [][]map[string]interface{}
	if v4 := fenv.sn; v4 != nil && v4.String() != "" {
		entry := []map[string]interface{}{{"subnet": v4.String()}}
		ranges = append(ranges, entry)
	}
	if v6 := fenv.ip6Sn; v6 != nil && v6.String() != "" {
		entry := []map[string]interface{}{{"subnet": v6.String()}}
		ranges = append(ranges, entry)
	}
	conf["ranges"] = ranges

	routes, err := getIPAMRoutes(n)
	if err != nil {
		return nil, fmt.Errorf("failed to read IPAM routes: %w", err)
	}

	for _, network := range fenv.nws {
		if network == nil {
			continue
		}
		routes = append(routes, types.Route{Dst: *network})
	}
	for _, network := range fenv.ip6Nws {
		if network == nil {
			continue
		}
		routes = append(routes, types.Route{Dst: *network})
	}
	conf["routes"] = routes

	return conf, nil
}
flannel-cni的flannel.go中
最终调用下一个cni环节(delegate插件),这个场景下delegate的type默认是bridge,其ipam为host-local
// delegateAdd serializes netconf, stores it for a later DEL, and invokes the
// delegate plugin named by netconf["type"] with the ADD command.
//
// Parameters:
//   - cid:     container ID used as the key when saving the rendered netconf.
//   - dataDir: directory handed to saveScratchNetConf.
//   - netconf: delegate configuration; must contain a string "type" entry.
//
// It returns an error when netconf cannot be serialized, when "type" is
// missing or not a string, when the rendered netconf cannot be saved, or when
// the delegate plugin fails. On success the delegate's result is printed and
// the outcome of that print is returned.
func delegateAdd(cid, dataDir string, netconf map[string]interface{}) error {
	netconfBytes, err := json.Marshal(netconf)
	if err != nil {
		// Check before using the bytes: on failure there is nothing
		// meaningful to log or to pass on.
		return fmt.Errorf("error serializing delegate netconf: %w", err)
	}

	// Best-effort debug output; a failed write to stderr must not fail ADD.
	fmt.Fprintf(os.Stderr, "delegateAdd: netconf sent to delegate plugin:\n")
	_, _ = os.Stderr.Write(netconfBytes)

	// Checked assertion: return an error instead of panicking when the
	// delegate type is absent or not a string.
	delegateType, ok := netconf["type"].(string)
	if !ok {
		return fmt.Errorf("delegate netconf must have a (string) 'type' field")
	}

	// save the rendered netconf for cmdDel
	if err := saveScratchNetConf(cid, dataDir, netconfBytes); err != nil {
		return err
	}

	result, err := invoke.DelegateAdd(context.TODO(), delegateType, netconfBytes, nil)
	if err != nil {
		return fmt.Errorf("failed to delegate add: %w", err)
	}

	return result.Print()
}
网友评论