Background
When a service runs with multiple replicas, the instances are not fully interchangeable: scheduled (cron-style) tasks, for example, should only run once. So one instance has to be elected leader to handle those tasks, while the remaining instances act as hot standbys.
Argo
High-Availability (HA) - Argo Workflows - The workflow engine for Kubernetes
feat(controller): HA Leader election support on Workflow-controller by sarabala1979 · Pull Request #
argoproj/argo-workflows/workflow/controller/controller.go
leaderElectionOff := os.Getenv("LEADER_ELECTION_DISABLE")
if leaderElectionOff == "true" {
    log.Info("Leader election is turned off. Running in single-instance mode")
    logCtx := log.WithField("id", "single-instance")
    go wfc.startLeading(ctx, logCtx, podCleanupWorkers, workflowTTLWorkers, wfWorkers, podWorkers)
} else {
    nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
    if !ok {
        log.Fatal("LEADER_ELECTION_IDENTITY must be set so that the workflow controllers can elect a leader")
    }
    logCtx := log.WithField("id", nodeID)
    leaderName := "workflow-controller"
    if wfc.Config.InstanceID != "" {
        leaderName = fmt.Sprintf("%s-%s", leaderName, wfc.Config.InstanceID)
    }
    go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock: &resourcelock.LeaseLock{
            LeaseMeta:  metav1.ObjectMeta{Name: leaderName, Namespace: wfc.namespace},
            Client:     wfc.kubeclientset.CoordinationV1(),
            LockConfig: resourcelock.ResourceLockConfig{Identity: nodeID, EventRecorder: wfc.eventRecorderManager.Get(wfc.namespace)},
        },
        ReleaseOnCancel: true,
        LeaseDuration:   env.LookupEnvDurationOr("LEADER_ELECTION_LEASE_DURATION", 15*time.Second),
        RenewDeadline:   env.LookupEnvDurationOr("LEADER_ELECTION_RENEW_DEADLINE", 10*time.Second),
        RetryPeriod:     env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second),
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                wfc.startLeading(ctx, logCtx, podCleanupWorkers, workflowTTLWorkers, wfWorkers, podWorkers)
            },
            OnStoppedLeading: func() {
                logCtx.Info("stopped leading")
                cancel()
            },
            OnNewLeader: func(identity string) {
                logCtx.WithField("leader", identity).Info("new leader")
            },
        },
    })
}
- The instance ID is read from the LEADER_ELECTION_IDENTITY environment variable; it is normally injected via the Downward API from metadata.name, i.e. the pod name (see the sketch after this list)
- A LeaderElectionConfig is set up around a LeaseLock named workflow-controller (suffixed with the instance ID when one is configured)
- Callbacks handle the different phases of the election; the one to focus on is OnStartedLeading, whose callback startLeading() is quoted after the sketch
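How LEADER_ELECTION_IDENTITY comes to hold the pod name is a deployment detail (typically a Downward API env entry referencing metadata.name). A hypothetical standalone sketch of resolving such an identity, with a hostname fallback added purely for illustration (the controller itself fails hard if the variable is missing):

package main

import (
    "fmt"
    "os"
)

// leaderElectionIdentity resolves the identity written into the leader
// election lock. The env var is normally populated from metadata.name via
// the Downward API, so it equals the pod name; the hostname fallback below
// is an illustration only, not Argo's behavior.
func leaderElectionIdentity() string {
    if id, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY"); ok && id != "" {
        return id
    }
    host, err := os.Hostname()
    if err != nil {
        panic(fmt.Errorf("cannot determine leader election identity: %w", err))
    }
    return host
}

func main() {
    fmt.Println("leader election identity:", leaderElectionIdentity())
}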
func (wfc *WorkflowController) startLeading(ctx context.Context, logCtx *log.Entry, podCleanupWorkers int, workflowTTLWorkers int, wfWorkers int, podWorkers int) {
    defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
    logCtx.Info("started leading")
    for i := 0; i < podCleanupWorkers; i++ {
        go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second)
    }
    go wfc.workflowGarbageCollector(ctx.Done())
    go wfc.archivedWorkflowGarbageCollector(ctx.Done())
    go wfc.runTTLController(ctx, workflowTTLWorkers)
    go wfc.runCronController(ctx)
    go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
    go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done())
    go wait.Until(wfc.syncManager.CheckWorkflowExistence, workflowExistenceCheckPeriod, ctx.Done())
    for i := 0; i < wfWorkers; i++ {
        go wait.Until(wfc.runWorker, time.Second, ctx.Done())
    }
    for i := 0; i < podWorkers; i++ {
        go wait.Until(wfc.podWorker, time.Second, ctx.Done())
    }
}
When an instance is elected leader it starts a whole set of background tasks: pod cleanup, workflow/archived-workflow garbage collection, the TTL and cron controllers, metrics sync, and the workflow/pod workers. These are background behaviors that must not run on every replica at the same time, otherwise the instances would conflict with each other.
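All of these goroutines are driven by ctx or ctx.Done(), so they stop as soon as the leader-election context is cancelled. A minimal standalone sketch of that pattern (illustration only, not Argo code):

package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // wait.Until runs the function every period until the stop channel closes.
    go wait.Until(func() { fmt.Println("leader-only work") }, time.Second, ctx.Done())

    time.Sleep(3 * time.Second)
    cancel() // losing leadership cancels ctx, so the worker loop stops
    time.Sleep(time.Second)
}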
(screenshot) The lease lock workflow-controller is held by pod workflow-controller-84b5fffb9-777wk
client-go
example
k8s.io/client-go/examples/leader-election/main.go
// leader election uses the Kubernetes API by writing to a
// lock object, which can be a LeaseLock object (preferred),
// a ConfigMap, or an Endpoints (deprecated) object.
// Conflicting writes are detected and each client handles those actions
// independently.
config, err := buildConfig(kubeconfig)
if err != nil {
    klog.Fatal(err)
}
client := clientset.NewForConfigOrDie(config)
Configure the Kubernetes client that is used to create the lease lock object
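buildConfig itself is not part of the excerpt above; a minimal sketch of what such a helper usually looks like, assuming kubeconfig is a flag-provided path and falling back to in-cluster configuration:

import (
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

// buildConfig returns a REST config, either from the given kubeconfig path
// (when running outside the cluster) or from the in-cluster service account.
func buildConfig(kubeconfig string) (*rest.Config, error) {
    if kubeconfig != "" {
        return clientcmd.BuildConfigFromFlags("", kubeconfig)
    }
    return rest.InClusterConfig()
}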
// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
    LeaseMeta: metav1.ObjectMeta{
        Name:      leaseLockName,
        Namespace: leaseLockNamespace,
    },
    Client: client.CoordinationV1(),
    LockConfig: resourcelock.ResourceLockConfig{
        Identity: id,
    },
}
Set up the lease lock
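The lock is just a coordination.k8s.io/v1 Lease object, so the current holder can be inspected directly. A sketch, assuming the client, leaseLockName and leaseLockNamespace from the example above (inspectLease is a hypothetical helper, not part of the example):

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/klog/v2"
)

// inspectLease reads the Lease object backing the lock and reports its
// current holder - the programmatic equivalent of
// `kubectl get lease <name> -n <namespace> -o yaml`.
func inspectLease(ctx context.Context, client kubernetes.Interface, namespace, name string) {
    lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{})
    if err != nil {
        klog.Fatal(err)
    }
    if lease.Spec.HolderIdentity != nil {
        klog.Infof("lease %s/%s is held by %s", namespace, name, *lease.Spec.HolderIdentity)
    }
}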
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    Lock: lock,
    // IMPORTANT: you MUST ensure that any code you have that
    // is protected by the lease must terminate **before**
    // you call cancel. Otherwise, you could have a background
    // loop still running and another process could
    // get elected before your background loop finished, violating
    // the stated goal of the lease.
    ReleaseOnCancel: true,
    LeaseDuration:   60 * time.Second,
    RenewDeadline:   15 * time.Second,
    RetryPeriod:     5 * time.Second,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            // we're notified when we start - this is where you would
            // usually put your code
            run(ctx)
        },
        OnStoppedLeading: func() {
            // we can do cleanup here
            klog.Infof("leader lost: %s", id)
            os.Exit(0)
        },
        OnNewLeader: func(identity string) {
            // we're notified when new leader elected
            if identity == id {
                // I just got the lock
                return
            }
            klog.Infof("new leader elected: %s", identity)
        },
    },
})
Start the leader election loop. Note in particular (see the IMPORTANT comment above) that the code protected by the lease must terminate when the lease is lost (OnStoppedLeading) and run only while it is held (OnStartedLeading), otherwise two instances could act as leader at the same time.
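The run function referenced in OnStartedLeading is where the leader-only work goes. The example itself only logs and blocks; the important property is that everything inside it is driven by ctx, so it returns once leadership is lost. A hypothetical sketch, reusing the example's imports:

// run does the leader-only work. Everything is driven by ctx: when
// leadership is lost the context is cancelled, the loop returns, and no
// background work survives past the lease.
func run(ctx context.Context) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            klog.Info("context cancelled, stopping leader-only work")
            return
        case <-ticker.C:
            klog.Info("doing leader-only work")
        }
    }
}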
Lease
Make periodic NodeStatus updates cheaper · Issue #14733 · kubernetes/kubernetes
code
k8s.io/client-go/tools/leaderelection/leaderelection.go
// RunOrDie starts a client with the provided config or panics if the config
// fails to validate. RunOrDie blocks until leader election loop is
// stopped by ctx or it has stopped holding the leader lease
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}
// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() {
        le.config.Callbacks.OnStoppedLeading()
    }()
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    le.renew(ctx)
}
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    wait.JitterUntil(func() {
        succeeded = le.tryAcquireOrRenew(ctx)
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. obtain or create the ElectionRecord
    oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        le.setObservedRecord(&leaderElectionRecord)
        return true
    }

    // 2. Record obtained, check the Identity & Time
    if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
        le.setObservedRecord(oldLeaderElectionRecord)
        le.observedRawRecord = oldLeaderElectionRawRecord
    }
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to it's default
    // here. Let's correct it before updating.
    if le.IsLeader() {
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    // update the lock itself
    if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }
    le.setObservedRecord(&leaderElectionRecord)
    return true
}
- obtain or create the ElectionRecord
- Record obtained, check the Identity & Time
- try to update the lock; the leaderElectionRecord was initialized with this client's defaults above, so correct AcquireTime and LeaderTransitions before updating
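The heart of step 2 is the expiry check: a candidate may only write the record when it is already the holder, when the record has no holder, or when the observed record has not been renewed within LeaseDuration. A standalone restatement of that check (not the client-go API):

// canTakeOver restates the check from step 2 of tryAcquireOrRenew.
// observedTime is the local time at which this client last saw the
// record change, not the RenewTime written by the holder.
func canTakeOver(holder, myID string, observedTime time.Time, leaseDuration time.Duration, now time.Time) bool {
    if holder == myID {
        return true // already the leader, this is just a renewal
    }
    if holder == "" {
        return true // the record exists but nobody holds it
    }
    // someone else holds it: only take over once the lease has expired
    return !observedTime.Add(leaseDuration).After(now)
}

Expiry is judged against the time this client last observed the record change rather than against timestamps written by the holder, so the participants do not need synchronized clocks.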