Continued from https://www.jianshu.com/p/95cd1556966a
The kubelet watches the API server for Pod changes; the resulting events are delivered to listenChannel.
//pkg/util/config/config.go
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	// iterate over events from this source
	for update := range listenChannel {
		m.merger.Merge(source, update)
	}
}
The listen method ranges over the events and calls Merge, continuously processing Pod change events.
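Before diving into Merge, here is the mux-and-merge pattern in isolation: a minimal, self-contained sketch. The Merger interface mirrors the role podStorage plays, while printMerger and the channel wiring are hypothetical stand-ins, not kubelet types.

package main

import "fmt"

// Merger plays the role of podStorage.Merge: it folds raw updates from a
// named source into shared state.
type Merger interface {
	Merge(source string, update interface{}) error
}

type printMerger struct{}

func (printMerger) Merge(source string, update interface{}) error {
	fmt.Printf("merged %v from %q\n", update, source)
	return nil
}

// listen drains one source's channel and forwards every event to the
// merger, exactly as Mux.listen does above.
func listen(m Merger, source string, ch <-chan interface{}) {
	for update := range ch {
		m.Merge(source, update)
	}
}

func main() {
	ch := make(chan interface{}, 2)
	ch <- "pod-added"
	ch <- "pod-updated"
	close(ch)
	listen(printMerger{}, "api", ch)
}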
//pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
	// classify the change into groups
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	switch s.mode {
	case PodConfigNotificationIncremental:
		s.updates <- *adds
		...
	}
	...
}
Based on the Pod information it already holds in memory, the merge method classifies the change into the following operations:
//pkg/kubelet/types/pod_update.go
const (
	// SET is the current pod configuration.
	SET PodOperation = iota
	// ADD signifies pods that are new to this source.
	ADD
	// DELETE signifies pods that are gracefully deleted from this source.
	DELETE
	// REMOVE signifies pods that have been removed from this source.
	REMOVE
	// UPDATE signifies pods have been updated in this source.
	UPDATE
	// RECONCILE signifies pods that have unexpected status in this source,
	// kubelet should reconcile status with this source.
	RECONCILE
)
s.mode is PodConfigNotificationIncremental here, so s.updates <- *adds sends the Pod ADD events into the updates channel.
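The classification itself is essentially a diff against the pods already recorded for that source. A compilable sketch of the idea, using plain strings as stand-ins for pod specs (the real merge compares full pod objects and also computes deletes, removes, and reconciles):

package main

import "fmt"

// classify splits incoming pods into adds and updates by diffing them
// against the pods already known for this source.
func classify(known, incoming map[string]string) (adds, updates []string) {
	for name, spec := range incoming {
		old, ok := known[name]
		switch {
		case !ok:
			adds = append(adds, name) // new to this source -> ADD
		case old != spec:
			updates = append(updates, name) // spec changed -> UPDATE
		}
	}
	return adds, updates
}

func main() {
	known := map[string]string{"nginx": "v1", "redis": "v1"}
	incoming := map[string]string{"nginx": "v2", "redis": "v1", "etcd": "v1"}
	adds, updates := classify(known, incoming)
	fmt.Println("adds:", adds, "updates:", updates)
}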
//pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, ...) bool {
	select {
	case u, open := <-configCh:
		switch u.Op {
		case kubetypes.ADD:
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			...
		}
	}
}
configCh is the updates channel from above; for an ADD the flow reaches handler.HandlePodAdditions(u.Pods).
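The surrounding syncLoop is the classic Go select-driven event loop. A stripped-down, runnable sketch of the same shape, with a simplified podUpdate type standing in for kubetypes.PodUpdate (the real kubelet also selects over PLEG, housekeeping, and probe channels):

package main

import (
	"fmt"
	"time"
)

// podUpdate is a simplified stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Op   string
	Pods []string
}

// loopIteration mirrors syncLoopIteration: block on the config channel,
// dispatch on the operation type, and return false when the loop should stop.
func loopIteration(configCh <-chan podUpdate, syncCh <-chan time.Time, handleAdd func([]string)) bool {
	select {
	case u, open := <-configCh:
		if !open {
			return false // config channel closed
		}
		switch u.Op {
		case "ADD":
			handleAdd(u.Pods)
		}
	case <-syncCh:
		// periodic resync tick
	}
	return true
}

func main() {
	configCh := make(chan podUpdate, 1)
	configCh <- podUpdate{Op: "ADD", Pods: []string{"nginx"}}
	close(configCh)
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for loopIteration(configCh, ticker.C, func(pods []string) {
		fmt.Println("HandlePodAdditions:", pods)
	}) {
	}
}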
//pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	...
	kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
dispatchWork dispatches the work:
//pkg/kubelet/kubelet.go
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	// hand off to podWorkers for processing
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
The call to podWorkers.UpdatePod takes a single UpdatePodOptions argument:
//pkg/kubelet/pod_workers.go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// look up the pod's current sync status
	status, ok := p.podSyncStatuses[uid]
	if !ok {
		// no entry means this pod is being synced for the first time
		klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
		status = &podSyncStatus{
			syncedAt: now,
			fullname: kubecontainer.GetPodFullName(pod),
		}
		// if this pod is being synced for the first time, we need to make sure it is an active pod
		// i.e. check whether the pod is already terminal (PodFailed or PodSucceeded)
		if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
			// check to see if the pod is not running and the pod is terminal.
			// If this succeeds then record in the podWorker that it is terminated.
			if statusCache, err := p.podCache.Get(pod.UID); err == nil {
				if isPodStatusCacheTerminal(statusCache) {
					// record the terminal state
					status = &podSyncStatus{
						terminatedAt:       now,
						terminatingAt:      now,
						syncedAt:           now,
						startedTerminating: true,
						finished:           true,
						fullname:           kubecontainer.GetPodFullName(pod),
					}
				}
			}
		}
		// store the status
		p.podSyncStatuses[uid] = status
	}
	if status.IsTerminationRequested() {
		if options.UpdateType == kubetypes.SyncPodCreate {
			// the pod is requested to be created while it is still terminating;
			// mark restartRequested = true so it is reconciled later
			status.restartRequested = true
			klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
			return
		}
	}
	// once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
	// a pod in the Finished state is not processed again
	if status.IsFinished() {
		klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
		return
	}
	// the desired work we want to be performing
	workType = SyncPodWork
	work := podWork{
		WorkType: workType,
		Options:  options,
	}
	// start the per-pod worker goroutine if it is not running yet
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		podUpdates = make(chan podWork, 1)
		p.podUpdates[uid] = podUpdates
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	// dispatch a request to the pod worker if none are running
	if !status.IsWorking() {
		status.working = true
		podUpdates <- work
		return
	}
}
managePodLoop keeps ranging over the podUpdates channel and handles each podWork event by calling syncPodFn:
//pkg/kubelet/pod_workers.go
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
	for update := range podUpdates {
		// fetch a pod status newer than the last sync
		status, err = p.podCache.GetNewerThan(pod.UID, lastSyncTime)
		// syncPodFn is the syncPod function in pkg/kubelet/kubelet.go
		err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
		// requeue the pod for the next sync
		p.completeWork(pod, err)
	}
}
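Together, UpdatePod and managePodLoop implement the one-goroutine-per-key worker pattern: a goroutine is lazily started per pod UID, and all syncs for that pod are serialized through its channel. A minimal, runnable sketch of the pattern in general form (hypothetical names, not kubelet code):

package main

import (
	"fmt"
	"sync"
)

// workers serializes work per key: each key gets its own channel and
// goroutine, so two syncs for the same pod never run concurrently.
type workers struct {
	mu      sync.Mutex
	updates map[string]chan string
	wg      sync.WaitGroup
}

func (w *workers) update(key, work string) {
	w.mu.Lock()
	ch, ok := w.updates[key]
	if !ok {
		ch = make(chan string, 1)
		w.updates[key] = ch
		w.wg.Add(1)
		go func() { // lazily start the per-key loop, like managePodLoop
			defer w.wg.Done()
			for item := range ch {
				fmt.Printf("sync %s: %s\n", key, item)
			}
		}()
	}
	w.mu.Unlock()
	ch <- work
}

func main() {
	w := &workers{updates: map[string]chan string{}}
	w.update("pod-a", "create")
	w.update("pod-a", "update")
	w.update("pod-b", "create")
	w.mu.Lock()
	for _, ch := range w.updates {
		close(ch)
	}
	w.mu.Unlock()
	w.wg.Wait()
}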
syncPodFn is in fact the syncPod function in pkg/kubelet/kubelet.go:
//pkg/kubelet/kubelet.go
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	// check whether the pod can be run on this node
	runnable := kl.canRunPod(pod)
	// update the pod status; pkg/kubelet/status/status_manager.go handles this
	kl.statusManager.SetPodStatus(pod, apiPodStatus)
	...
	// create the pod's data directories:
	//   /var/lib/kubelet/pods/{podUID}
	//   /var/lib/kubelet/pods/{podUID}/volumes
	//   /var/lib/kubelet/pods/{podUID}/plugins
	if err := kl.makePodDataDirs(pod); err != nil {
		...
	}
	// fetch the image pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)
	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
}
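A minimal sketch of that directory creation, with a hypothetical helper name and a hard-coded root for illustration (the real makePodDataDirs derives these paths from the kubelet's configured root directory):

package main

import (
	"os"
	"path/filepath"
)

// makePodDirs creates the per-pod data directories under the kubelet root,
// mirroring /var/lib/kubelet/pods/{podUID}, .../volumes, and .../plugins.
func makePodDirs(root, podUID string) error {
	base := filepath.Join(root, "pods", podUID)
	for _, dir := range []string{
		base,
		filepath.Join(base, "volumes"),
		filepath.Join(base, "plugins"),
	} {
		if err := os.MkdirAll(dir, 0750); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// podUID value is illustrative only
	_ = makePodDirs("/var/lib/kubelet", "2e8ff0a1-example-uid")
}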
syncPod updates the Pod's status through the statusManager, creates the pod's directories, and then calls SyncPod in pkg/kubelet/kuberuntime/kuberuntime_manager.go. The official code comments lay out its seven steps:
//pkg/kubelet/kuberuntime/kuberuntime_manager.go
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1
	...
}
- Compute sandbox and container changes.
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
if podContainerChanges.CreateSandbox {
	ref, err := ref.GetReference(legacyscheme.Scheme, pod)
	if err != nil {
		klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
	}
	if podContainerChanges.SandboxID != "" {
		// the sandbox changed, so the pod must be killed and re-created
		m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
	} else {
		// a brand-new pod; a sandbox must be created for it
		klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
	}
}
- Kill pod sandbox if necessary.
- Kill any containers that should not be running.
if podContainerChanges.KillPod {
	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.CreateSandbox {
		klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
	} else {
		klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
	}
	// kill the pod and its containers via CRI: StopPodSandbox for the pod,
	// StopContainer for each container
	killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
	result.AddPodSyncResult(killResult)
	if killResult.Error() != nil {
		klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
		return
	}
	if podContainerChanges.CreateSandbox {
		m.purgeInitContainers(pod, podStatus)
	}
} else {
	// Step 3: kill any running containers in this pod which are not to keep.
	...
}
Step 2 and Step 3 are mutually exclusive: either the whole pod is killed, or only the containers that should no longer run.
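That exclusivity falls out of computePodActions: it either sets KillPod or fills ContainersToKill. A compilable sketch of the branch, using a hypothetical trimmed-down podActions type (not the real kubecontainer.PodActions):

package main

import "fmt"

// podActions is a simplified stand-in for the KillPod / ContainersToKill
// decision computed in Step 1.
type podActions struct {
	KillPod          bool
	ContainersToKill []string
}

// applyKillDecision shows why Step 2 and Step 3 never both run: either the
// whole pod (sandbox plus every container) is stopped, or only the
// individual containers that should no longer be running.
func applyKillDecision(a podActions, killPod func(), killContainer func(string)) {
	if a.KillPod {
		killPod() // Step 2
	} else {
		for _, c := range a.ContainersToKill {
			killContainer(c) // Step 3
		}
	}
}

func main() {
	a := podActions{KillPod: false, ContainersToKill: []string{"sidecar"}}
	applyKillDecision(a,
		func() { fmt.Println("StopPodSandbox + StopContainer for all") },
		func(c string) { fmt.Println("StopContainer:", c) })
}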
- Create sandbox if necessary.
//pkg/kubelet/kuberuntime/kuberuntime_manager.go
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
createPodSandbox itself lives in kuberuntime_sandbox.go:
//pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	// create the pod log directory: /var/log/pods/{podNamespace}_{podName}_{podUID}
	err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
	// look up the concrete runtime handler for the pod's RuntimeClass
	runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
	// create the PodSandbox via the CRI RunPodSandbox call
	podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
	...
}
Steps 5, 6, and 7, back in SyncPod, start the ephemeral containers, the init containers, and the regular containers respectively:
//pkg/kubelet/kuberuntime/kuberuntime_manager.go
// Step 5: start ephemeral containers
// These are started "prior" to init containers to allow running ephemeral containers even when there
// are errors starting an init container. In practice init containers will start first since ephemeral
// containers cannot be specified on pod creation.
if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		// start an ephemeral container
		start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
	}
}
// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
	// Start the next init container.
	if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
		return
	}
	// Successfully started the container; clear the entry in the failure
	klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}
// Step 7: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
	// start a regular container
	start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
}
The start helper ultimately calls startContainer, which brings a container up step by step: pull the image, create the container, start the container, and run the lifecycle hooks.
//pkg/kubelet/kuberuntime/kuberuntime_container.go
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
	// Step 1: pull the image (handled by the imageManager).
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
	// Step 2: create the container via the CRI CreateContainer call.
	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	// Step 3: start the container via the CRI StartContainer call.
	err = m.runtimeService.StartContainer(containerID)
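	// Step 4: run the post-start lifecycle hook, if one is defined.
	// (not shown in this excerpt; in this kubelet version the code wraps the
	// new containerID in a kubecontainer.ContainerID and invokes the pod's
	// PostStart handler via m.runner.Run, killing the container if the hook
	// fails)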
}