-
Notifications
You must be signed in to change notification settings - Fork 158
feat(controller): Add leader election for high availability #851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,10 +27,14 @@ import ( | |
| "path" | ||
| "syscall" | ||
|
|
||
| "github.com/google/uuid" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| "github.com/prometheus/client_golang/prometheus/promhttp" | ||
| "github.com/urfave/cli/v2" | ||
|
|
||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/client-go/tools/leaderelection" | ||
| "k8s.io/client-go/tools/leaderelection/resourcelock" | ||
| "k8s.io/component-base/logs" | ||
| "k8s.io/component-base/metrics/legacyregistry" | ||
| "k8s.io/klog/v2" | ||
|
|
@@ -56,7 +60,8 @@ const ( | |
| ) | ||
|
|
||
| type Flags struct { | ||
| kubeClientConfig pkgflags.KubeClientConfig | ||
| kubeClientConfig pkgflags.KubeClientConfig | ||
| leaderElectionConfig pkgflags.LeaderElectionConfig | ||
|
|
||
| podName string | ||
| namespace string | ||
|
|
@@ -157,6 +162,7 @@ func newApp() *cli.App { | |
| }, | ||
| } | ||
|
|
||
| cliFlags = append(cliFlags, flags.leaderElectionConfig.Flags()...) | ||
| cliFlags = append(cliFlags, flags.kubeClientConfig.Flags()...) | ||
| cliFlags = append(cliFlags, featureGateConfig.Flags()...) | ||
| cliFlags = append(cliFlags, loggingConfig.Flags()...) | ||
|
|
@@ -217,12 +223,19 @@ func newApp() *cli.App { | |
| controller := NewController(config) | ||
| ctx, cancel := context.WithCancel(c.Context) | ||
| go func() { | ||
| errChan <- controller.Run(ctx) | ||
| // Fallback to standalone mode if leader election is disabled | ||
| if !config.flags.leaderElectionConfig.Enabled { | ||
| klog.Info("Leader election disabled, starting controller directly") | ||
| errChan <- controller.Run(ctx) | ||
| return | ||
| } | ||
| errChan <- runWithLeaderElection(ctx, config, controller) | ||
| }() | ||
|
|
||
| for { | ||
| select { | ||
| case <-sigs: | ||
| case sig := <-sigs: | ||
| klog.InfoS("Received signal, shutting down", "signal", sig.String()) | ||
| cancel() | ||
| case err := <-errChan: | ||
| cancel() | ||
|
|
@@ -253,6 +266,109 @@ func newApp() *cli.App { | |
| return app | ||
| } | ||
|
|
||
// runWithLeaderElection runs the controller under a Lease-based leader election.
//
// It builds a LeaseLock from the configured lease name and namespace, using an
// identity of "<podName>-<uuid>", creates a LeaderElector with ReleaseOnCancel
// set, and then blocks in elector.Run(electorCtx). controller.Run is invoked
// from the OnStartedLeading callback with the callback's own context; when it
// returns, the deferred cancelElector() cancels electorCtx.
//
// Returns an error if the LeaderElector cannot be created, or a wrapped
// controller error if one is already waiting in controllerErrCh when
// elector.Run returns. In every other case it returns nil.
| func runWithLeaderElection(ctx context.Context, config *Config, controller *Controller) error { | ||
| klog.Info("Leader election enabled") | ||
| // Unique identity: PodName + UUID to prevent conflicts on restarts | ||
| id := uuid.New().String() | ||
| lockID := fmt.Sprintf("%s-%s", config.flags.podName, id) | ||
| klog.InfoS("Leader election candidate registered", "lockID", lockID, | ||
| "leaseName", config.flags.leaderElectionConfig.LeaseLockName, | ||
| "leaseNamespace", config.flags.leaderElectionConfig.LeaseLockNamespace) | ||
|
|
||
| // electorCtx controls the lifecycle of the leader election loop | ||
| electorCtx, cancelElector := context.WithCancel(ctx) | ||
| // Standard defer to ensure resources are cleaned up on function exit | ||
| defer cancelElector() | ||
|
|
||
| lock := &resourcelock.LeaseLock{ | ||
| LeaseMeta: metav1.ObjectMeta{ | ||
| Name: config.flags.leaderElectionConfig.LeaseLockName, | ||
| Namespace: config.flags.leaderElectionConfig.LeaseLockNamespace, | ||
| }, | ||
| Client: config.clientsets.Core.CoordinationV1(), | ||
| LockConfig: resourcelock.ResourceLockConfig{ | ||
| Identity: lockID, | ||
| }, | ||
| } | ||
|
|
||
// Capacity 1 plus the non-blocking send below means the callback never
// blocks on delivering its error, even if nobody is receiving yet.
| controllerErrCh := make(chan error, 1) | ||
| callbacks := leaderelection.LeaderCallbacks{ | ||
| OnStartedLeading: func(leaderCtx context.Context) { | ||
| klog.InfoS("Became leader, starting controller", "lockID", lockID) | ||
|
|
||
| // ARCHITECTURE NOTE: | ||
| // We use cancelElector() to ensure that if the controller logic exits | ||
| // (either gracefully or with an error), the entire leader election loop | ||
| // terminates. This triggers ReleaseOnCancel, clearing the lease holder | ||
| // identity and allowing standby replicas to take over immediately. | ||
| // | ||
| // By returning from run() after elector.Run() finishes, we rely on | ||
| // Kubernetes to restart the Pod, ensuring a clean in-memory state | ||
| // for the next leadership term. | ||
| defer cancelElector() | ||
|
|
||
| // NOTE: Use leaderCtx provided by the callback. | ||
| // It is automatically cancelled if leadership is lost. | ||
| if err := controller.Run(leaderCtx); err != nil { | ||
| select { | ||
| case controllerErrCh <- err: | ||
| default: | ||
| } | ||
|
Comment on lines
+313
to
+316
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this select necessary? I would hope that erroring here would trigger Which will then block until the follow up call to: happens later (which we want to block to ensure the controller has shutdown completely before returning the error). |
||
| klog.ErrorS(err, "Controller exited with error", "lockID", lockID) | ||
| } else { | ||
| klog.InfoS("Controller exited gracefully", "lockID", lockID) | ||
| } | ||
| }, | ||
| OnStoppedLeading: func() { | ||
| // ARCHITECTURE NOTE: | ||
| // We only log here. The actual shutdown of the controller is handled by the | ||
| // cancellation of the leaderCtx passed to OnStartedLeading. | ||
| // When leadership is lost, the library cancels that context, triggering | ||
| // the controller's graceful shutdown logic. | ||
| klog.Warningf("Stopped leading, lockID: %s", lockID) | ||
| }, | ||
| OnNewLeader: func(identity string) { | ||
| // OnNewLeader is called when a new leader is observed. | ||
| // We ignore the case where the "new" leader is ourselves to avoid | ||
| // redundant logs during initial election or re-election. | ||
| if identity == lockID { | ||
| klog.V(6).InfoS("OnNewLeader callback: observed leader is still ourselves", "lockID", lockID) | ||
| return | ||
| } | ||
| klog.InfoS("New leader elected", "leader", identity, "currentCandidate", lockID) | ||
| }, | ||
| } | ||
|
|
||
| elector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ | ||
| Lock: lock, | ||
| LeaseDuration: config.flags.leaderElectionConfig.LeaseDuration, | ||
| RenewDeadline: config.flags.leaderElectionConfig.RenewDeadline, | ||
| RetryPeriod: config.flags.leaderElectionConfig.RetryPeriod, | ||
| Name: config.flags.leaderElectionConfig.LeaseLockName, | ||
| Callbacks: callbacks, | ||
| ReleaseOnCancel: true, // Steps down immediately by clearing the Lease holder | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create leader elector: %w", err) | ||
| } | ||
|
|
||
| // Block until electorCtx is cancelled or leadership is lost | ||
| klog.InfoS("Starting leader election loop", "lockID", lockID) | ||
| elector.Run(electorCtx) | ||
|
|
||
// NOTE(review): this receive is non-blocking, so only an error already in
// controllerErrCh is reported. If elector.Run can return while
// controller.Run is still shutting down (presumably the case when the
// library invokes OnStartedLeading on its own goroutine; confirm against
// client-go), a later controller error is dropped and nil is returned
// without waiting for the controller to stop.
| // If exiting due to a controller failure, propagate the error to main | ||
| select { | ||
| case err := <-controllerErrCh: | ||
| if err != nil { | ||
| klog.ErrorS(err, "Process exiting due to controller failure") | ||
| return fmt.Errorf("controller execution failed: %w", err) | ||
| } | ||
| default: | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above, I don't think we want a We can / should block here until the controller has pushed an error into this channel.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @klueska , thanks for the review! Actually, making the channel unbuffered and removing the
The current buffered channel + non-blocking select pattern acts as a safe 'error mailbox' across goroutine boundaries, ensuring we never block the crucial
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. That makes sense. I'm still not 100% happy with the way it reads with these selects, but I'll defer to @shivamerla to decide if something should be changed here. |
||
| klog.InfoS("Leader election loop ended gracefully", "lockID", lockID) | ||
| return nil | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we always want to return
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In our updated version, we do not always return
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
| } | ||
|
|
||
| func SetupHTTPEndpoint(config *Config) error { | ||
| if config.flags.metricsPath != "" { | ||
| // To collect metrics data from the metric handler itself, we | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -185,6 +185,12 @@ webhook: | |
| caBundle: "" | ||
|
|
||
| controller: | ||
| replicas: 1 | ||
| leaderElection: | ||
| enabled: false | ||
| leaseDuration: "15s" | ||
| renewDeadline: "10s" | ||
| retryPeriod: "2s" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where did we take inspiration for these values? :)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. https://github.com/kubernetes/client-go/blob/v0.34.0/tools/leaderelection/leaderelection.go#L116
type LeaderElectionConfig struct {
// Lock is the resource that will be used for locking
Lock rl.Interface
// LeaseDuration is the duration that non-leader candidates will
// wait to force acquire leadership. This is measured against time of
// last observed ack.
//
// A client needs to wait a full LeaseDuration without observing a change to
// the record before it can attempt to take over. When all clients are
// shutdown and a new set of clients are started with different names against
// the same leader record, they must wait the full LeaseDuration before
// attempting to acquire the lease. Thus LeaseDuration should be as short as
// possible (within your tolerance for clock skew rate) to avoid a possible
// long waits in the scenario.
//
// Core clients default this value to 15 seconds.
LeaseDuration time.Duration
// RenewDeadline is the duration that the acting master will retry
// refreshing leadership before giving up.
//
// Core clients default this value to 10 seconds.
RenewDeadline time.Duration
// RetryPeriod is the duration the LeaderElector clients should wait
// between tries of actions.
//
// Core clients default this value to 2 seconds.
RetryPeriod time.Duration
These are the recommended defaults in client-go. |
||
| priorityClassName: "system-node-critical" | ||
| podAnnotations: {} | ||
| podSecurityContext: {} | ||
|
|
@@ -208,6 +214,14 @@ controller: | |
| - matchExpressions: | ||
| - key: "node-role.kubernetes.io/control-plane" | ||
| operator: "Exists" | ||
| podAntiAffinity: | ||
| preferredDuringSchedulingIgnoredDuringExecution: | ||
| - weight: 100 | ||
| podAffinityTerm: | ||
| labelSelector: | ||
| matchLabels: | ||
| nvidia-dra-driver-gpu-component: controller | ||
|
klueska marked this conversation as resolved.
|
||
| topologyKey: kubernetes.io/hostname | ||
| # Network policy settings | ||
| networkPolicy: | ||
| # If the network policy is enabled or not | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| /* | ||
| * Copyright 2025 NVIDIA CORPORATION. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package flags | ||
|
|
||
| import ( | ||
| "time" | ||
|
|
||
| "github.com/urfave/cli/v2" | ||
| ) | ||
|
|
||
// LeaderElectionConfig holds the leader-election settings that are populated
// from command-line flags or environment variables via Flags.
| type LeaderElectionConfig struct { | ||
// Enabled turns leader election on; the flag default is false.
| Enabled bool | ||
// LeaseLockName is the name of the lease lock resource.
| LeaseLockName string | ||
// LeaseLockNamespace is the namespace of the lease lock resource.
| LeaseLockNamespace string | ||
// LeaseDuration is how long non-leader candidates wait before trying to
// force-acquire leadership (flag default 15s).
| LeaseDuration time.Duration | ||
// RenewDeadline is how long the acting leader keeps retrying to refresh
// leadership before giving up (flag default 10s).
| RenewDeadline time.Duration | ||
// RetryPeriod is how long clients wait between tries of actions
// (flag default 2s).
| RetryPeriod time.Duration | ||
| } | ||
|
|
||
// Flags returns the cli flags for leader election. Each flag is grouped under
// the "Leader election:" category, stores its parsed value into the matching
// field of l through Destination, and can also be set with the environment
// variable listed in EnvVars.
//
// NOTE(review): the lease namespace falls back to "default" when unset;
// presumably deployments override it with the controller's own namespace.
// Confirm against the Helm templates.
| func (l *LeaderElectionConfig) Flags() []cli.Flag { | ||
| return []cli.Flag{ | ||
| &cli.BoolFlag{ | ||
| Category: "Leader election:", | ||
| Name: "leader-election-enabled", | ||
| Usage: "Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.", | ||
| Value: false, | ||
| Destination: &l.Enabled, | ||
| EnvVars: []string{"LEADER_ELECTION_ENABLED"}, | ||
| }, | ||
| &cli.StringFlag{ | ||
| Category: "Leader election:", | ||
| Name: "leader-election-lease-lock-namespace", | ||
| Usage: "The lease lock resource namespace.", | ||
| Value: "default", | ||
| Destination: &l.LeaseLockNamespace, | ||
| EnvVars: []string{"LEADER_ELECTION_LEASE_LOCK_NAMESPACE"}, | ||
| }, | ||
| &cli.StringFlag{ | ||
| Category: "Leader election:", | ||
| Name: "leader-election-lease-lock-name", | ||
| Usage: "The lease lock resource name.", | ||
| Value: "nvidia-compute-domain-controller", | ||
| Destination: &l.LeaseLockName, | ||
| EnvVars: []string{"LEADER_ELECTION_LEASE_LOCK_NAME"}, | ||
| }, | ||
| &cli.DurationFlag{ | ||
| Category: "Leader election:", | ||
| Name: "leader-election-lease-duration", | ||
| Usage: "The duration that non-leader candidates will wait to force acquire leadership. This is measured against time of last observed ack.", | ||
| Value: 15 * time.Second, | ||
| Destination: &l.LeaseDuration, | ||
| EnvVars: []string{"LEADER_ELECTION_LEASE_DURATION"}, | ||
| }, | ||
| &cli.DurationFlag{ | ||
| Category: "Leader election:", | ||
| Name: "leader-election-renew-deadline", | ||
| Usage: "The duration that the acting controlplane will retry refreshing leadership before giving up.", | ||
| Value: 10 * time.Second, | ||
| Destination: &l.RenewDeadline, | ||
| EnvVars: []string{"LEADER_ELECTION_RENEW_DEADLINE"}, | ||
| }, | ||
| &cli.DurationFlag{ | ||
| Category: "Leader election:", | ||
| Name: "leader-election-retry-period", | ||
| Usage: "The duration the LeaderElector clients should wait between tries of actions.", | ||
| Value: 2 * time.Second, | ||
| Destination: &l.RetryPeriod, | ||
| EnvVars: []string{"LEADER_ELECTION_RETRY_PERIOD"}, | ||
| }, | ||
| } | ||
| } |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want this to be a buffered channel? I would think we want it to be unbuffered.