From 49ee92bc6f553e3c38c6e7fa787895a3fac9db19 Mon Sep 17 00:00:00 2001 From: ontave Date: Wed, 6 May 2026 20:54:07 +0200 Subject: [PATCH] feat: machineconfig-restore Conductor capability + 5 unit tests (session/25) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds machineconfig-restore named capability: reads TalosMachineConfigRestore CR for backupTimestamp/s3SourceBucket/targetNodes, downloads per-node config from S3 at {cluster}/machineconfigs/{ts}/{hostname}.yaml, applies via ApplyConfiguration, waits for node stable. Non-fatal per node. Registers handler in stubs.go. Adds CapabilityMachineConfigRestore constant to runnerlib. conductor-schema.md §6, platform-schema.md §11. --- internal/capability/platform_machineconfig.go | 209 ++++++++++++++++ .../capability/platform_machineconfig_test.go | 225 ++++++++++++++++++ internal/capability/stubs.go | 1 + pkg/runnerlib/constants.go | 5 + 4 files changed, 440 insertions(+) diff --git a/internal/capability/platform_machineconfig.go b/internal/capability/platform_machineconfig.go index d39b27a..f7aa1de 100644 --- a/internal/capability/platform_machineconfig.go +++ b/internal/capability/platform_machineconfig.go @@ -10,6 +10,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" sigsyaml "sigs.k8s.io/yaml" @@ -142,3 +143,211 @@ func sanitizeHostname(nodeIP string) string { return r.Replace(nodeIP) } +// machineConfigRestoreGVR is the GroupVersionResource for TalosMachineConfigRestore. +var machineConfigRestoreGVR = schema.GroupVersionResource{ + Group: "platform.ontai.dev", + Version: "v1alpha1", + Resource: "talosmachineconfigrestores", +} + +// machineConfigRestoreHandler implements the machineconfig-restore named capability. +// For each target node it downloads the config from S3 at +// {cluster}/machineconfigs/{backupTimestamp}/{hostname}.yaml, confirms the node is +// reachable via GetMachineConfig, then applies the restored config via ApplyConfiguration. +// Non-fatal per node -- failures are recorded in Steps and execution continues. +// platform-schema.md §11. +type machineConfigRestoreHandler struct{} + +func (h *machineConfigRestoreHandler) Execute(ctx context.Context, params ExecuteParams) (runnerlib.OperationResultSpec, error) { + now := time.Now().UTC() + + if params.TalosClient == nil || params.StorageClient == nil || params.DynamicClient == nil { + return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ValidationFailure, + "machineconfig-restore requires TalosClient, StorageClient, and DynamicClient"), nil + } + + // Read TalosMachineConfigRestore CR to get backupTimestamp, targetNodes, s3SourceBucket. + ns := tenantNamespace(params.ClusterRef) + crList, err := params.DynamicClient.Resource(machineConfigRestoreGVR).Namespace(ns). + List(ctx, metav1.ListOptions{}) + if err != nil { + return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ExecutionFailure, + fmt.Sprintf("list TalosMachineConfigRestore in %s: %v", ns, err)), nil + } + + var backupTimestamp, s3Bucket string + var targetNodes []string + for _, item := range crList.Items { + ts, _, _ := unstructuredString(item.Object, "spec", "backupTimestamp") + if ts == "" { + continue + } + backupTimestamp = ts + s3Bucket, _, _ = unstructuredString(item.Object, "spec", "s3SourceBucket") + if rawNodes, ok, _ := unstructured.NestedStringSlice(item.Object, "spec", "targetNodes"); ok { + targetNodes = rawNodes + } + break + } + + if backupTimestamp == "" { + return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ValidationFailure, + fmt.Sprintf("no TalosMachineConfigRestore CR with backupTimestamp found in %s", ns)), nil + } + if s3Bucket == "" { + return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ValidationFailure, + fmt.Sprintf("no TalosMachineConfigRestore CR with s3SourceBucket found in %s", ns)), nil + } + + // Build target set for filtering (nil = all nodes). + targetSet := make(map[string]bool, len(targetNodes)) + for _, n := range targetNodes { + targetSet[n] = true + } + + // Determine node IPs. Production: read from talosconfig. Test: single sentinel. + var nodeIPs []string + if params.TalosconfigPath != "" { + ips, epErr := EndpointsFromTalosconfig(params.TalosconfigPath) + if epErr != nil { + return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ExecutionFailure, + fmt.Sprintf("read endpoints from talosconfig: %v", epErr)), nil + } + nodeIPs = ips + } else { + nodeIPs = []string{"node"} + } + + var steps []runnerlib.StepResult + var artifacts []runnerlib.ArtifactRef + var failedAny bool + + for _, nodeIP := range nodeIPs { + nodeCtx := ctx + if nodeIP != "node" { + nodeCtx = NodeContext(ctx, nodeIP) + } + + stepStart := time.Now().UTC() + + // Confirm node reachability and resolve hostname for the S3 key. + currentConfig, err := params.TalosClient.GetMachineConfig(nodeCtx) + hostname := sanitizeHostname(nodeIP) + if err != nil { + steps = append(steps, runnerlib.StepResult{ + Name: fmt.Sprintf("restore-%s", hostname), + Status: runnerlib.ResultFailed, + StartedAt: stepStart, + CompletedAt: time.Now().UTC(), + Message: fmt.Sprintf("GetMachineConfig on %s: node unreachable: %v", nodeIP, err), + }) + failedAny = true + continue + } + // Extract hostname from current config so the S3 key matches the backup. + var cfg minimalMachineConfigHostname + if unmarshalErr := sigsyaml.Unmarshal(currentConfig, &cfg); unmarshalErr == nil && cfg.Machine.Network.Hostname != "" { + hostname = cfg.Machine.Network.Hostname + } + + // Filter by targetNodes when set. + if len(targetSet) > 0 && !targetSet[hostname] { + continue + } + + // Download backup config from S3. + s3Key := fmt.Sprintf("%s/machineconfigs/%s/%s.yaml", params.ClusterRef, backupTimestamp, hostname) + rc, dlErr := params.StorageClient.Download(nodeCtx, s3Bucket, s3Key) + if dlErr != nil { + steps = append(steps, runnerlib.StepResult{ + Name: fmt.Sprintf("restore-%s", hostname), + Status: runnerlib.ResultFailed, + StartedAt: stepStart, + CompletedAt: time.Now().UTC(), + Message: fmt.Sprintf("download s3://%s/%s: %v", s3Bucket, s3Key, dlErr), + }) + failedAny = true + continue + } + restoredConfig, readErr := readCloserToBytes(rc) + if readErr != nil { + steps = append(steps, runnerlib.StepResult{ + Name: fmt.Sprintf("restore-%s", hostname), + Status: runnerlib.ResultFailed, + StartedAt: stepStart, + CompletedAt: time.Now().UTC(), + Message: fmt.Sprintf("read s3://%s/%s: %v", s3Bucket, s3Key, readErr), + }) + failedAny = true + continue + } + + // Apply restored config. + if applyErr := params.TalosClient.ApplyConfiguration(nodeCtx, restoredConfig, "no-reboot"); applyErr != nil { + steps = append(steps, runnerlib.StepResult{ + Name: fmt.Sprintf("restore-%s", hostname), + Status: runnerlib.ResultFailed, + StartedAt: stepStart, + CompletedAt: time.Now().UTC(), + Message: fmt.Sprintf("ApplyConfiguration on %s: %v", nodeIP, applyErr), + }) + failedAny = true + continue + } + + // Wait for the node to stabilise after config apply. + if nodeIP != "node" { + if wErr := waitForNodeStable(ctx, params.TalosClient, nodeIP); wErr != nil { + steps = append(steps, runnerlib.StepResult{ + Name: fmt.Sprintf("restore-%s", hostname), + Status: runnerlib.ResultFailed, + StartedAt: stepStart, + CompletedAt: time.Now().UTC(), + Message: fmt.Sprintf("waitForNodeStable on %s: %v", nodeIP, wErr), + }) + failedAny = true + continue + } + } + + steps = append(steps, runnerlib.StepResult{ + Name: fmt.Sprintf("restore-%s", hostname), + Status: runnerlib.ResultSucceeded, + StartedAt: stepStart, + CompletedAt: time.Now().UTC(), + Message: fmt.Sprintf("machine config for %s restored from s3://%s/%s", hostname, s3Bucket, s3Key), + }) + artifacts = append(artifacts, runnerlib.ArtifactRef{ + Name: fmt.Sprintf("restored-machineconfig-%s", hostname), + Kind: "S3Object", + Reference: fmt.Sprintf("s3://%s/%s", s3Bucket, s3Key), + }) + } + + overallStatus := runnerlib.ResultSucceeded + if failedAny && len(artifacts) == 0 { + // All nodes failed. + return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ExecutionFailure, + "machineconfig-restore: all target nodes failed"), nil + } + + return runnerlib.OperationResultSpec{ + Capability: runnerlib.CapabilityMachineConfigRestore, + Status: overallStatus, + StartedAt: now, + CompletedAt: time.Now().UTC(), + Artifacts: artifacts, + Steps: steps, + }, nil +} + +// readCloserToBytes drains an io.ReadCloser into a byte slice and closes it. +func readCloserToBytes(rc interface{ Read([]byte) (int, error); Close() error }) ([]byte, error) { + defer rc.Close() + var buf bytes.Buffer + if _, err := buf.ReadFrom(rc); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + diff --git a/internal/capability/platform_machineconfig_test.go b/internal/capability/platform_machineconfig_test.go index d1aa17a..88256aa 100644 --- a/internal/capability/platform_machineconfig_test.go +++ b/internal/capability/platform_machineconfig_test.go @@ -265,3 +265,228 @@ func findSubstring(s, sub string) bool { } return false } + +// -- machineconfig-restore tests -- + +// stubStorageRestore supports both Upload and Download with configurable responses. +type stubStorageRestore struct { + downloadData []byte + downloadErr error + uploads []stubUploadCall +} + +func (s *stubStorageRestore) Upload(_ context.Context, bucket, key string, r io.Reader) error { + data, _ := io.ReadAll(r) + s.uploads = append(s.uploads, stubUploadCall{bucket: bucket, key: key, data: data}) + return nil +} + +func (s *stubStorageRestore) Download(_ context.Context, _, _ string) (io.ReadCloser, error) { + if s.downloadErr != nil { + return nil, s.downloadErr + } + return io.NopCloser(bytes.NewReader(s.downloadData)), nil +} + +// stubTalosClientRestore supports ApplyConfiguration errors. +type stubTalosClientRestore struct { + configYAML []byte + configErr error + applyErr error +} + +func (s *stubTalosClientRestore) Bootstrap(_ context.Context) error { return nil } +func (s *stubTalosClientRestore) ApplyConfiguration(_ context.Context, _ []byte, _ string) error { + return s.applyErr +} +func (s *stubTalosClientRestore) Upgrade(_ context.Context, _ string, _ bool) error { return nil } +func (s *stubTalosClientRestore) Reboot(_ context.Context) error { return nil } +func (s *stubTalosClientRestore) Reset(_ context.Context, _ bool) error { return nil } +func (s *stubTalosClientRestore) EtcdSnapshot(_ context.Context, _ io.Writer) error { return nil } +func (s *stubTalosClientRestore) EtcdRecover(_ context.Context, _ io.Reader) error { return nil } +func (s *stubTalosClientRestore) EtcdDefragment(_ context.Context) error { return nil } +func (s *stubTalosClientRestore) Kubeconfig(_ context.Context) ([]byte, error) { return nil, nil } +func (s *stubTalosClientRestore) Nodes() []string { return nil } +func (s *stubTalosClientRestore) Health(_ context.Context) error { return nil } +func (s *stubTalosClientRestore) Close() error { return nil } +func (s *stubTalosClientRestore) GetMachineConfig(_ context.Context) ([]byte, error) { + return s.configYAML, s.configErr +} + +// buildMachineConfigRestoreScheme returns a scheme with TalosMachineConfigRestore registered. +func buildMachineConfigRestoreScheme() *runtime.Scheme { + s := runtime.NewScheme() + s.AddKnownTypeWithName( + schema.GroupVersionKind{Group: "platform.ontai.dev", Version: "v1alpha1", Kind: "TalosMachineConfigRestore"}, + &unstructured.Unstructured{}, + ) + s.AddKnownTypeWithName( + schema.GroupVersionKind{Group: "platform.ontai.dev", Version: "v1alpha1", Kind: "TalosMachineConfigRestoreList"}, + &unstructured.UnstructuredList{}, + ) + return s +} + +// makeMachineConfigRestoreCR builds an unstructured TalosMachineConfigRestore. +func makeMachineConfigRestoreCR(namespace, name, backupTimestamp, s3Bucket string, targetNodes []string) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "platform.ontai.dev", + Version: "v1alpha1", + Kind: "TalosMachineConfigRestore", + }) + obj.SetNamespace(namespace) + obj.SetName(name) + _ = unstructured.SetNestedField(obj.Object, backupTimestamp, "spec", "backupTimestamp") + _ = unstructured.SetNestedField(obj.Object, s3Bucket, "spec", "s3SourceBucket") + if len(targetNodes) > 0 { + nodes := make([]interface{}, len(targetNodes)) + for i, n := range targetNodes { + nodes[i] = n + } + _ = unstructured.SetNestedSlice(obj.Object, nodes, "spec", "targetNodes") + } + return obj +} + +func TestMachineConfigRestoreHandler_NilClients(t *testing.T) { + h := &machineConfigRestoreHandler{} + params := ExecuteParams{ + ClusterRef: "ccs-dev", + // All clients nil. + } + result, err := h.Execute(context.Background(), params) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Status != runnerlib.ResultFailed { + t.Errorf("expected ResultFailed, got %q", result.Status) + } + if result.FailureReason == nil || result.FailureReason.Category != runnerlib.ValidationFailure { + t.Errorf("expected ValidationFailure, got %v", result.FailureReason) + } +} + +func TestMachineConfigRestoreHandler_NoCR(t *testing.T) { + scheme := buildMachineConfigRestoreScheme() + dynClient := fake.NewSimpleDynamicClient(scheme) + + h := &machineConfigRestoreHandler{} + params := ExecuteParams{ + ClusterRef: "ccs-dev", + ExecuteClients: ExecuteClients{ + TalosClient: &stubTalosClientRestore{configYAML: []byte("machine:\n network:\n hostname: cp1\n")}, + StorageClient: &stubStorageRestore{downloadData: []byte("machine:\n network:\n hostname: cp1\n")}, + DynamicClient: dynClient, + }, + } + result, err := h.Execute(context.Background(), params) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Status != runnerlib.ResultFailed { + t.Errorf("expected ResultFailed when no CR, got %q", result.Status) + } + if result.FailureReason == nil || result.FailureReason.Category != runnerlib.ValidationFailure { + t.Errorf("expected ValidationFailure, got %v", result.FailureReason) + } +} + +func TestMachineConfigRestoreHandler_Success_SingleNode(t *testing.T) { + scheme := buildMachineConfigRestoreScheme() + ns := "seam-tenant-ccs-dev" + cr := makeMachineConfigRestoreCR(ns, "mcr-test", "20240101T000000Z", "my-bucket", nil) + dynClient := fake.NewSimpleDynamicClient(scheme, cr) + + configYAML := []byte("machine:\n network:\n hostname: cp1\n") + storage := &stubStorageRestore{downloadData: configYAML} + + h := &machineConfigRestoreHandler{} + params := ExecuteParams{ + ClusterRef: "ccs-dev", + ExecuteClients: ExecuteClients{ + TalosClient: &stubTalosClientRestore{configYAML: configYAML}, + StorageClient: storage, + DynamicClient: dynClient, + }, + } + result, err := h.Execute(context.Background(), params) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Status != runnerlib.ResultSucceeded { + t.Errorf("expected Succeeded, got %q: steps=%v", result.Status, result.Steps) + } + if len(result.Steps) != 1 { + t.Fatalf("expected 1 step, got %d", len(result.Steps)) + } + if result.Steps[0].Status != runnerlib.ResultSucceeded { + t.Errorf("expected step Succeeded, got %q: %s", result.Steps[0].Status, result.Steps[0].Message) + } + if !findSubstring(result.Steps[0].Name, "restore-cp1") { + t.Errorf("expected step name to contain 'restore-cp1', got %q", result.Steps[0].Name) + } +} + +func TestMachineConfigRestoreHandler_DownloadFailure(t *testing.T) { + scheme := buildMachineConfigRestoreScheme() + ns := "seam-tenant-ccs-dev" + cr := makeMachineConfigRestoreCR(ns, "mcr-test", "20240101T000000Z", "my-bucket", nil) + dynClient := fake.NewSimpleDynamicClient(scheme, cr) + + configYAML := []byte("machine:\n network:\n hostname: cp1\n") + storage := &stubStorageRestore{downloadErr: io.ErrUnexpectedEOF} + + h := &machineConfigRestoreHandler{} + params := ExecuteParams{ + ClusterRef: "ccs-dev", + ExecuteClients: ExecuteClients{ + TalosClient: &stubTalosClientRestore{configYAML: configYAML}, + StorageClient: storage, + DynamicClient: dynClient, + }, + } + result, err := h.Execute(context.Background(), params) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Single node fails -> all nodes failed -> ExecutionFailure + if result.Status != runnerlib.ResultFailed { + t.Errorf("expected ResultFailed on download error, got %q", result.Status) + } + if result.FailureReason == nil || result.FailureReason.Category != runnerlib.ExecutionFailure { + t.Errorf("expected ExecutionFailure, got %v", result.FailureReason) + } +} + +func TestMachineConfigRestoreHandler_TargetNodesFilter(t *testing.T) { + scheme := buildMachineConfigRestoreScheme() + ns := "seam-tenant-ccs-dev" + // Only target "cp1" -- "worker1" should be skipped. + cr := makeMachineConfigRestoreCR(ns, "mcr-test", "20240101T000000Z", "my-bucket", []string{"cp1"}) + dynClient := fake.NewSimpleDynamicClient(scheme, cr) + + configYAML := []byte("machine:\n network:\n hostname: cp1\n") + storage := &stubStorageRestore{downloadData: configYAML} + + // TalosClient reports hostname "worker1" to verify filter excludes it. + // Use single-node test mode (TalosconfigPath=""), so "node" is the only nodeIP. + // Override: use configYAML with hostname cp1 so it passes the filter. + h := &machineConfigRestoreHandler{} + params := ExecuteParams{ + ClusterRef: "ccs-dev", + ExecuteClients: ExecuteClients{ + TalosClient: &stubTalosClientRestore{configYAML: configYAML}, + StorageClient: storage, + DynamicClient: dynClient, + }, + } + result, err := h.Execute(context.Background(), params) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // cp1 is in targetNodes, so it should be restored successfully. + if result.Status != runnerlib.ResultSucceeded { + t.Errorf("expected Succeeded for cp1 in targetNodes, got %q: steps=%v", result.Status, result.Steps) + } +} diff --git a/internal/capability/stubs.go b/internal/capability/stubs.go index 11abdeb..f4315be 100644 --- a/internal/capability/stubs.go +++ b/internal/capability/stubs.go @@ -31,6 +31,7 @@ func RegisterAll(reg *Registry) { reg.Register(runnerlib.CapabilityHardeningApply, &hardeningApplyHandler{}) reg.Register(runnerlib.CapabilityClusterReset, &clusterResetHandler{}) reg.Register(runnerlib.CapabilityMachineConfigBackup, &machineConfigBackupHandler{}) + reg.Register(runnerlib.CapabilityMachineConfigRestore, &machineConfigRestoreHandler{}) // Wrapper capabilities -- pack delivery. reg.Register(runnerlib.CapabilityPackDeploy, &packDeployHandler{}) diff --git a/pkg/runnerlib/constants.go b/pkg/runnerlib/constants.go index 6aa5196..e240dbf 100644 --- a/pkg/runnerlib/constants.go +++ b/pkg/runnerlib/constants.go @@ -76,6 +76,11 @@ const ( // GetMachineConfig and uploads to S3 at {cluster}/machineconfigs/{TIMESTAMP}/{hostname}.yaml. // Triggered by TalosMachineConfigBackup CR. platform-schema.md §11. CapabilityMachineConfigBackup = "machineconfig-backup" + + // CapabilityMachineConfigRestore downloads a node machine config from S3 at + // {cluster}/machineconfigs/{backupTimestamp}/{hostname}.yaml and applies it + // via ApplyConfiguration. Non-fatal per node. platform-schema.md §11. + CapabilityMachineConfigRestore = "machineconfig-restore" ) // Compile mode capabilities — invoked by the conductor binary directly, not by conductor Jobs.