背景
上次我们简单看了下kubelet作为客户端、服务端的认证和授权方式(https://www.jianshu.com/p/cb203dbc3dd0)
这次我们来看下kubelet的主要工作之一:创建POD
kubelet启动
kubelet的syncLoop
kubelet再启动的时候用于处理pod流程的是syncLoop,在syncLoop中通过select监听着多种信号,其中一种就是处于pod的增删改查的
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
...
switch u.Op {
case kubetypes.ADD:
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
handler.HandlePodUpdates(u.Pods)
...
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
...
case <-syncCh:
...
case update := <-kl.livenessManager.Updates():
...
case <-housekeepingCh:
...
return true
}
u, open := <-configCh
监听通道中的pod是怎么发送过来的呢?
POD从哪里来
在实际使用中,我们已经知道POD是通过Deployment -> ReplicaSet ->Pod
这么个过程产生的,所以我们直接看下ReplicaSet
的代码看下他是怎么提交POD到apiserver的
具体的代码在kube-controller-manager中的ReplicaSetController中,不是本文的重点,因此只是简略看下
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
// 查出这个命名空间下的所有POD
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
// 找出这个ReplicaSet的POD
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
// 进行POD的副本调整
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
...
}
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
...
if diff < 0 {
// diff小于0 说明POD数量少了,需要创建POD
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
...
return err
})
...
} else if diff > 0 {
// diff大于0, 说明POD数量多了,需要删除POD
}
return nil
}
func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
// ReplicaSet中提取出POD的模板
// 然后把ReplicaSet中的selector设置到POD的labels里面
// 再把ReplicaSet设置为POD的ownerReference
pod, err := GetPodFromTemplate(template, object, controllerRef)
...
// 创建POD
newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
...
}
总结一下:
- ReplicaSet提交POD的流程还算简单,只需要对比期望副本数和实际副本数,来调整POD数量即可
- 创建POD的时候,从ReplicaSet中提取出POD的模板,然后把ReplicaSet中的selector设置到POD的labels里面,再把ReplicaSet设置为POD的ownerReference;然后就可以提交给Apiserver去创建了
kubelet接收POD
上述ReplicaSet创建了一个POD后,kubelet是怎么检测到的呢?
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}
可以看到,kubelet实际watch的是spec.nodeName=当前节点的那些POD,spec.nodeName的值是kube-scheduler设置上去的;
与普通的watch不同,kubelet watch到pod事件后会把当前watch到的所有POD都发送到一个updates通道里
为了方便理解,假设kubelet是新启的,还没有任何pod,现在kubelet通过watch机制收到了一个POD ADD事件,那么就会把所有的pod都发送到updates通道里,由于目前只有一个pod,所以也只会发送一个pod到通道里
那通道收到这个POD ADD后会怎么处理呢?
func (m *Mux) Channel(source string) chan interface{} {
...
m.sources[source] = newChannel
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
m.merger.Merge(source, update)
}
}
func (s *podStorage) Merge(source string, change interface{}) error {
// 收到的所有POD先和kubelet本地缓存的POD进行合并
// 区分出哪些是新增的、哪些是更新了的、哪些是删除了的、哪些是status变化了只需要reconcile即可
adds, updates, deletes, removes, reconciles := s.merge(source, change)
switch s.mode {
// 默认是这个模式,会把所有的POD都发往kubelet主循环里监听的通道
case PodConfigNotificationIncremental:
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len(reconciles.Pods) > 0 {
s.updates <- *reconciles
}
}
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
...
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
filtered := filterInvalidPods(newPods, source, s.recorder)
for _, ref := range filtered {
...
if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} else if needReconcile {
reconcilePods = append(reconcilePods, existing)
} else if needGracefulDelete {
deletePods = append(deletePods, existing)
}
continue
}
recordFirstSeenTime(ref)
pods[ref.UID] = ref
addPods = append(addPods, ref)
}
}
switch update.Op {
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
...
case kubetypes.REMOVE:
...
// 再前面的watch里面,发送的是SET事件
case kubetypes.SET:
klog.V(4).Infof("Setting pods for source %s", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
}
...
}
s.pods[source] = pods
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
return adds, updates, deletes, removes, reconciles
}
可以看到PodConfig中会为每个source维护一个通道,目前只有一个source=api,然后再协程中一直监听这个通道,这个通道对应的就是watch后发送所有pod的通道;
通道收到数据后,就会和PodConfig中缓存的pod进行merge,区分出哪些是新增的、哪些是更新了的、哪些是删除了的、哪些是status变化了只需要reconcile即可
- 缓存中已经存在了的,需要进一步区分是update还是reconcile还是delete;
- 如果pod模板没变化,但是status变化了,那么是reconcile
- 如果pod模板变了,但是删除时间不为空,那么是delete
- 如果pod模板变了,删除时间也为空,那么是update
- 缓存中不存在的,是add
- 缓存中多出的,是remove
所有这些merge完成后的pod和对应的事件都会发送到kubelet的主循环监听的通道里
梳理一下整个流程:
-
kubelet通过watch一直监听spec.nodeName=当前节点的所有POD事件
-
收到POD的某个事件后就会将所有watch到的POD(相当于一次list)发送到一个缓存的通道里
-
通道收到这全部的POD后,就和和kubelet本地缓存的所有POD进行merge,进而对POD进行分类,ADD、DELETE、UPDATE、RECONCILE和REMOVE
-
然后再将这些分类后的POD发送到kubelet的syncLoop里,kubelet会对不同操作类型的POD进行不同的操作
创建POD
kubelet收到ADD事件后,会执行对应的创建操作
分发POD
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
// 所有POD按照创建时间排序
sort.Sort(sliceutils.PodsByCreationTime(pods))
// 这是当前需要创建的所有POD
for _, pod := range pods {
...
// 忽略一些静态POD的处理逻辑
// 分发这个POD,这里的类型是SyncPodCreate
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
//
kl.probeManager.AddPod(pod)
}
}
接收POD
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
...
// 添加这个POD,上一步指定了这里的类型是SyncPodCreate
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: ...,
})
...
}
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
...
// 检查是否已经处理过这个POD(UID)
if podUpdates, exists = p.podUpdates[uid]; !exists {
// 还没处理过的,记录已经处理过这个POD了,注意这里是大小为1的阻塞通道
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
// 启动一个协程来处理这个POD的事件
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
// ADD的时候会执行if中的逻辑,然后标记这个POD再处理中了,然后发送一个消息到上面的阻塞通道里
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- *options
} else {
// if a request to kill a pod is pending, we do not let anything overwrite that request.
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
p.lastUndeliveredWorkUpdate[pod.UID] = *options
}
}
}
开始干活
真正干活的是上一步中启动的异步协程
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
// 大小为1的通道上阻塞的读取,上一步中执行ADD逻辑的时候会发送一个消息到这个通道里
for update := range podUpdates {
err := func() error {
// 从缓存中获取这个POD的状态
// 这是一个阻塞操作,会一直等待直到缓存中存在当前POD的状态
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
...
// 执行主要的syncPod逻辑
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
return err
}()
...
p.wrapUp(update.Pod.UID, err)
}
}
上面的逻辑里主要包括两步,需要分别看下
第一步是:从缓存中阻塞获取POD的状态,那么POD的状态是何时丢到缓存中去的呢?
这就要回到kubelet的syncLoop中去了,再syncLoop中除了会监听POD的事件之外,还会监听PLEG信号(Pod Lifecycle Event Generate),即POD的生命历程里的各种事件生成信号,那他为什么和唤醒上面的阻塞读有关呢?
// kubelet启动的时候会有这么一步
kl.pleg.Start()
// 这个启动了一个异步协程每秒执行一次relist函数
func (g *GenericPLEG) Start() {
go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
// relist的执行逻辑
func (g *GenericPLEG) relist() {
// 调用CRI接口获取所有k8s容器,包括sandbox容器
// 因为k8s管理的容器会打一些特殊的标签,因此是可以区分开的
podList, err := g.runtime.GetPods(true)
// 生成事件
for pid := range g.podRecords {
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// Get all containers in the old and the new pod.
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
}
}
// 发送事件到PLEG通道,由kubelet捕获
for pid, events := range eventsByPodID {
...
}
// 默认就是启用了缓存的
if g.cacheEnabled() {
// 更新缓存里的全局时间
g.cache.UpdateTime(timestamp)
}
}
初次ADD POD的时候,虽然CRI上什么容器都查不到,但是只要启用了缓存,就会对缓存的全局时间进行更新,这很重要
func (c *cache) UpdateTime(timestamp time.Time) {
...
c.timestamp = ×tamp
// Notify all the subscribers if the condition is met.
for id := range c.subscribers {
c.notify(id, *c.timestamp)
}
}
func (c *cache) notify(id types.UID, timestamp time.Time) {
...
for i, r := range list {
...
r.ch <- c.get(id)
close(r.ch)
}
if len(newList) == 0 {
delete(c.subscribers, id)
} else {
c.subscribers[id] = newList
}
}
func (c *cache) get(id types.UID) *data {
d, ok := c.pods[id]
if !ok {
return &data{status: &PodStatus{ID: id}, err: nil}
}
return d
}
上面更新了缓存的全局时间后,这列会对阻塞再缓存读取上的订阅者进行唤醒,唤醒的时候如果还没有这个POD的status数据,就会返回一个默认的只带UID的状态数据,然后从订阅者中删除这个UID的POD
因此pod worker才能从缓存阻塞获取状态的请求中被唤醒,然后开始执行 syncPod的主要逻辑
第二步 syncPod的逻辑主要就是创建容器相关的了
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// 为这个POD生成status部分数据
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
// 检测这个POD能否再当前节点运行
runnable := kl.canRunPod(pod)
// 更新这个POD的status数据
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// 维护这个POD的Cgroup路径
pcm := kl.containerManager.NewPodContainerManager()
// 创建这个POD的数据目录 /var/lib/kubelet/uid/volumes、/var/lib/kubelet/uid/plugins
if err := kl.makePodDataDirs(pod)
// Volume manager will not mount volumes for terminated pods
if !kl.podIsTerminated(pod) {
// 等待volume挂载完成
if err := kl.volumeManager.WaitForAttachAndMount(pod);
}
// 获取镜像拉取Secret
pullSecrets := kl.getPullSecretsForPod(pod)
// 调用CRI创建容器
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
}
网友评论