The kubelet status manager is responsible for syncing pod.status to the apiserver. Unlike a controller, the status manager does not actively watch for pod state changes; instead it exposes interfaces for other components to call, mainly kubelet's own pod sync logic and the prober manager.
1. Initialization
The kubelet struct has a statusManager field, so a statusManager object is created during initialization. This code lives in NewMainKubelet().
// pkg/kubelet/kubelet.go: 671
func NewMainKubelet(...) {
    ...
    klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
    ...
}
NewManager() is trivial: it just assigns fields. Let's first look at the status manager's struct definition; a sketch of NewManager() follows it.
2. Struct definition
// pkg/kubelet/status/status_manager.go: 62
type manager struct {
    // Client for talking to the kube-apiserver
    kubeClient clientset.Interface
    // Kubelet component that maintains the in-memory pod data
    podManager kubepod.Manager
    // The status manager's cache: maps pod UID to versioned pod status
    podStatuses map[types.UID]versionedPodStatus
    // Mutex guarding updates to podStatuses
    podStatusesLock sync.RWMutex
    // Channel of pending status updates; each message triggers a sync
    podStatusChannel chan podStatusSyncRequest
    // Tracks the status version per mirror pod UID, incremented by one on each update; accessed exclusively
    apiStatusVersions map[kubetypes.MirrorPodUID]uint64
    // Interface used to decide whether a pod can safely be deleted
    podDeletionSafety PodDeletionSafetyProvider
}
The struct has relatively few fields; at a glance, the status manager is a utility class.
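For reference, NewManager() really is just field wiring. This is a sketch of roughly what it looks like in this generation of the source; the 1000-entry channel buffer is worth noting, since updateStatusInternal() relies on it later:

// pkg/kubelet/status/status_manager.go (sketch)
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
    return &manager{
        kubeClient:        kubeClient,
        podManager:        podManager,
        podStatuses:       make(map[types.UID]versionedPodStatus),
        podStatusChannel:  make(chan podStatusSyncRequest, 1000), // buffered; see the select/default in updateStatusInternal()
        apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
        podDeletionSafety: podDeletionSafety,
    }
}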
3. Exposed interfaces
As usual, once the struct is defined, a set of interfaces or methods is implemented on it to expose functionality. The Manager interface collects everything offered to the outside.
// pkg/kubelet/status/status_manager.go: 91
type Manager interface {
    // An embedded interface: look up a pod's status by its UID
    PodStatusProvider
    // Start the status manager
    Start()
    // Set a pod's status and trigger a sync
    SetPodStatus(pod *v1.Pod, status v1.PodStatus)
    // Set the readiness of a container in the pod's status; triggers a sync if it changed
    SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
    // Set whether a container in the pod's status has completed startup; triggers a sync if it changed
    SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
    // Set the statuses of the pod's containers and init containers to terminated
    TerminatePod(pod *v1.Pod)
    // Delete stale cached entries from the status manager
    RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
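The embedded PodStatusProvider is itself a one-method interface; it is how other kubelet components read a pod's cached status:

// pkg/kubelet/status/status_manager.go
type PodStatusProvider interface {
    // GetPodStatus returns the cached status for the pod with the given UID,
    // as well as whether it was a cache hit.
    GetPodStatus(uid types.UID) (v1.PodStatus, bool)
}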
All the interfaces above operate on the status field. Here is what a concrete pod.status looks like:
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2020-02-22T05:54:27Z"
    status: "True"
    type: Initialized
  - lastProbeTime: null
    lastTransitionTime: "2020-02-22T05:54:37Z"
    status: "True"
    type: Ready
  - lastProbeTime: null
    lastTransitionTime: "2020-02-22T05:54:37Z"
    status: "True"
    type: ContainersReady
  - lastProbeTime: null
    lastTransitionTime: "2020-02-22T05:54:27Z"
    status: "True"
    type: PodScheduled
  containerStatuses:
  - containerID: docker://6bc05dd746f0cdc176efcc85a530f5936662d04909077c382491e8d5d38a7cdc
    image: 132.65.110.170:20202/op_svc_pom/webhook-arm_64:1.0.50
    imageID: docker://sha256:8623c55d61494ab0463c6febc3cab977df37563d467dab785e917cb7e22682b3
    lastState: {}
    name: webhook
    ready: true
    restartCount: 0
    state:
      running:
        startedAt: "2020-02-22T05:52:28Z"
  hostIP: 132.65.110.133
  initContainerStatuses:
  - containerID: docker://778b25dd389ffb59c1518b929c50ed37cdbe894590f7930f560eb54c351c491f
    image: 132.65.110.170:20202/op_svc_pom/webhook-arm_64:1.0.50
    imageID: docker://sha256:8623c55d61494ab0463c6febc3cab977df37563d467dab785e917cb7e22682b3
    lastState: {}
    name: webhook-init
    ready: true
    restartCount: 0
    state:
      terminated:
        containerID: docker://778b25dd389ffb59c1518b929c50ed37cdbe894590f7930f560eb54c351c491f
        exitCode: 0
        finishedAt: "2020-02-22T05:52:26Z"
        reason: Completed
        startedAt: "2020-02-22T05:52:26Z"
  managementIP: 132.65.110.133
  phase: Running
  podIP: 132.65.110.133
  podNetworks:
  - iP:
    - ""
    name: eth0
    network: fst-manage
  qosClass: Burstable
  startTime: "2020-02-22T05:54:27Z"
3.1 Start()
Let's look at the Start() interface first. It is kubelet's Run() method that brings the statusManager up.
// pkg/kubelet/kubelet.go: 1442
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    kl.statusManager.Start()
    ...
}
Start() looks like this:
// pkg/kubelet/status/status_manager.go: 149
func (m *manager) Start() {
    if m.kubeClient == nil {
        klog.Infof("Kubernetes client is nil, not starting status manager.")
        return
    }
    klog.Info("Starting to sync pod status with apiserver")
    // 1. Start a ticker that fires a batch sync every 10s (syncPeriod)
    syncTicker := time.Tick(syncPeriod)
    // 2. Start a goroutine that handles the pods arriving on podStatusChannel
    go wait.Forever(func() {
        select {
        case syncRequest := <-m.podStatusChannel:
            klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
            m.syncPod(syncRequest.podUID, syncRequest.status)
        case <-syncTicker:
            m.syncBatch()
        }
    }, 0)
}
Start() does just two things: first, act as the consumer, waiting for notifications on the channel to sync an individual pod; second, run a batch sync every 10 seconds.
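For reference, these are the two small types that flow through podStatusChannel, as defined in the same file:

// pkg/kubelet/status/status_manager.go
type versionedPodStatus struct {
    status v1.PodStatus
    // Monotonically increasing version number (per pod).
    version uint64
    // Pod name and namespace, for sending updates to the apiserver.
    podName      string
    podNamespace string
}

type podStatusSyncRequest struct {
    podUID types.UID
    status versionedPodStatus
}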
m.syncPod()
// pkg/kubelet/status/status_manager.go: 516
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
    // 1. Check whether an update is needed; if not, return right away
    if !m.needsUpdate(uid, status) {
        klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
        return
    }
    // 2. Fetch the current pod object from the apiserver
    pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
    if errors.IsNotFound(err) {
        klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
        // If the Pod is deleted the status will be cleared in
        // RemoveOrphanedStatuses, so we just ignore the update here.
        return
    }
    if err != nil {
        klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
        return
    }
    // 3. Resolve the pod's real UID: for a mirror pod this yields the static pod's UID; otherwise it is just a type conversion
    translatedUID := m.podManager.TranslatePodUID(pod.UID)
    // 4. If the pod was deleted and then recreated (the UID changed), drop the stale status and return
    if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
        klog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
        m.deletePodStatus(uid)
        return
    }
    // 5. All checks passed: patch the status on the apiserver
    oldStatus := pod.Status.DeepCopy()
    newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
    klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
    if err != nil {
        klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
        return
    }
    pod = newPod
    klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
    m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
    // 6. If the pod can now be deleted for good, delete it and clear its cached status from the status manager
    if m.canBeDeleted(pod, status.status) {
        deleteOptions := metav1.NewDeleteOptions(0)
        // Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.
        deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
        err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
        if err != nil {
            klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
            return
        }
        klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
        m.deletePodStatus(uid)
    }
}
The comments in the code above cover syncPod's main flow; a few details deserve a closer look:
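mergePodStatus()
A detail in step 5: the patch is not built from status.status directly but from mergePodStatus(*oldStatus, status.status), which preserves conditions owned by other components (for example, readiness-gate conditions written by controllers) and replaces only the kubelet-owned ones. Roughly, in this generation of the source:

// pkg/kubelet/status/status_manager.go (sketch)
// mergePodStatus keeps the conditions NOT owned by the kubelet from the old
// status, and takes the kubelet-owned conditions from the new status.
func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus {
    podConditions := []v1.PodCondition{}
    for _, c := range oldPodStatus.Conditions {
        if !kubetypes.PodConditionByKubelet(c.Type) {
            podConditions = append(podConditions, c)
        }
    }
    for _, c := range newPodStatus.Conditions {
        if kubetypes.PodConditionByKubelet(c.Type) {
            podConditions = append(podConditions, c)
        }
    }
    newPodStatus.Conditions = podConditions
    return newPodStatus
}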
needsUpdate()
Apart from mirror pods and deleted pods, which get special handling, everything boils down to whether the pod's resources have been fully reclaimed, i.e. PodResourcesAreReclaimed().
// pkg/kubelet/status/status_manager.go: 572
func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
    latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)]
    // If no version has been synced yet, or the synced version is older than this status, an update is needed
    if !ok || latest < status.version {
        return true
    }
    // Pod no longer known to the pod manager: no update
    pod, ok := m.podManager.GetPodByUID(uid)
    if !ok {
        return false
    }
    return m.canBeDeleted(pod, status.status)
}

func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus) bool {
    // No deletion timestamp, or a mirror pod: nothing to delete
    if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) {
        return false
    }
    return m.podDeletionSafety.PodResourcesAreReclaimed(pod, status)
}
PodResourcesAreReclaimed()
PodResourcesAreReclaimed() decides whether a pod's resources have been fully reclaimed. It checks the following dimensions:
- none of the pod's containers is still in the Running state
- kubelet's podCache shows no leftover containers for the pod
- the pod's volumes have been cleaned up
- the pod's cgroup has been cleaned up
Only if all checks pass can the pod be deleted.
// pkg/kubelet/kubelet_pods.go: 912
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
    // 1. If any container is still running, not reclaimed
    if !notRunning(status.ContainerStatuses) {
        // We shouldn't delete pods that still have running containers
        klog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
        return false
    }
    // 2. Look the pod up in kubelet's podCache; if the runtime status can't be read, or container statuses linger, not reclaimed
    runtimeStatus, err := kl.podCache.Get(pod.UID)
    if err != nil {
        klog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err)
        return false
    }
    if len(runtimeStatus.ContainerStatuses) > 0 {
        var statusStr string
        for _, status := range runtimeStatus.ContainerStatuses {
            statusStr += fmt.Sprintf("%+v ", *status)
        }
        klog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %s", format.Pod(pod), statusStr)
        return false
    }
    // 3. Volumes still present: not reclaimed
    if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
        klog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
        return false
    }
    // 4. Pod cgroup not cleaned up yet: not reclaimed
    if kl.kubeletConfiguration.CgroupsPerQOS {
        pcm := kl.containerManager.NewPodContainerManager()
        if pcm.Exists(pod) {
            klog.V(3).Infof("Pod %q is terminated, but pod cgroup sandbox has not been cleaned up", format.Pod(pod))
            return false
        }
    }
    return true
}
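The notRunning() helper used in step 1 counts a container as stopped only if its state is Terminated or Waiting; it comes from the same file:

// pkg/kubelet/kubelet_pods.go
// notRunning returns true if every status is terminated or waiting, or the
// status list is empty.
func notRunning(statuses []v1.ContainerStatus) bool {
    for _, status := range statuses {
        if status.State.Terminated == nil && status.State.Waiting == nil {
            return false
        }
    }
    return true
}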
m.syncBatch()
Before reading m.syncBatch(), a guess: since this is the batch sync, it should walk the cache the status manager maintains and call syncPod on each entry that needs updating.
// pkg/kubelet/status/status_manager.go: 471
func (m *manager) syncBatch() {
    var updatedStatuses []podStatusSyncRequest
    podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
    // The core logic is wrapped in an anonymous function so the lock is released before syncing
    func() { // Critical section
        m.podStatusesLock.RLock()
        defer m.podStatusesLock.RUnlock()
        // Drop apiStatusVersions entries that have neither a cached status nor a mirror pod
        for uid := range m.apiStatusVersions {
            _, hasPod := m.podStatuses[types.UID(uid)]
            _, hasMirror := mirrorToPod[uid]
            if !hasPod && !hasMirror {
                delete(m.apiStatusVersions, uid)
            }
        }
        // Walk the cached pod statuses
        for uid, status := range m.podStatuses {
            syncedUID := kubetypes.MirrorPodUID(uid)
            // For a static pod, sync against its mirror pod's UID; skip static pods without a mirror pod
            if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
                if mirrorUID == "" {
                    klog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
                    continue
                }
                syncedUID = mirrorUID
            }
            if m.needsUpdate(types.UID(syncedUID), status) {
                // Version out of date: queue for update
                updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
            } else if m.needsReconcile(uid, status.status) {
                // Cached status disagrees with what the apiserver holds: queue for update as well
                delete(m.apiStatusVersions, syncedUID)
                updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
            }
        }
    }()
    // Sync each queued pod in turn
    for _, update := range updatedStatuses {
        klog.V(5).Infof("Status Manager: syncPod in syncbatch. pod UID: %q", update.podUID)
        m.syncPod(update.podUID, update.status)
    }
}
Reading the code confirms the guess. To summarize, syncBatch() does the following:
- Fetch the current mappings between static pods and mirror pods, and prune stale entries from the apiStatusVersions map the status manager maintains
- Walk the status manager's podStatus cache, pick out the pod UIDs whose status version is out of date or whose pod status needs reconciling, and call syncPod() on each of them.
needsReconcile()
There is a needsReconcile() method in there: it decides whether the status passed in differs from the pod's status as last seen from the apiserver (via the pod manager); the core is comparing the kubelet-owned contents of pod.status, in particular the conditions. The conditions owned by kubelet are: PodScheduled, Initialized, ContainersReady and Ready. The code:
// pkg/kubelet/status/status_manager.go: 690
func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
    // 1. Fetch the pod from the pod manager's cache
    pod, ok := m.podManager.GetPodByUID(uid)
    if !ok {
        klog.V(4).Infof("Pod %q has been deleted, no need to reconcile", string(uid))
        return false
    }
    // 2. For a static pod, compare against its mirror pod instead
    if kubetypes.IsStaticPod(pod) {
        mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod)
        if !ok {
            klog.V(4).Infof("Static pod %q has no corresponding mirror pod, no need to reconcile", format.Pod(pod))
            return false
        }
        pod = mirrorPod
    }
    podStatus := pod.Status.DeepCopy()
    // 3. Lower the precision of the timestamp fields in podStatus, from RFC3339Nano to RFC3339
    normalizeStatus(pod, podStatus)
    // 4. Compare the kubelet-owned parts of the two statuses
    if isPodStatusByKubeletEqual(podStatus, &status) {
        // If the status from the source is the same with the cached status,
        // reconcile is not needed. Just return.
        return false
    }
    klog.V(3).Infof("Pod status is inconsistent with cached status for pod %q, a reconciliation should be triggered:\n %s", format.Pod(pod),
        diff.ObjectDiff(podStatus, &status))
    return true
}
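A note on step 3: normalizeStatus() rounds every timestamp in the status to second precision via metav1.Time's Rfc3339Copy() (and also trims and sorts a few slice fields), so statuses that differ only in sub-second timestamps compare as equal. A minimal standalone illustration of the rounding, not kubelet code, assuming only the apimachinery dependency:

package main

import (
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Rfc3339Copy() re-parses the time at RFC3339 (second) precision,
    // dropping the RFC3339Nano sub-second part. This is what
    // normalizeStatus() applies to every timestamp in the pod status.
    a := metav1.NewTime(time.Now())
    b := a.Rfc3339Copy()
    fmt.Println(b.Nanosecond())               // always 0
    fmt.Println(a.Sub(b.Time) < time.Second)  // true: only sub-second precision was lost
}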
3.2 SetPodStatus()
SetPodStatus() sets the status for a given pod; it is called from kubelet's core syncPod() business logic. The method is simple: it validates the conditions and then calls updateStatusInternal() to trigger an update.
// pkg/kubelet/status/status_manager.go: 181
func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
    m.podStatusesLock.Lock()
    defer m.podStatusesLock.Unlock()
    for _, c := range pod.Status.Conditions {
        if !kubetypes.PodConditionByKubelet(c.Type) {
            klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
                "But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
        }
    }
    // Make sure we're caching a deep copy.
    status = *status.DeepCopy()
    m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
}
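The kubetypes.PodConditionByKubelet() check used above (and inside mergePodStatus() and needsReconcile()) is just a membership test against the list of kubelet-owned condition types:

// pkg/kubelet/types/pod_status.go
// PodConditionsByKubelet is the list of pod condition types owned by kubelet.
var PodConditionsByKubelet = []v1.PodConditionType{
    v1.PodScheduled,
    v1.PodReady,
    v1.PodInitialized,
    v1.ContainersReady,
}

// PodConditionByKubelet returns true if the condition type is owned by kubelet.
func PodConditionByKubelet(conditionType v1.PodConditionType) bool {
    for _, c := range PodConditionsByKubelet {
        if c == conditionType {
            return true
        }
    }
    return false
}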
m.updateStatusInternal()
All the core work lives here. The long preamble is really just building the new versionedPodStatus; the method then sends a signal on podStatusChannel to trigger the status manager's syncPod() action.
// pkg/kubelet/status/status_manager.go: 364
func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
    // 1. Determine the old status: from the cache, else the mirror pod, else the pod object itself
    var oldStatus v1.PodStatus
    cachedStatus, isCached := m.podStatuses[pod.UID]
    if isCached {
        oldStatus = cachedStatus.status
    } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
        oldStatus = mirrorPod.Status
    } else {
        oldStatus = pod.Status
    }
    // 2. Ensure containers and init containers never transition from terminated back to a non-terminated state
    if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }
    if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }
    // 3. Update lastTransitionTime on the kubelet-owned conditions
    updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)
    updateLastTransitionTime(&status, &oldStatus, v1.PodReady)
    updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)
    updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
    // 4. Preserve or initialize status.startTime
    if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
        status.StartTime = oldStatus.StartTime
    } else if status.StartTime.IsZero() {
        // if the status has no start time, we need to set an initial time
        now := metav1.Now()
        status.StartTime = &now
    }
    // 5. Normalize the timestamps
    normalizeStatus(pod, &status)
    // 6. Only allow monotonic updates: skip if the status is unchanged, unless forced
    if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
        klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
        return false // No new status.
    }
    // Assemble the new versioned status and cache it
    newStatus := versionedPodStatus{
        status:       status,
        version:      cachedStatus.version + 1,
        podName:      pod.Name,
        podNamespace: pod.Namespace,
    }
    m.podStatuses[pod.UID] = newStatus
    select {
    // 7. Send the sync request to trigger syncPod; if the channel is full, drop it, since syncBatch() will catch up later
    case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
        klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
            pod.UID, newStatus.version, newStatus.status)
        return true
    default:
        klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
            format.Pod(pod), status)
        return false
    }
}
The remaining interfaces follow essentially the same flow, so they are not walked through one by one:
- SetContainerStartup() and SetContainerReadiness() are used by the prober manager: after a probe completes, they update the container's started/ready state. Roughly, a container that has started but is not yet ready shows up as 0/1 in the READY column, and as 1/1 once readiness passes (see the condensed sketch after this list).
- TerminatePod() is called by kubelet during dispatchWork(): before dispatching an update, it checks whether the pod is pending deletion, and if so marks all of its container and init container statuses as terminated.
- RemoveOrphanedStatuses() is called when kubelet runs housekeeping: alongside pod cleanup, it prunes the status manager's stale cache entries.
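As an example of how these thin wrappers funnel into updateStatusInternal(), here is a condensed sketch of SetContainerReadiness(); the error logging and the condition recomputation are elided:

// pkg/kubelet/status/status_manager.go (condensed sketch)
func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
    m.podStatusesLock.Lock()
    defer m.podStatusesLock.Unlock()

    pod, ok := m.podManager.GetPodByUID(podUID)
    if !ok {
        return
    }
    oldStatus, ok := m.podStatuses[pod.UID]
    if !ok {
        return
    }
    // Locate the container and skip the update if readiness did not change.
    containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
    if !ok || containerStatus.Ready == ready {
        return
    }
    // Clone the cached status, flip Ready, recompute the Ready and
    // ContainersReady pod conditions, then go through the normal update path.
    status := *oldStatus.status.DeepCopy()
    containerStatus, _, _ = findContainerStatus(&status, containerID.String())
    containerStatus.Ready = ready
    ... // recompute the PodReady / ContainersReady conditions here
    m.updateStatusInternal(pod, status, false)
}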