
An analysis of k8s client-go leader election

Author: 陈先生_9e91 | Published 2021-09-28 16:25

Background

In a multi-replica deployment the instances are not fully interchangeable: the service may, for example, run scheduled jobs that must only execute once. In such scenarios one instance has to be elected leader to handle that work, while the remaining instances act as hot standbys.

Argo

High-Availability (HA) - Argo Workflows - The workflow engine for Kubernetes

feat(controller): HA Leader election support on Workflow-controller by sarabala1979 · Pull Request #

argoproj/argo-workflows/workflow/controller/controller.go

leaderElectionOff := os.Getenv("LEADER_ELECTION_DISABLE")
if leaderElectionOff == "true" {
   log.Info("Leader election is turned off. Running in single-instance mode")
   logCtx := log.WithField("id", "single-instance")
   go wfc.startLeading(ctx, logCtx, podCleanupWorkers, workflowTTLWorkers, wfWorkers, podWorkers)
} else {
   nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
   if !ok {
      log.Fatal("LEADER_ELECTION_IDENTITY must be set so that the workflow controllers can elect a leader")
   }
   logCtx := log.WithField("id", nodeID)

   leaderName := "workflow-controller"
   if wfc.Config.InstanceID != "" {
      leaderName = fmt.Sprintf("%s-%s", leaderName, wfc.Config.InstanceID)
   }

   go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
      Lock: &resourcelock.LeaseLock{
         LeaseMeta: metav1.ObjectMeta{Name: leaderName, Namespace: wfc.namespace}, Client: wfc.kubeclientset.CoordinationV1(),
         LockConfig: resourcelock.ResourceLockConfig{Identity: nodeID, EventRecorder: wfc.eventRecorderManager.Get(wfc.namespace)},
      },
      ReleaseOnCancel: true,
      LeaseDuration:   env.LookupEnvDurationOr("LEADER_ELECTION_LEASE_DURATION", 15*time.Second),
      RenewDeadline:   env.LookupEnvDurationOr("LEADER_ELECTION_RENEW_DEADLINE", 10*time.Second),
      RetryPeriod:     env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second),
      Callbacks: leaderelection.LeaderCallbacks{
         OnStartedLeading: func(ctx context.Context) {
            wfc.startLeading(ctx, logCtx, podCleanupWorkers, workflowTTLWorkers, wfWorkers, podWorkers)
         },
         OnStoppedLeading: func() {
            logCtx.Info("stopped leading")
            cancel()
         },
         OnNewLeader: func(identity string) {
            logCtx.WithField("leader", identity).Info("new leader")
         },
      },
   })
}

  1. Read the instance id from LEADER_ELECTION_IDENTITY; it is typically set to metadata.name, i.e. the pod name.
  2. Build the LeaderElectionConfig: a LeaseLock plus the lease/renew/retry durations.
  3. React to the different phases of the election via callbacks; the key one is the OnStartedLeading callback, startLeading():

func (wfc *WorkflowController) startLeading(ctx context.Context, logCtx *log.Entry, podCleanupWorkers int, workflowTTLWorkers int, wfWorkers int, podWorkers int) {
   defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

   logCtx.Info("started leading")

   for i := 0; i < podCleanupWorkers; i++ {
      go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second)
   }
   go wfc.workflowGarbageCollector(ctx.Done())
   go wfc.archivedWorkflowGarbageCollector(ctx.Done())

   go wfc.runTTLController(ctx, workflowTTLWorkers)
   go wfc.runCronController(ctx)
   go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
   go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done())

   go wait.Until(wfc.syncManager.CheckWorkflowExistence, workflowExistenceCheckPeriod, ctx.Done())

   for i := 0; i < wfWorkers; i++ {
      go wait.Until(wfc.runWorker, time.Second, ctx.Done())
   }
   for i := 0; i < podWorkers; i++ {
      go wait.Until(wfc.podWorker, time.Second, ctx.Done())
   }
}

When an instance is elected leader, it starts a whole set of background activities: periodic tasks, controllers, garbage collection and so on. These must not run on every replica at the same time, otherwise the replicas would conflict with each other.

(Figure: the lock workflow-controller is held by pod workflow-controller-84b5fffb9-777wk)
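
The same information shown in the figure can also be read programmatically. The sketch below is my own illustration, not from the article: it uses client-go's CoordinationV1 API to print the current holder of the lease. The namespace "argo" and the lease name "workflow-controller" are assumptions that should be adjusted to the actual deployment.

package main

import (
   "context"
   "fmt"

   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
   "k8s.io/client-go/kubernetes"
   "k8s.io/client-go/rest"
)

// Minimal sketch: print who currently holds a leader election Lease.
// Assumes the program runs inside the cluster with RBAC access to leases.
func main() {
   cfg, err := rest.InClusterConfig()
   if err != nil {
      panic(err)
   }
   client := kubernetes.NewForConfigOrDie(cfg)

   // "argo" and "workflow-controller" are assumed values, not taken from the article.
   lease, err := client.CoordinationV1().Leases("argo").Get(context.TODO(), "workflow-controller", metav1.GetOptions{})
   if err != nil {
      panic(err)
   }
   if lease.Spec.HolderIdentity != nil {
      fmt.Printf("lease %s is held by %s (last renewed %v)\n",
         lease.Name, *lease.Spec.HolderIdentity, lease.Spec.RenewTime)
   }
}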

client-go

example

k8s.io/client-go/examples/leader-election/main.go

// leader election uses the Kubernetes API by writing to a
// lock object, which can be a LeaseLock object (preferred),
// a ConfigMap, or an Endpoints (deprecated) object.
// Conflicting writes are detected and each client handles those actions
// independently.
config, err := buildConfig(kubeconfig)
if err != nil {
   klog.Fatal(err)
}
client := clientset.NewForConfigOrDie(config)

Build the Kubernetes client that will be used to create and update the lease lock object.
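
buildConfig is a small helper in the example; roughly speaking, it loads a kubeconfig file when a path is given and otherwise falls back to the in-cluster config. A sketch along those lines (paraphrased, not copied verbatim from the example):

import (
   "k8s.io/client-go/rest"
   "k8s.io/client-go/tools/clientcmd"
)

// buildConfig: use an explicit kubeconfig (e.g. local development) if provided,
// otherwise assume we are running as a pod and use the in-cluster config.
func buildConfig(kubeconfig string) (*rest.Config, error) {
   if kubeconfig != "" {
      return clientcmd.BuildConfigFromFlags("", kubeconfig)
   }
   return rest.InClusterConfig()
}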

// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
   LeaseMeta: metav1.ObjectMeta{
      Name:      leaseLockName,
      Namespace: leaseLockNamespace,
   },
   Client: client.CoordinationV1(),
   LockConfig: resourcelock.ResourceLockConfig{
      Identity: id,
   },
}

Set up the lease lock: the Lease object is identified by leaseLockName and leaseLockNamespace, and Identity is the unique id of this candidate instance.
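
Identity must be unique for every candidate, because it is what tryAcquireOrRenew later compares against the record stored in the lock. In the upstream example the id comes from a command line flag; a common pattern (shown here as an assumption, not part of the example) is to combine the hostname, which inside Kubernetes is the pod name, with a random suffix:

import (
   "os"

   "github.com/google/uuid"
)

// newIdentity is a hypothetical helper: hostname (the pod name in Kubernetes)
// plus a random UUID, so the id stays unique even across quick pod restarts.
func newIdentity() string {
   host, err := os.Hostname()
   if err != nil {
      host = "unknown"
   }
   return host + "_" + uuid.New().String()
}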

// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
   Lock: lock,
   // IMPORTANT: you MUST ensure that any code you have that
   // is protected by the lease must terminate **before**
   // you call cancel. Otherwise, you could have a background
   // loop still running and another process could
   // get elected before your background loop finished, violating
   // the stated goal of the lease.
   ReleaseOnCancel: true,
   LeaseDuration:   60 * time.Second,
   RenewDeadline:   15 * time.Second,
   RetryPeriod:     5 * time.Second,
   Callbacks: leaderelection.LeaderCallbacks{
      OnStartedLeading: func(ctx context.Context) {
         // we're notified when we start - this is where you would
         // usually put your code
         run(ctx)
      },
      OnStoppedLeading: func() {
         // we can do cleanup here
         klog.Infof("leader lost: %s", id)
         os.Exit(0)
      },
      OnNewLeader: func(identity string) {
         // we're notified when new leader elected
         if identity == id {
            // I just got the lock
            return
         }
         klog.Infof("new leader elected: %s", identity)
      },
   },
})

Start the leader election loop. Pay particular attention to the requirement spelled out in the comment: code protected by the lease must only run after OnStartedLeading fires, and must have terminated by the time the lease is lost and OnStoppedLeading fires, otherwise two instances could briefly run the protected code at the same time.
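
A minimal sketch of what run(ctx) can look like so that it honours this rule: all leader-only work hangs off the ctx passed to OnStartedLeading and returns as soon as that context is cancelled. The function body is a placeholder of my own, not taken from the upstream example:

import (
   "context"
   "time"

   "k8s.io/klog/v2"
)

// run holds the leader-only workload; everything in here must stop
// promptly when ctx is cancelled, i.e. when leadership is lost.
func run(ctx context.Context) {
   ticker := time.NewTicker(10 * time.Second)
   defer ticker.Stop()

   for {
      select {
      case <-ctx.Done():
         klog.Info("context cancelled, stopping leader-only work")
         return
      case <-ticker.C:
         // the periodic job that only the leader should execute
         klog.Info("doing leader-only work")
      }
   }
}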

Lease

Make periodic NodeStatus updates cheaper · Issue #14733 · kubernetes/kubernetes

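The Lease type in the coordination.k8s.io/v1 API group is a lightweight object whose spec carries exactly the fields a leader election record needs. Paraphrased from k8s.io/api/coordination/v1 (the comments are mine):

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// LeaseSpec (paraphrased): these fields map almost one-to-one onto the
// LeaderElectionRecord manipulated in leaderelection.go below.
type LeaseSpec struct {
   HolderIdentity       *string           // identity of the current holder/leader
   LeaseDurationSeconds *int32            // how long non-leaders must wait before taking over
   AcquireTime          *metav1.MicroTime // when the current leader acquired the lease
   RenewTime            *metav1.MicroTime // when the leader last renewed the lease
   LeaseTransitions     *int32            // number of times the lease changed hands
}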

code

k8s.io/client-go/tools/leaderelection/leaderelection.go

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate. RunOrDie blocks until leader election loop is
// stopped by ctx or it has stopped holding the leader lease
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
   le, err := NewLeaderElector(lec)
   if err != nil {
      panic(err)
   }
   if lec.WatchDog != nil {
      lec.WatchDog.SetLeaderElection(le)
   }
   le.Run(ctx)
}

// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
   defer runtime.HandleCrash()
   defer func() {
      le.config.Callbacks.OnStoppedLeading()
   }()

   if !le.acquire(ctx) {
      return // ctx signalled done
   }
   ctx, cancel := context.WithCancel(ctx)
   defer cancel()
   go le.config.Callbacks.OnStartedLeading(ctx)
   le.renew(ctx)
}

// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
   ctx, cancel := context.WithCancel(ctx)
   defer cancel()
   succeeded := false
   desc := le.config.Lock.Describe()
   klog.Infof("attempting to acquire leader lease %v...", desc)
   wait.JitterUntil(func() {
      succeeded = le.tryAcquireOrRenew(ctx)
      le.maybeReportTransition()
      if !succeeded {
         klog.V(4).Infof("failed to acquire lease %v", desc)
         return
      }
      le.config.Lock.RecordEvent("became leader")
      le.metrics.leaderOn(le.config.Name)
      klog.Infof("successfully acquired lease %v", desc)
      cancel()
   }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
   return succeeded
}

// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
   now := metav1.Now()
   leaderElectionRecord := rl.LeaderElectionRecord{
      HolderIdentity:       le.config.Lock.Identity(),
      LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
      RenewTime:            now,
      AcquireTime:          now,
   }

   // 1. obtain or create the ElectionRecord
   oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
   if err != nil {
      if !errors.IsNotFound(err) {
         klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
         return false
      }
      if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
         klog.Errorf("error initially creating leader election record: %v", err)
         return false
      }

      le.setObservedRecord(&leaderElectionRecord)

      return true
   }

   // 2. Record obtained, check the Identity & Time
   if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
      le.setObservedRecord(oldLeaderElectionRecord)

      le.observedRawRecord = oldLeaderElectionRawRecord
   }
   if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
      le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
      !le.IsLeader() {
      klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
      return false
   }

   // 3. We're going to try to update. The leaderElectionRecord is set to it's default
   // here. Let's correct it before updating.
   if le.IsLeader() {
      leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
      leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
   } else {
      leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
   }

   // update the lock itself
   if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
      klog.Errorf("Failed to update lock: %v", err)
      return false
   }

   le.setObservedRecord(&leaderElectionRecord)
   return true
}

The three steps of tryAcquireOrRenew:

  1. Obtain the current ElectionRecord from the lock object; if it does not exist yet, create it with our own identity and become the leader immediately.
  2. If a record was obtained, check its identity and time: when another instance holds the lock and its lease has not expired yet, give up this round and let the caller retry later.
  3. Otherwise try to take over (or keep) the lease by updating the record: an instance that is already the leader keeps AcquireTime and LeaderTransitions, a new leader increments LeaderTransitions. The Update goes through the API server, so if a competing instance wrote the lock first, the update fails and this instance does not become leader.
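
One last detail worth calling out: the three durations used throughout the examples above (LeaseDuration, RenewDeadline, RetryPeriod) are related. NewLeaderElector rejects configurations that do not leave enough room to retry a renewal before the lease expires; the sketch below paraphrases that validation rather than copying it (see client-go's leaderelection.go for the exact rules):

import (
   "fmt"
   "time"
)

// Roughly: LeaseDuration > RenewDeadline > RetryPeriod * JitterFactor.
// With the Argo defaults above: 15s > 10s > 5s * 1.2 = 6s.
func validateTimings(leaseDuration, renewDeadline, retryPeriod time.Duration) error {
   const jitterFactor = 1.2 // client-go's JitterFactor
   if leaseDuration <= renewDeadline {
      return fmt.Errorf("leaseDuration must be greater than renewDeadline")
   }
   if renewDeadline <= time.Duration(jitterFactor*float64(retryPeriod)) {
      return fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
   }
   return nil
}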
