Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions internal/capability/platform_machineconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
sigsyaml "sigs.k8s.io/yaml"

Expand Down Expand Up @@ -142,3 +143,211 @@ func sanitizeHostname(nodeIP string) string {
return r.Replace(nodeIP)
}

// machineConfigRestoreGVR identifies the TalosMachineConfigRestore custom
// resource (group platform.ontai.dev, version v1alpha1) for dynamic-client
// lookups.
var machineConfigRestoreGVR = schema.GroupVersion{
	Group:   "platform.ontai.dev",
	Version: "v1alpha1",
}.WithResource("talosmachineconfigrestores")

// machineConfigRestoreHandler implements the machineconfig-restore named capability.
//
// For each node it first calls GetMachineConfig, which both confirms the node
// is reachable and supplies the hostname used in the S3 key. It then downloads
// the backed-up config from
// {cluster}/machineconfigs/{backupTimestamp}/{hostname}.yaml in the bucket named
// by the TalosMachineConfigRestore CR and applies it via ApplyConfiguration.
//
// Failures are non-fatal per node: each one is recorded in Steps and execution
// continues with the next node. The operation as a whole is reported as failed
// only when at least one node failed and no node was restored.
//
// The handler is stateless; everything it needs arrives through ExecuteParams.
// platform-schema.md §11.
type machineConfigRestoreHandler struct{}

// Execute runs the machineconfig-restore capability for params.ClusterRef.
//
// Inputs are taken from the first TalosMachineConfigRestore CR in the tenant
// namespace that has a non-empty spec.backupTimestamp: spec.s3SourceBucket
// (required) and spec.targetNodes (optional hostname filter; empty means every
// node). A spec.targetNodes value that is present but not a list of strings is
// rejected as a ValidationFailure rather than being treated as "all nodes".
//
// Node addresses come from the talosconfig at params.TalosconfigPath; when the
// path is empty a single sentinel node is used and no per-node context or
// stability wait is applied.
//
// Per node: GetMachineConfig (reachability + hostname), download
// {cluster}/machineconfigs/{backupTimestamp}/{hostname}.yaml, ApplyConfiguration
// in "no-reboot" mode, then waitForNodeStable. Each failure is recorded as a
// failed step and the loop continues. Requested target nodes that no reachable
// endpoint resolved to are also recorded as failed steps so they are not
// silently skipped.
//
// The returned error is always nil; failures are expressed in the result. The
// result is a failure when a precondition is not met or when at least one step
// failed and nothing was restored. NOTE(review): a partial failure (some nodes
// restored, some failed) is still reported as ResultSucceeded, with the failed
// steps listed in Steps -- confirm this is the intended contract.
func (h *machineConfigRestoreHandler) Execute(ctx context.Context, params ExecuteParams) (runnerlib.OperationResultSpec, error) {
	// sentinelNode stands in for a real endpoint when no talosconfig is given.
	const sentinelNode = "node"

	now := time.Now().UTC()

	if params.TalosClient == nil || params.StorageClient == nil || params.DynamicClient == nil {
		return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ValidationFailure,
			"machineconfig-restore requires TalosClient, StorageClient, and DynamicClient"), nil
	}

	// Read TalosMachineConfigRestore CR to get backupTimestamp, targetNodes, s3SourceBucket.
	ns := tenantNamespace(params.ClusterRef)
	crList, err := params.DynamicClient.Resource(machineConfigRestoreGVR).Namespace(ns).
		List(ctx, metav1.ListOptions{})
	if err != nil {
		return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ExecutionFailure,
			fmt.Sprintf("list TalosMachineConfigRestore in %s: %v", ns, err)), nil
	}

	var backupTimestamp, s3Bucket string
	var targetNodes []string
	for _, item := range crList.Items {
		ts, _, _ := unstructuredString(item.Object, "spec", "backupTimestamp")
		if ts == "" {
			continue
		}
		backupTimestamp = ts
		s3Bucket, _, _ = unstructuredString(item.Object, "spec", "s3SourceBucket")

		// A malformed targetNodes must not degrade into "restore every node".
		rawNodes, found, nodesErr := unstructured.NestedStringSlice(item.Object, "spec", "targetNodes")
		if nodesErr != nil {
			return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ValidationFailure,
				fmt.Sprintf("TalosMachineConfigRestore in %s has invalid spec.targetNodes: %v", ns, nodesErr)), nil
		}
		if found {
			targetNodes = rawNodes
		}
		break
	}

	if backupTimestamp == "" {
		return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ValidationFailure,
			fmt.Sprintf("no TalosMachineConfigRestore CR with backupTimestamp found in %s", ns)), nil
	}
	if s3Bucket == "" {
		return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ValidationFailure,
			fmt.Sprintf("no TalosMachineConfigRestore CR with s3SourceBucket found in %s", ns)), nil
	}

	// Build target set for filtering (empty = all nodes).
	targetSet := make(map[string]bool, len(targetNodes))
	for _, n := range targetNodes {
		targetSet[n] = true
	}
	// matchedTargets tracks which requested hostnames were actually seen.
	matchedTargets := make(map[string]bool, len(targetNodes))

	// Determine node IPs. Production: read from talosconfig. Test: single sentinel.
	var nodeIPs []string
	if params.TalosconfigPath != "" {
		ips, epErr := EndpointsFromTalosconfig(params.TalosconfigPath)
		if epErr != nil {
			return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ExecutionFailure,
				fmt.Sprintf("read endpoints from talosconfig: %v", epErr)), nil
		}
		nodeIPs = ips
	} else {
		nodeIPs = []string{sentinelNode}
	}

	var steps []runnerlib.StepResult
	var artifacts []runnerlib.ArtifactRef
	var failedAny bool

	// recordFailure appends a failed step for the given node and marks the run
	// as having at least one failure.
	recordFailure := func(hostname string, startedAt time.Time, message string) {
		steps = append(steps, runnerlib.StepResult{
			Name:        fmt.Sprintf("restore-%s", hostname),
			Status:      runnerlib.ResultFailed,
			StartedAt:   startedAt,
			CompletedAt: time.Now().UTC(),
			Message:     message,
		})
		failedAny = true
	}

	for _, nodeIP := range nodeIPs {
		nodeCtx := ctx
		if nodeIP != sentinelNode {
			nodeCtx = NodeContext(ctx, nodeIP)
		}

		stepStart := time.Now().UTC()

		// Confirm node reachability and resolve hostname for the S3 key.
		// Until the config is read, the sanitized address is the only name we have.
		hostname := sanitizeHostname(nodeIP)
		currentConfig, err := params.TalosClient.GetMachineConfig(nodeCtx)
		if err != nil {
			recordFailure(hostname, stepStart,
				fmt.Sprintf("GetMachineConfig on %s: node unreachable: %v", nodeIP, err))
			continue
		}
		// Extract hostname from current config so the S3 key matches the backup.
		// An unparsable config or empty hostname keeps the sanitized address.
		var cfg minimalMachineConfigHostname
		if unmarshalErr := sigsyaml.Unmarshal(currentConfig, &cfg); unmarshalErr == nil && cfg.Machine.Network.Hostname != "" {
			hostname = cfg.Machine.Network.Hostname
		}

		// Filter by targetNodes when set.
		if len(targetSet) > 0 {
			if !targetSet[hostname] {
				continue
			}
			matchedTargets[hostname] = true
		}

		// Download backup config from S3.
		s3Key := fmt.Sprintf("%s/machineconfigs/%s/%s.yaml", params.ClusterRef, backupTimestamp, hostname)
		rc, dlErr := params.StorageClient.Download(nodeCtx, s3Bucket, s3Key)
		if dlErr != nil {
			recordFailure(hostname, stepStart,
				fmt.Sprintf("download s3://%s/%s: %v", s3Bucket, s3Key, dlErr))
			continue
		}
		restoredConfig, readErr := readCloserToBytes(rc)
		if readErr != nil {
			recordFailure(hostname, stepStart,
				fmt.Sprintf("read s3://%s/%s: %v", s3Bucket, s3Key, readErr))
			continue
		}

		// Apply restored config.
		if applyErr := params.TalosClient.ApplyConfiguration(nodeCtx, restoredConfig, "no-reboot"); applyErr != nil {
			recordFailure(hostname, stepStart,
				fmt.Sprintf("ApplyConfiguration on %s: %v", nodeIP, applyErr))
			continue
		}

		// Wait for the node to stabilise after config apply.
		if nodeIP != sentinelNode {
			if wErr := waitForNodeStable(ctx, params.TalosClient, nodeIP); wErr != nil {
				recordFailure(hostname, stepStart,
					fmt.Sprintf("waitForNodeStable on %s: %v", nodeIP, wErr))
				continue
			}
		}

		steps = append(steps, runnerlib.StepResult{
			Name:        fmt.Sprintf("restore-%s", hostname),
			Status:      runnerlib.ResultSucceeded,
			StartedAt:   stepStart,
			CompletedAt: time.Now().UTC(),
			Message:     fmt.Sprintf("machine config for %s restored from s3://%s/%s", hostname, s3Bucket, s3Key),
		})
		artifacts = append(artifacts, runnerlib.ArtifactRef{
			Name:      fmt.Sprintf("restored-machineconfig-%s", hostname),
			Kind:      "S3Object",
			Reference: fmt.Sprintf("s3://%s/%s", s3Bucket, s3Key),
		})
	}

	// Surface requested targets that were never matched, so a typo or an
	// unreachable node cannot turn into a silent no-op. Iterating the slice
	// (not the map) keeps step order deterministic; reported de-duplicates.
	reported := make(map[string]bool, len(targetNodes))
	for _, target := range targetNodes {
		if matchedTargets[target] || reported[target] {
			continue
		}
		reported[target] = true
		recordFailure(target, time.Now().UTC(),
			fmt.Sprintf("target node %s did not match any reachable endpoint; machine config not restored", target))
	}

	if failedAny && len(artifacts) == 0 {
		// All nodes failed.
		return failureResult(runnerlib.CapabilityMachineConfigRestore, now, runnerlib.ExecutionFailure,
			"machineconfig-restore: all target nodes failed"), nil
	}

	return runnerlib.OperationResultSpec{
		Capability:  runnerlib.CapabilityMachineConfigRestore,
		Status:      runnerlib.ResultSucceeded,
		StartedAt:   now,
		CompletedAt: time.Now().UTC(),
		Artifacts:   artifacts,
		Steps:       steps,
	}, nil
}

// readCloserToBytes reads rc until EOF and returns everything that was read.
// rc is always closed before returning; the error from Close is ignored.
// If reading fails, the partial data is dropped and only the read error is
// returned.
func readCloserToBytes(rc interface {
	Read([]byte) (int, error)
	Close() error
}) ([]byte, error) {
	defer rc.Close()

	data := new(bytes.Buffer)
	_, err := data.ReadFrom(rc)
	if err != nil {
		return nil, err
	}
	return data.Bytes(), nil
}

Loading
Loading