Purpose
When a Job is added: if ttlSecondsAfterFinished is set and the Job has already finished (its Complete or Failed condition is True), the controller enqueues it for cleanup.
When a Job is updated: under the same check (ttlSecondsAfterFinished set and the Complete or Failed condition True), the Job is enqueued again; once the TTL expires, the controller deletes the Job with foreground cascading deletion, as shown in processJob below.
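For example, a Job whose spec carries ttlSecondsAfterFinished becomes a cleanup candidate as soon as it finishes. A minimal client-go sketch (the name, image, and 60-second TTL are illustrative, not from the source):

import (
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newJobWithTTL builds a Job that the ttl-after-finished controller
// will delete 60 seconds after it completes or fails.
func newJobWithTTL() *batchv1.Job {
    ttl := int32(60) // hypothetical TTL; any non-nil value enables cleanup
    return &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: "example-job"},
        Spec: batchv1.JobSpec{
            TTLSecondsAfterFinished: &ttl,
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever,
                    Containers: []corev1.Container{
                        {Name: "main", Image: "busybox", Command: []string{"true"}},
                    },
                },
            },
        },
    }
}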
Diagram
(flow diagram image omitted)
Related code
Prerequisite code
cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    ...
    controllers["ttl-after-finished"] = startTTLAfterFinishedController
    ...
    return controllers
}
cmd/kube-controller-manager/app/core.go
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    go ttlafterfinished.New(
        controllerContext.InformerFactory.Batch().V1().Jobs(),
        controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
    ).Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs))
    return nil, true, nil
}
pkg/controller/ttlafterfinished/ttlafterfinished_controller.go
func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
        ratelimiter.RegisterMetricAndTrackRateLimiterUsage("ttl_after_finished_controller", client.CoreV1().RESTClient().GetRateLimiter())
    }

    metrics.Register()

    tc := &Controller{
        client:   client,
        recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
        queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
    }

    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    tc.addJob,
        UpdateFunc: tc.updateJob,
    })

    tc.jLister = jobInformer.Lister()
    tc.jListerSynced = jobInformer.Informer().HasSynced

    tc.clock = clock.RealClock{}

    return tc
}
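The Controller struct populated by New is defined in the same file; trimmed to its fields, it looks roughly like this (comments mine):

type Controller struct {
    client   clientset.Interface
    recorder record.EventRecorder

    // jLister can list/get Jobs from the shared informer's store.
    jLister batchlisters.JobLister

    // jListerSynced returns true once the Job store has synced at least once.
    jListerSynced cache.InformerSynced

    // queue holds the keys of Jobs whose TTL must be checked and which are
    // deleted once the TTL expires.
    queue workqueue.RateLimitingInterface

    // clock abstracts time for testability; New wires in clock.RealClock{}.
    clock clock.Clock
}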
// Run starts the workers to clean up Jobs.
func (tc *Controller) Run(ctx context.Context, workers int) {
    defer utilruntime.HandleCrash()
    defer tc.queue.ShutDown()

    klog.Infof("Starting TTL after finished controller")
    defer klog.Infof("Shutting down TTL after finished controller")

    if !cache.WaitForNamedCacheSync("TTL after finished", ctx.Done(), tc.jListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, tc.worker, time.Second)
    }

    <-ctx.Done()
}
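The worker passed to wait.UntilWithContext is not shown in this excerpt; in the same file it is roughly the standard workqueue drain loop:

func (tc *Controller) worker(ctx context.Context) {
    for tc.processNextWorkItem(ctx) {
    }
}

func (tc *Controller) processNextWorkItem(ctx context.Context) bool {
    key, quit := tc.queue.Get()
    if quit {
        return false
    }
    defer tc.queue.Done(key)

    // processJob does the real work; handleErr re-enqueues the key with
    // rate limiting when processing fails.
    err := tc.processJob(ctx, key.(string))
    tc.handleErr(err, key)

    return true
}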
Main code
pkg/controller/ttlafterfinished/ttlafterfinished_controller.go
func (tc *Controller) addJob(obj interface{}) {
    job := obj.(*batch.Job)
    klog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)

    if job.DeletionTimestamp == nil && needsCleanup(job) {
        tc.enqueue(job)
    }
}

func (tc *Controller) updateJob(old, cur interface{}) {
    job := cur.(*batch.Job)
    klog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)

    if job.DeletionTimestamp == nil && needsCleanup(job) {
        tc.enqueue(job)
    }
}
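Both handlers depend on needsCleanup and enqueue. For reference, they look roughly like this in the source (needsCleanup lives in this file; IsJobFinished comes from pkg/controller/job):

// needsCleanup checks whether a Job has a TTL set and has finished.
func needsCleanup(j *batch.Job) bool {
    return j.Spec.TTLSecondsAfterFinished != nil && jobutil.IsJobFinished(j)
}

// IsJobFinished returns true when the Complete or Failed condition is True,
// matching the trigger condition described at the top of this article.
func IsJobFinished(j *batch.Job) bool {
    for _, c := range j.Status.Conditions {
        if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
            return true
        }
    }
    return false
}

func (tc *Controller) enqueue(job *batch.Job) {
    klog.V(4).Infof("Add job %s/%s to cleanup", job.Namespace, job.Name)
    key, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
        return
    }
    tc.queue.Add(key)
}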
func (tc *Controller) processJob(ctx context.Context, key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    klog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
    // Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
    job, err := tc.jLister.Jobs(namespace).Get(name)
    if errors.IsNotFound(err) {
        return nil
    }
    if err != nil {
        return err
    }

    if expiredAt, err := tc.processTTL(job); err != nil {
        return err
    } else if expiredAt == nil {
        return nil
    }

    // The Job's TTL is assumed to have expired, but the Job TTL might be stale.
    // Before deleting the Job, do a final sanity check.
    // If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
    // The latest Job may have a different UID, but it's fine because the checks will be run again.
    fresh, err := tc.client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
    if errors.IsNotFound(err) {
        return nil
    }
    if err != nil {
        return err
    }

    // Use the latest Job TTL to see if the TTL truly expires.
    expiredAt, err := tc.processTTL(fresh)
    if err != nil {
        return err
    } else if expiredAt == nil {
        return nil
    }

    // Cascade deletes the Jobs if TTL truly expires.
    policy := metav1.DeletePropagationForeground
    options := metav1.DeleteOptions{
        PropagationPolicy: &policy,
        Preconditions:     &metav1.Preconditions{UID: &fresh.UID},
    }
    klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
    if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(ctx, fresh.Name, options); err != nil {
        return err
    }

    metrics.JobDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds())
    return nil
}
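processJob delegates the actual expiry check to processTTL, which either reports the expiry time (TTL already passed) or re-enqueues the Job to fire when the TTL is expected to expire. Roughly, from the same file:

// processTTL checks whether the Job's TTL has expired; if not, it adds the
// Job back to the queue, delayed until the TTL is expected to expire.
func (tc *Controller) processTTL(job *batch.Job) (expiredAt *time.Time, err error) {
    // Skip Jobs that are being deleted or don't need cleanup.
    if job.DeletionTimestamp != nil || !needsCleanup(job) {
        return nil, nil
    }

    now := tc.clock.Now()
    t, e, err := timeLeft(job, &now)
    if err != nil {
        return nil, err
    }

    // TTL has expired.
    if *t <= 0 {
        return e, nil
    }

    tc.enqueueAfter(job, *t)
    return nil, nil
}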