K8S ReplicaSet & ReplicationController
RC(ReplicationController)相当于 RS(ReplicaSet)的旧版本 API,两者共用同一份控制器代码。
For RC, the objects are converted on the way in and out (see ../replication/), as if ReplicationController were just an older API version of ReplicaSet. However, RC and RS still have separate storage and separate instantiations of the ReplicaSetController object.
// ReplicationManager is the controller type used for ReplicationControllers.
// It embeds replicaset.ReplicaSetController, so it reuses that controller's
// fields and methods instead of defining its own.
type ReplicationManager struct {
replicaset.ReplicaSetController
}
code
K8S 的 controller 基本都是同一个套路:通过 Informer 注册事件处理函数,Add 等事件的处理函数只负责把对象对应的 key 放入工作队列(queue);
controller 的 Run 方法会启动若干个 worker goroutine(通过 wait.Until 以一秒为周期保持运行),持续从 queue 中取出 key 并交给同步函数处理。因此这里跳过上述步骤,直接进入同步逻辑。
// syncReplicaSet reconciles the ReplicaSet identified by key. It splits the
// key into namespace and name, fetches the ReplicaSet from the lister, lists
// the pods in its namespace, keeps the active ones, lets claimPods decide
// which of them belong to the ReplicaSet, and hands the result to
// manageReplicas.
//
// NOTE: this is an abridged excerpt. The values assigned to err are never
// checked, filteredPods and selector are used without a visible declaration,
// and no return statement is shown.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
// It looks like every controller bluntly fetches the full list of Pods in the
// namespace, which seems heavy-handed; why not apply a filter here?
// The Pods relevant to the rs selector are then picked out in memory.
// NOTE(review): presumably everything is listed so that claimPods can also see
// pods that no longer match the selector; confirm against upstream.
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
for _, pod := range allPods {
if controller.IsPodActive(pod) {
filteredPods = append(filteredPods, pod)
}
}
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
// The usual pattern: manage the Pods so that they reach the desired state.
rsc.manageReplicas(filteredPods, rs)
}
// manageReplicas compares the number of filteredPods with the replica count in
// rs.Spec.Replicas. When there are too few pods it creates pods from
// rs.Spec.Template through slowStartBatch; when there are too many it picks
// pods with getPodsToDelete and deletes each one in its own goroutine.
//
// NOTE: this is an abridged excerpt. Error values are not checked, no return
// statement is shown, and nothing here waits for the delete goroutines.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
// diff < 0 means pods are missing, diff > 0 means there is a surplus.
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if diff < 0 {
// "Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff
// Batch create; all controllers do this. It resembles TCP congestion control:
// pods are created concurrently in batches of 1, 2, 4, ...
// NOTE(review): diff is negative in this branch but is passed to
// slowStartBatch unchanged here; upstream presumably negates it first in a
// step elided from this excerpt. Confirm against upstream.
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
boolPtr := func(b bool) *bool { return &b }
// Owner reference that marks the ReplicaSet as the controller of the new pod.
controllerRef := &metav1.OwnerReference{
APIVersion: rsc.GroupVersion().String(),
Kind: rsc.Kind,
Name: rs.Name,
UID: rs.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
})
} else if diff > 0 {
// "Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff
// Choose which Pods to delete, preferring those in earlier phases of startup.
podsToDelete := getPodsToDelete(filteredPods, diff)
for _, pod := range podsToDelete {
// Each deletion runs in its own goroutine; the pod is passed as an argument
// so every goroutine works on its own targetPod value.
go func(targetPod *v1.Pod) {
err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs);
}(pod)
}
}
}
网友评论