diff --git a/cmd/api/api/instances_test.go b/cmd/api/api/instances_test.go index 7a532936..8b04a330 100644 --- a/cmd/api/api/instances_test.go +++ b/cmd/api/api/instances_test.go @@ -61,7 +61,7 @@ func TestCreateInstance_AutoPullImage(t *testing.T) { t.Log("Creating instance without pre-pulling image (testing auto-pull)...") networkEnabled := false - resp, err := svc.CreateInstance(ctx(), oapi.CreateInstanceRequestObject{ + createReq := oapi.CreateInstanceRequestObject{ Body: &oapi.CreateInstanceRequest{ Name: "test-auto-pull", Image: "docker.io/library/alpine:latest", @@ -74,11 +74,29 @@ func TestCreateInstance_AutoPullImage(t *testing.T) { Enabled: &networkEnabled, }, }, - }) - require.NoError(t, err) + } + + deadline := time.Now().Add(integrationTestTimeout(15 * time.Second)) + var created oapi.CreateInstance201JSONResponse + for { + resp, err := svc.CreateInstance(ctx(), createReq) + require.NoError(t, err) + + if okResp, ok := resp.(oapi.CreateInstance201JSONResponse); ok { + created = okResp + break + } + + notReadyResp, ok := resp.(oapi.CreateInstance400JSONResponse) + require.True(t, ok, "expected create to either succeed or report image_not_ready while auto-pull finishes") + require.Equal(t, "image_not_ready", notReadyResp.Code) + + if time.Now().After(deadline) { + t.Fatalf("auto-pull did not finish before deadline: %s", notReadyResp.Message) + } + time.Sleep(500 * time.Millisecond) + } - created, ok := resp.(oapi.CreateInstance201JSONResponse) - require.True(t, ok, "expected 201 response — auto-pull should have fetched the image") t.Logf("Instance created via auto-pull: %s", created.Id) // Cleanup: delete the instance @@ -86,7 +104,7 @@ func TestCreateInstance_AutoPullImage(t *testing.T) { t.Log("Deleting instance...") deleteResp, err := svc.DeleteInstance(ctxWithInstance(svc, instanceID), oapi.DeleteInstanceRequestObject{Id: instanceID}) require.NoError(t, err) - _, ok = deleteResp.(oapi.DeleteInstance204Response) + _, ok := deleteResp.(oapi.DeleteInstance204Response) require.True(t, ok, "expected 204 response for delete") t.Log("Instance deleted successfully") } diff --git a/lib/hypervisor/cloudhypervisor/process.go b/lib/hypervisor/cloudhypervisor/process.go index 5fd4037e..4fe1e10d 100644 --- a/lib/hypervisor/cloudhypervisor/process.go +++ b/lib/hypervisor/cloudhypervisor/process.go @@ -62,7 +62,9 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s } // 1. Start the Cloud Hypervisor process - pid, err := vmm.StartProcess(ctx, p, chVersion, socketPath) + processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeCloudHypervisor) + pid, err := vmm.StartProcess(processCtx, p, chVersion, socketPath) + hypervisor.FinishTraceSpan(processSpan, err) if err != nil { return 0, nil, fmt.Errorf("start process: %w", err) } @@ -117,7 +119,9 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, // 1. Start the Cloud Hypervisor process processStartTime := time.Now() - pid, err := vmm.StartProcess(ctx, p, chVersion, socketPath) + processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeCloudHypervisor) + pid, err := vmm.StartProcess(processCtx, p, chVersion, socketPath) + hypervisor.FinishTraceSpan(processSpan, err) if err != nil { return 0, nil, fmt.Errorf("start process: %w", err) } diff --git a/lib/hypervisor/firecracker/firecracker.go b/lib/hypervisor/firecracker/firecracker.go index ad7f5185..85e2a40c 100644 --- a/lib/hypervisor/firecracker/firecracker.go +++ b/lib/hypervisor/firecracker/firecracker.go @@ -15,6 +15,10 @@ import ( "time" "github.com/kernel/hypeman/lib/hypervisor" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) type apiError struct { @@ -273,10 +277,21 @@ func guestTargetBytesToMiB(bytes int64) int64 { } func (f *Firecracker) do(ctx context.Context, method, path string, reqBody any, expectedStatus ...int) ([]byte, error) { + attrs := hypervisor.TraceAttributesFromContext(ctx) + attrs = append(attrs, + attribute.String("operation", method+" "+path), + attribute.String("http.method", method), + attribute.String("http.route", path), + ) + ctx, span := otel.Tracer("hypeman/hypervisor/firecracker").Start(ctx, "hypervisor.http "+method+" "+path, trace.WithAttributes(attrs...)) + defer span.End() + var bodyReader io.Reader if reqBody != nil { data, err := json.Marshal(reqBody) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, fmt.Errorf("marshal request body: %w", err) } bodyReader = bytes.NewReader(data) @@ -284,6 +299,8 @@ func (f *Firecracker) do(ctx context.Context, method, path string, reqBody any, req, err := http.NewRequestWithContext(ctx, method, "http://localhost"+path, bodyReader) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, fmt.Errorf("create request: %w", err) } req.Header.Set("Accept", "application/json") @@ -293,17 +310,23 @@ func (f *Firecracker) do(ctx context.Context, method, path string, reqBody any, resp, err := f.client.Do(req) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, fmt.Errorf("request %s %s: %w", method, path, err) } defer resp.Body.Close() + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) data, err := io.ReadAll(resp.Body) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, fmt.Errorf("read response body: %w", err) } for _, status := range expectedStatus { if resp.StatusCode == status { + span.SetStatus(codes.Ok, "") return data, nil } } @@ -311,9 +334,11 @@ func (f *Firecracker) do(ctx context.Context, method, path string, reqBody any, if len(data) > 0 { var apiErr apiError if err := json.Unmarshal(data, &apiErr); err == nil && apiErr.FaultMessage != "" { + span.SetStatus(codes.Error, apiErr.FaultMessage) return nil, fmt.Errorf("status %d: %s", resp.StatusCode, apiErr.FaultMessage) } } + span.SetStatus(codes.Error, resp.Status) return nil, fmt.Errorf("status %d: %s", resp.StatusCode, string(data)) } diff --git a/lib/hypervisor/firecracker/process.go b/lib/hypervisor/firecracker/process.go index 8b3c5d36..371e1f2e 100644 --- a/lib/hypervisor/firecracker/process.go +++ b/lib/hypervisor/firecracker/process.go @@ -62,7 +62,9 @@ func (s *Starter) GetVersion(p *paths.Paths) (string, error) { } func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, socketPath string, config hypervisor.VMConfig) (int, hypervisor.Hypervisor, error) { - pid, err := s.startProcess(ctx, p, version, socketPath) + processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeFirecracker) + pid, err := s.startProcess(processCtx, p, version, socketPath) + hypervisor.FinishTraceSpan(processSpan, err) if err != nil { return 0, nil, fmt.Errorf("start firecracker process: %w", err) } @@ -92,7 +94,9 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s } func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, socketPath string, snapshotPath string) (int, hypervisor.Hypervisor, error) { - pid, err := s.startProcess(ctx, p, version, socketPath) + processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeFirecracker) + pid, err := s.startProcess(processCtx, p, version, socketPath) + hypervisor.FinishTraceSpan(processSpan, err) if err != nil { return 0, nil, fmt.Errorf("start firecracker process: %w", err) } diff --git a/lib/hypervisor/hypervisor.go b/lib/hypervisor/hypervisor.go index e3e1dc00..a3e5d01f 100644 --- a/lib/hypervisor/hypervisor.go +++ b/lib/hypervisor/hypervisor.go @@ -288,5 +288,9 @@ func NewClient(hvType Type, socketPath string) (Hypervisor, error) { if !ok { return nil, fmt.Errorf("no client factory registered for hypervisor type: %s", hvType) } - return factory(socketPath) + client, err := factory(socketPath) + if err != nil { + return nil, err + } + return WrapHypervisor(hvType, client), nil } diff --git a/lib/hypervisor/qemu/process.go b/lib/hypervisor/qemu/process.go index 74fa0916..e59dbb05 100644 --- a/lib/hypervisor/qemu/process.go +++ b/lib/hypervisor/qemu/process.go @@ -18,6 +18,10 @@ import ( "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/logger" "github.com/kernel/hypeman/lib/paths" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "gvisor.dev/gvisor/pkg/cleanup" ) @@ -128,15 +132,25 @@ func buildQMPArgs(socketPath string) []string { // The cleanup function must be called on error; call cleanup.Release() on success. func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version string, socketPath string, args []string) (int, *QEMU, *cleanup.Cleanup, error) { log := logger.FromContext(ctx) + processAttrs := hypervisor.TraceAttributesFromContext(ctx) + processAttrs = append(processAttrs, + attribute.String("operation", "start_process"), + attribute.String("hypervisor", string(hypervisor.TypeQEMU)), + ) + processCtx, processSpan := otel.Tracer("hypeman/hypervisor/qemu").Start(ctx, "hypervisor.start_process", trace.WithAttributes(processAttrs...)) + defer processSpan.End() // Get binary path binaryPath, err := s.GetBinaryPath(p, version) if err != nil { + processSpan.RecordError(err) + processSpan.SetStatus(codes.Error, err.Error()) return 0, nil, nil, fmt.Errorf("get binary: %w", err) } // Check if socket is already in use if isSocketInUse(socketPath) { + processSpan.SetStatus(codes.Error, "socket already in use") return 0, nil, nil, fmt.Errorf("socket already in use, QEMU may be running at %s", socketPath) } @@ -155,6 +169,8 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version instanceDir := filepath.Dir(socketPath) logsDir := filepath.Join(instanceDir, "logs") if err := os.MkdirAll(logsDir, 0755); err != nil { + processSpan.RecordError(err) + processSpan.SetStatus(codes.Error, err.Error()) return 0, nil, nil, fmt.Errorf("create logs directory: %w", err) } @@ -164,6 +180,8 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version 0644, ) if err != nil { + processSpan.RecordError(err) + processSpan.SetStatus(codes.Error, err.Error()) return 0, nil, nil, fmt.Errorf("create vmm log: %w", err) } defer vmmLogFile.Close() @@ -173,11 +191,13 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version processStartTime := time.Now() if err := cmd.Start(); err != nil { + processSpan.RecordError(err) + processSpan.SetStatus(codes.Error, err.Error()) return 0, nil, nil, fmt.Errorf("start qemu: %w", err) } pid := cmd.Process.Pid - log.DebugContext(ctx, "QEMU process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds()) + log.DebugContext(processCtx, "QEMU process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds()) // Setup cleanup to kill the process if subsequent steps fail cu := cleanup.Make(func() { @@ -187,10 +207,12 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version // Wait for socket to be ready socketWaitStart := time.Now() if err := waitForSocket(socketPath, socketWaitTimeout); err != nil { + processSpan.RecordError(err) + processSpan.SetStatus(codes.Error, err.Error()) cu.Clean() return 0, nil, nil, appendVMMLog(err, logsDir) } - log.DebugContext(ctx, "QMP socket ready", "duration_ms", time.Since(socketWaitStart).Milliseconds()) + log.DebugContext(processCtx, "QMP socket ready", "duration_ms", time.Since(socketWaitStart).Milliseconds()) // Create QMP client. The socket file may exist before QEMU can actually // accept monitor connections, so retry briefly on transient dial failures. @@ -202,12 +224,15 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version break } if time.Now().After(clientDeadline) { + processSpan.RecordError(err) + processSpan.SetStatus(codes.Error, err.Error()) cu.Clean() return 0, nil, nil, appendVMMLog(fmt.Errorf("create client: %w", err), logsDir) } time.Sleep(socketPollInterval) } + processSpan.SetStatus(codes.Ok, "") return pid, hv, &cu, nil } diff --git a/lib/hypervisor/tracing.go b/lib/hypervisor/tracing.go new file mode 100644 index 00000000..63908822 --- /dev/null +++ b/lib/hypervisor/tracing.go @@ -0,0 +1,286 @@ +package hypervisor + +import ( + "context" + "time" + + "github.com/kernel/hypeman/lib/paths" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +type traceAttrsKey struct{} + +type traceWrapped interface { + isTraceWrapped() +} + +type tracingHypervisor struct { + hvType Type + next Hypervisor + tracer trace.Tracer + attrs []attribute.KeyValue +} + +type tracingVMStarter struct { + hvType Type + next VMStarter + tracer trace.Tracer +} + +func WithTraceAttributes(ctx context.Context, attrs ...attribute.KeyValue) context.Context { + if len(attrs) == 0 { + return ctx + } + + existing, _ := ctx.Value(traceAttrsKey{}).([]attribute.KeyValue) + merged := make([]attribute.KeyValue, 0, len(existing)+len(attrs)) + merged = append(merged, existing...) + merged = append(merged, attrs...) + return context.WithValue(ctx, traceAttrsKey{}, merged) +} + +func TraceAttributesFromContext(ctx context.Context) []attribute.KeyValue { + existing, _ := ctx.Value(traceAttrsKey{}).([]attribute.KeyValue) + if len(existing) == 0 { + return nil + } + out := make([]attribute.KeyValue, len(existing)) + copy(out, existing) + return out +} + +func WrapHypervisor(hvType Type, hv Hypervisor) Hypervisor { + if hv == nil { + return nil + } + if _, ok := hv.(traceWrapped); ok { + return hv + } + + return &tracingHypervisor{ + hvType: hvType, + next: hv, + tracer: otel.Tracer(traceSubsystemForType(hvType)), + attrs: []attribute.KeyValue{ + attribute.String("hypervisor", string(hvType)), + }, + } +} + +func WrapVMStarter(hvType Type, starter VMStarter) VMStarter { + if starter == nil { + return nil + } + if _, ok := starter.(traceWrapped); ok { + return starter + } + return &tracingVMStarter{ + hvType: hvType, + next: starter, + tracer: otel.Tracer(traceSubsystemForType(hvType)), + } +} + +func traceSubsystemForType(hvType Type) string { + switch hvType { + case TypeCloudHypervisor: + return "hypeman/hypervisor/cloudhypervisor" + case TypeFirecracker: + return "hypeman/hypervisor/firecracker" + case TypeQEMU: + return "hypeman/hypervisor/qemu" + case TypeVZ: + return "hypeman/hypervisor/vz" + default: + return "hypeman/hypervisor" + } +} + +func StartImplementationSpan(ctx context.Context, hvType Type, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + baseAttrs := []attribute.KeyValue{ + attribute.String("hypervisor", string(hvType)), + } + baseAttrs = append(baseAttrs, attrs...) + return startTraceSpan(ctx, otel.Tracer(traceSubsystemForType(hvType)), name, baseAttrs...) +} + +func StartProcessSpan(ctx context.Context, hvType Type) (context.Context, trace.Span) { + return StartImplementationSpan(ctx, hvType, "hypervisor.start_process", attribute.String("operation", "start_process")) +} + +func FinishTraceSpan(span trace.Span, err error) { + finishTraceSpan(span, err) +} + +func startTraceSpan(ctx context.Context, tracer trace.Tracer, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + allAttrs := TraceAttributesFromContext(ctx) + if len(attrs) > 0 { + allAttrs = append(allAttrs, attrs...) + } + if len(allAttrs) == 0 { + return tracer.Start(ctx, name) + } + return tracer.Start(ctx, name, trace.WithAttributes(allAttrs...)) +} + +func finishTraceSpan(span trace.Span, err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + span.End() +} + +func (h *tracingHypervisor) isTraceWrapped() {} + +func (s *tracingVMStarter) isTraceWrapped() {} + +func (h *tracingHypervisor) Capabilities() Capabilities { + return h.next.Capabilities() +} + +func (h *tracingHypervisor) spanAttrs(attrs ...attribute.KeyValue) []attribute.KeyValue { + out := make([]attribute.KeyValue, 0, len(h.attrs)+len(attrs)) + out = append(out, h.attrs...) + out = append(out, attrs...) + return out +} + +func (h *tracingHypervisor) DeleteVM(ctx context.Context) (err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.delete_vm", h.spanAttrs(attribute.String("operation", "delete_vm"))...) + defer func() { finishTraceSpan(span, err) }() + return h.next.DeleteVM(ctx) +} + +func (h *tracingHypervisor) Shutdown(ctx context.Context) (err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.shutdown", h.spanAttrs(attribute.String("operation", "shutdown"))...) + defer func() { finishTraceSpan(span, err) }() + return h.next.Shutdown(ctx) +} + +func (h *tracingHypervisor) GetVMInfo(ctx context.Context) (_ *VMInfo, err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.get_vm_info", h.spanAttrs(attribute.String("operation", "get_vm_info"))...) + defer func() { finishTraceSpan(span, err) }() + return h.next.GetVMInfo(ctx) +} + +func (h *tracingHypervisor) Pause(ctx context.Context) (err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.pause", h.spanAttrs(attribute.String("operation", "pause"))...) + defer func() { finishTraceSpan(span, err) }() + return h.next.Pause(ctx) +} + +func (h *tracingHypervisor) Resume(ctx context.Context) (err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.resume", h.spanAttrs(attribute.String("operation", "resume"))...) + defer func() { finishTraceSpan(span, err) }() + return h.next.Resume(ctx) +} + +func (h *tracingHypervisor) Snapshot(ctx context.Context, destPath string) (err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.snapshot", + h.spanAttrs( + attribute.String("operation", "snapshot"), + )..., + ) + defer func() { finishTraceSpan(span, err) }() + return h.next.Snapshot(ctx, destPath) +} + +func (h *tracingHypervisor) ResizeMemory(ctx context.Context, bytes int64) (err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.resize_memory", + h.spanAttrs( + attribute.String("operation", "resize_memory"), + attribute.Int64("memory_bytes", bytes), + )..., + ) + defer func() { finishTraceSpan(span, err) }() + return h.next.ResizeMemory(ctx, bytes) +} + +func (h *tracingHypervisor) ResizeMemoryAndWait(ctx context.Context, bytes int64, timeout time.Duration) (err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.resize_memory_and_wait", + h.spanAttrs( + attribute.String("operation", "resize_memory_and_wait"), + attribute.Int64("memory_bytes", bytes), + attribute.Int64("timeout_seconds", int64(timeout.Seconds())), + )..., + ) + defer func() { finishTraceSpan(span, err) }() + return h.next.ResizeMemoryAndWait(ctx, bytes, timeout) +} + +func (h *tracingHypervisor) SetTargetGuestMemoryBytes(ctx context.Context, bytes int64) (err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.set_target_guest_memory_bytes", + h.spanAttrs( + attribute.String("operation", "set_target_guest_memory_bytes"), + attribute.Int64("guest_memory_bytes", bytes), + )..., + ) + defer func() { finishTraceSpan(span, err) }() + return h.next.SetTargetGuestMemoryBytes(ctx, bytes) +} + +func (h *tracingHypervisor) GetTargetGuestMemoryBytes(ctx context.Context) (_ int64, err error) { + ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.get_target_guest_memory_bytes", + h.spanAttrs(attribute.String("operation", "get_target_guest_memory_bytes"))..., + ) + defer func() { finishTraceSpan(span, err) }() + return h.next.GetTargetGuestMemoryBytes(ctx) +} + +func (s *tracingVMStarter) SocketName() string { + return s.next.SocketName() +} + +func (s *tracingVMStarter) GetBinaryPath(p *paths.Paths, version string) (string, error) { + return s.next.GetBinaryPath(p, version) +} + +func (s *tracingVMStarter) GetVersion(p *paths.Paths) (string, error) { + return s.next.GetVersion(p) +} + +func (s *tracingVMStarter) StartVM(ctx context.Context, p *paths.Paths, version string, socketPath string, config VMConfig) (pid int, hv Hypervisor, err error) { + ctx, span := startTraceSpan(ctx, s.tracer, "hypervisor.start_vm", + attribute.String("hypervisor", string(s.hvType)), + attribute.String("operation", "start_vm"), + ) + defer func() { + if err == nil && hv != nil { + hv = WrapHypervisor(s.hvType, hv) + } + finishTraceSpan(span, err) + }() + pid, hv, err = s.next.StartVM(ctx, p, version, socketPath, config) + return pid, hv, err +} + +func (s *tracingVMStarter) RestoreVM(ctx context.Context, p *paths.Paths, version string, socketPath string, snapshotPath string) (pid int, hv Hypervisor, err error) { + ctx, span := startTraceSpan(ctx, s.tracer, "hypervisor.restore_vm", + attribute.String("hypervisor", string(s.hvType)), + attribute.String("operation", "restore_vm"), + ) + defer func() { + if err == nil && hv != nil { + hv = WrapHypervisor(s.hvType, hv) + } + finishTraceSpan(span, err) + }() + pid, hv, err = s.next.RestoreVM(ctx, p, version, socketPath, snapshotPath) + return pid, hv, err +} + +func (s *tracingVMStarter) PrepareFork(ctx context.Context, req ForkPrepareRequest) (_ ForkPrepareResult, err error) { + ctx, span := startTraceSpan(ctx, s.tracer, "hypervisor.prepare_fork", + attribute.String("hypervisor", string(s.hvType)), + attribute.String("operation", "prepare_fork"), + attribute.Bool("has_snapshot_config_path", req.SnapshotConfigPath != ""), + ) + defer func() { finishTraceSpan(span, err) }() + return s.next.PrepareFork(ctx, req) +} diff --git a/lib/hypervisor/tracing_test.go b/lib/hypervisor/tracing_test.go new file mode 100644 index 00000000..d203af4a --- /dev/null +++ b/lib/hypervisor/tracing_test.go @@ -0,0 +1,134 @@ +package hypervisor + +import ( + "context" + "testing" + "time" + + "github.com/kernel/hypeman/lib/paths" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +type fakeHypervisor struct{} + +func (fakeHypervisor) DeleteVM(context.Context) error { return nil } +func (fakeHypervisor) Shutdown(context.Context) error { return nil } +func (fakeHypervisor) GetVMInfo(context.Context) (*VMInfo, error) { + return &VMInfo{State: StateRunning}, nil +} +func (fakeHypervisor) Pause(context.Context) error { return nil } +func (fakeHypervisor) Resume(context.Context) error { return nil } +func (fakeHypervisor) Snapshot(context.Context, string) error { return nil } +func (fakeHypervisor) ResizeMemory(context.Context, int64) error { return nil } +func (fakeHypervisor) ResizeMemoryAndWait(context.Context, int64, time.Duration) error { + return nil +} +func (fakeHypervisor) SetTargetGuestMemoryBytes(context.Context, int64) error { return nil } +func (fakeHypervisor) GetTargetGuestMemoryBytes(context.Context) (int64, error) { + return 0, nil +} +func (fakeHypervisor) Capabilities() Capabilities { return Capabilities{} } + +type fakeStarter struct { + returned Hypervisor +} + +func (s fakeStarter) SocketName() string { return "fake.sock" } +func (s fakeStarter) GetBinaryPath(*paths.Paths, string) (string, error) { + return "", nil +} +func (s fakeStarter) GetVersion(*paths.Paths) (string, error) { return "test", nil } +func (s fakeStarter) StartVM(context.Context, *paths.Paths, string, string, VMConfig) (int, Hypervisor, error) { + return 42, s.returned, nil +} +func (s fakeStarter) RestoreVM(context.Context, *paths.Paths, string, string, string) (int, Hypervisor, error) { + return 43, s.returned, nil +} +func (s fakeStarter) PrepareFork(context.Context, ForkPrepareRequest) (ForkPrepareResult, error) { + return ForkPrepareResult{}, nil +} + +func TestWrapHypervisorCreatesChildSpan(t *testing.T) { + recorder, provider := newTestTracerProvider(t) + ctx, parent := otel.Tracer("test").Start(context.Background(), "parent") + ctx = WithTraceAttributes(ctx, + attribute.String("instance_id", "inst_123"), + attribute.String("hypervisor", string(TypeQEMU)), + ) + + hv := WrapHypervisor(TypeQEMU, fakeHypervisor{}) + require.NoError(t, hv.Resume(ctx)) + parent.End() + + child := findSpanByName(t, recorder.Ended(), "hypervisor.resume") + require.Equal(t, parent.SpanContext().SpanID(), child.Parent().SpanID()) + assert.Equal(t, codes.Ok, child.Status().Code) + + attrs := attrsToMap(child.Attributes()) + assert.Equal(t, "inst_123", attrs["instance_id"]) + assert.Equal(t, string(TypeQEMU), attrs["hypervisor"]) + assert.Equal(t, "resume", attrs["operation"]) + + _ = provider +} + +func TestWrapVMStarterWrapsReturnedHypervisor(t *testing.T) { + recorder, _ := newTestTracerProvider(t) + ctx, parent := otel.Tracer("test").Start(context.Background(), "parent") + ctx = WithTraceAttributes(ctx, attribute.String("instance_id", "inst_456")) + + starter := WrapVMStarter(TypeCloudHypervisor, fakeStarter{returned: fakeHypervisor{}}) + _, hv, err := starter.StartVM(ctx, nil, "test", "/tmp/socket", VMConfig{}) + require.NoError(t, err) + + require.NoError(t, hv.Resume(ctx)) + parent.End() + + startSpan := findSpanByName(t, recorder.Ended(), "hypervisor.start_vm") + resumeSpan := findSpanByName(t, recorder.Ended(), "hypervisor.resume") + require.Equal(t, parent.SpanContext().SpanID(), startSpan.Parent().SpanID()) + require.Equal(t, parent.SpanContext().TraceID(), resumeSpan.SpanContext().TraceID()) + + attrs := attrsToMap(resumeSpan.Attributes()) + assert.Equal(t, "inst_456", attrs["instance_id"]) + assert.Equal(t, string(TypeCloudHypervisor), attrs["hypervisor"]) +} + +func newTestTracerProvider(t *testing.T) (*tracetest.SpanRecorder, *sdktrace.TracerProvider) { + t.Helper() + + recorder := tracetest.NewSpanRecorder() + provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder)) + previous := otel.GetTracerProvider() + otel.SetTracerProvider(provider) + t.Cleanup(func() { + otel.SetTracerProvider(previous) + _ = provider.Shutdown(context.Background()) + }) + return recorder, provider +} + +func findSpanByName(t *testing.T, spans []sdktrace.ReadOnlySpan, name string) sdktrace.ReadOnlySpan { + t.Helper() + for _, span := range spans { + if span.Name() == name { + return span + } + } + t.Fatalf("span %q not found", name) + return nil +} + +func attrsToMap(attrs []attribute.KeyValue) map[string]string { + out := make(map[string]string, len(attrs)) + for _, attr := range attrs { + out[string(attr.Key)] = attr.Value.Emit() + } + return out +} diff --git a/lib/hypervisor/vz/client.go b/lib/hypervisor/vz/client.go index 3d371331..d1331c56 100644 --- a/lib/hypervisor/vz/client.go +++ b/lib/hypervisor/vz/client.go @@ -14,6 +14,10 @@ import ( "time" "github.com/kernel/hypeman/lib/hypervisor" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) // Client implements hypervisor.Hypervisor via HTTP to the vz-shim process. @@ -98,8 +102,19 @@ func (c *Client) doPut(ctx context.Context, path string, body io.Reader) error { } func (c *Client) doPutWithClient(ctx context.Context, client *http.Client, path string, body io.Reader) error { + attrs := hypervisor.TraceAttributesFromContext(ctx) + attrs = append(attrs, + attribute.String("operation", http.MethodPut+" "+path), + attribute.String("http.method", http.MethodPut), + attribute.String("http.route", path), + ) + ctx, span := otel.Tracer("hypeman/hypervisor/vz").Start(ctx, "hypervisor.http PUT "+path, trace.WithAttributes(attrs...)) + defer span.End() + req, err := http.NewRequestWithContext(ctx, http.MethodPut, "http://vz-shim"+path, body) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return err } if body != nil { @@ -107,28 +122,58 @@ func (c *Client) doPutWithClient(ctx context.Context, client *http.Client, path } resp, err := client.Do(req) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return err } defer resp.Body.Close() + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { bodyBytes, _ := io.ReadAll(resp.Body) + span.SetStatus(codes.Error, resp.Status) return fmt.Errorf("%s failed with status %d: %s", path, resp.StatusCode, string(bodyBytes)) } + span.SetStatus(codes.Ok, "") return nil } // doGet sends a GET request to the shim and returns the response body. func (c *Client) doGet(ctx context.Context, path string) ([]byte, error) { + attrs := hypervisor.TraceAttributesFromContext(ctx) + attrs = append(attrs, + attribute.String("operation", http.MethodGet+" "+path), + attribute.String("http.method", http.MethodGet), + attribute.String("http.route", path), + ) + ctx, span := otel.Tracer("hypeman/hypervisor/vz").Start(ctx, "hypervisor.http GET "+path, trace.WithAttributes(attrs...)) + defer span.End() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://vz-shim"+path, nil) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, err } resp, err := c.httpClient.Do(req) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, err } defer resp.Body.Close() - return io.ReadAll(resp.Body) + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) + body, err := io.ReadAll(resp.Body) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + if resp.StatusCode >= http.StatusBadRequest { + span.SetStatus(codes.Error, resp.Status) + } else { + span.SetStatus(codes.Ok, "") + } + return body, nil } func (c *Client) DeleteVM(ctx context.Context) error { diff --git a/lib/instances/create.go b/lib/instances/create.go index e54a8898..a620a24f 100644 --- a/lib/instances/create.go +++ b/lib/instances/create.go @@ -20,7 +20,6 @@ import ( "github.com/kernel/hypeman/lib/volumes" "github.com/nrednav/cuid2" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "gvisor.dev/gvisor/pkg/cleanup" ) @@ -73,17 +72,15 @@ func generateVsockCID(instanceID string) int64 { func (m *manager) createInstance( ctx context.Context, req CreateInstanceRequest, -) (*Instance, error) { +) (_ *Instance, retErr error) { start := time.Now() log := logger.FromContext(ctx) log.InfoContext(ctx, "creating instance", "name", req.Name, "image", req.Image, "vcpus", req.Vcpus) - // Start tracing span if tracer is available - if m.metrics != nil && m.metrics.tracer != nil { - var span trace.Span - ctx, span = m.metrics.tracer.Start(ctx, "CreateInstance") - defer span.End() - } + ctx, span := m.startLifecycleSpan(ctx, "instances.create", + attribute.String("operation", "create"), + ) + defer func() { finishInstancesSpan(span, retErr) }() // 1. Validate request if err := validateCreateRequest(&req); err != nil { @@ -93,36 +90,44 @@ func (m *manager) createInstance( // 2. Validate image exists and is ready; auto-pull if not found log.DebugContext(ctx, "validating image", "image", req.Image) - imageInfo, err := m.imageManager.GetImage(ctx, req.Image) + imageCtx, imageSpanEnd := m.startLifecycleStep(ctx, "resolve_image", + attribute.String("operation", "resolve_image"), + ) + imageInfo, err := m.imageManager.GetImage(imageCtx, req.Image) if err != nil { if err == images.ErrNotFound { // Auto-pull: image not found locally, kick off the pull in the // background and wait up to 5 seconds for it to complete. log.InfoContext(ctx, "image not found locally, auto-pulling", "image", req.Image) - _, pullErr := m.imageManager.CreateImage(ctx, images.CreateImageRequest{Name: req.Image}) + _, pullErr := m.imageManager.CreateImage(imageCtx, images.CreateImageRequest{Name: req.Image}) if pullErr != nil { + imageSpanEnd(pullErr) log.ErrorContext(ctx, "failed to auto-pull image", "image", req.Image, "error", pullErr) return nil, fmt.Errorf("auto-pull image %s: %w", req.Image, pullErr) } // Wait with a short timeout — if the pull doesn't finish in time // we return an error but let it continue in the background. - pullCtx, pullCancel := context.WithTimeout(ctx, 5*time.Second) + pullCtx, pullCancel := context.WithTimeout(imageCtx, 5*time.Second) defer pullCancel() if waitErr := m.imageManager.WaitForReady(pullCtx, req.Image); waitErr != nil { + imageSpanEnd(waitErr) log.InfoContext(ctx, "image pull not ready within timeout, pull continues in background", "image", req.Image, "error", waitErr) return nil, fmt.Errorf("%w: image %s is being pulled, please try again shortly", ErrImageNotReady, req.Image) } // Re-fetch after successful pull - imageInfo, err = m.imageManager.GetImage(ctx, req.Image) + imageInfo, err = m.imageManager.GetImage(imageCtx, req.Image) if err != nil { + imageSpanEnd(err) log.ErrorContext(ctx, "failed to get image after auto-pull", "image", req.Image, "error", err) return nil, fmt.Errorf("get image after auto-pull: %w", err) } } else { + imageSpanEnd(err) log.ErrorContext(ctx, "failed to get image", "image", req.Image, "error", err) return nil, fmt.Errorf("get image: %w", err) } } + imageSpanEnd(nil) if imageInfo.Status != images.StatusReady { log.ErrorContext(ctx, "image not ready", "image", req.Image, "status", imageInfo.Status) @@ -131,6 +136,7 @@ func (m *manager) createInstance( // 3. Generate instance ID (CUID2 for secure, collision-resistant IDs) id := cuid2.Generate() + ctx = enrichInstancesTrace(ctx, attribute.String("instance_id", id)) log.DebugContext(ctx, "generated instance ID", "instance_id", id) // 4. Generate vsock configuration @@ -209,12 +215,7 @@ func (m *manager) createInstance( // Enrich logger and trace span with hypervisor type log = log.With("hypervisor", string(hvType)) ctx = logger.AddToContext(ctx, log) - if m.metrics != nil && m.metrics.tracer != nil { - span := trace.SpanFromContext(ctx) - if span.IsRecording() { - span.SetAttributes(attribute.String("hypervisor", string(hvType))) - } - } + ctx = enrichInstancesTrace(ctx, attribute.String("hypervisor", string(hvType))) starter, err := m.getVMStarter(hvType) if err != nil { @@ -364,13 +365,20 @@ func (m *manager) createInstance( if networkName != "" { log.DebugContext(ctx, "allocating network", "instance_id", id, "network", networkName, "download_bps", stored.NetworkBandwidthDownload, "upload_bps", stored.NetworkBandwidthUpload) - netConfig, err = m.networkManager.CreateAllocation(ctx, network.AllocateRequest{ + networkCtx, networkSpanEnd := m.startLifecycleStep(ctx, "allocate_network", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "allocate_network"), + attribute.Bool("network_enabled", true), + ) + netConfig, err = m.networkManager.CreateAllocation(networkCtx, network.AllocateRequest{ InstanceID: id, InstanceName: req.Name, DownloadBps: stored.NetworkBandwidthDownload, UploadBps: stored.NetworkBandwidthUpload, UploadCeilBps: stored.NetworkBandwidthUpload * int64(m.networkManager.GetUploadBurstMultiplier()), }) + networkSpanEnd(err) if err != nil { log.ErrorContext(ctx, "failed to allocate network", "instance_id", id, "network", networkName, "error", err) return nil, fmt.Errorf("allocate network: %w", err) @@ -442,10 +450,17 @@ func (m *manager) createInstance( }) } log.DebugContext(ctx, "creating config disk", "instance_id", id) - if err := m.createConfigDisk(ctx, inst, imageInfo, netConfig, proxyGuestConfig); err != nil { + configDiskCtx, configDiskSpanEnd := m.startLifecycleStep(ctx, "create_config_disk", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "create_config_disk"), + ) + if err := m.createConfigDisk(configDiskCtx, inst, imageInfo, netConfig, proxyGuestConfig); err != nil { + configDiskSpanEnd(err) log.ErrorContext(ctx, "failed to create config disk", "instance_id", id, "error", err) return nil, fmt.Errorf("create config disk: %w", err) } + configDiskSpanEnd(nil) // 17. Record boot start time before launching the VM so marker hydration // can safely ignore stale sentinels from prior runs. @@ -465,10 +480,17 @@ func (m *manager) createInstance( // 19. Start VMM and boot VM log.InfoContext(ctx, "starting VMM and booting VM", "instance_id", id) - if err := m.startAndBootVM(ctx, stored, imageInfo, netConfig); err != nil { + startVMCtx, startVMSpanEnd := m.startLifecycleStep(ctx, "start_vm", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "start_vm"), + ) + if err := m.startAndBootVM(startVMCtx, stored, imageInfo, netConfig); err != nil { + startVMSpanEnd(err) log.ErrorContext(ctx, "failed to start and boot VM", "instance_id", id, "error", err) return nil, err } + startVMSpanEnd(nil) // 20. Persist runtime metadata updates after VM boot. meta = &metadata{StoredMetadata: *stored} diff --git a/lib/instances/egress_proxy_integration_test.go b/lib/instances/egress_proxy_integration_test.go index bee8793c..e2b9e125 100644 --- a/lib/instances/egress_proxy_integration_test.go +++ b/lib/instances/egress_proxy_integration_test.go @@ -107,7 +107,7 @@ func TestEgressProxyRewritesHTTPSHeaders(t *testing.T) { } }) - _, err = waitForInstanceState(ctx, manager, inst.Id, StateRunning, 5*time.Second) + _, err = waitForInstanceState(ctx, manager, inst.Id, StateRunning, integrationTestTimeout(5*time.Second)) if err != nil { logs, logErr := collectLogs(ctx, manager, inst.Id, 200) if logErr != nil { diff --git a/lib/instances/exec_test.go b/lib/instances/exec_test.go index 06a07e22..b1e46a78 100644 --- a/lib/instances/exec_test.go +++ b/lib/instances/exec_test.go @@ -139,8 +139,9 @@ func TestExecConcurrent(t *testing.T) { manager.DeleteInstance(ctx, inst.Id) }) - // Wait for exec-agent to be ready (retry here is OK - we're just waiting for startup) - err = waitForExecAgent(ctx, manager, inst.Id, 15*time.Second) + // This test exercises concurrent exec behavior, not boot-speed budgets. + // Give the guest a little more headroom on busy Linux CI runners. + err = waitForExecAgent(ctx, manager, inst.Id, 30*time.Second) require.NoError(t, err, "exec-agent should be ready") // Verify exec-agent works with a simple command first diff --git a/lib/instances/fork.go b/lib/instances/fork.go index 531af12c..2c75c94e 100644 --- a/lib/instances/fork.go +++ b/lib/instances/fork.go @@ -17,15 +17,21 @@ import ( "github.com/kernel/hypeman/lib/logger" "github.com/kernel/hypeman/lib/network" "github.com/nrednav/cuid2" + "go.opentelemetry.io/otel/attribute" "gvisor.dev/gvisor/pkg/cleanup" ) // forkInstance creates a new instance by cloning a stopped or standby source // instance. It returns the newly created fork and the requested final target // state; callers apply remaining target state transitions outside the source lock. -func (m *manager) forkInstance(ctx context.Context, id string, req ForkInstanceRequest) (*Instance, State, error) { +func (m *manager) forkInstance(ctx context.Context, id string, req ForkInstanceRequest) (_ *Instance, _ State, retErr error) { log := logger.FromContext(ctx) log.InfoContext(ctx, "forking instance", "source_instance_id", id, "fork_name", req.Name) + ctx, span := m.startLifecycleSpan(ctx, "instances.fork", + attribute.String("instance_id", id), + attribute.String("operation", "fork"), + ) + defer func() { finishInstancesSpan(span, retErr) }() if err := validateForkRequest(req); err != nil { return nil, "", err diff --git a/lib/instances/manager.go b/lib/instances/manager.go index 44d9b162..bf8da680 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -124,7 +124,7 @@ func NewManager(p *paths.Paths, imageManager images.Manager, systemManager syste // Initialize VM starters from platform-specific init functions vmStarters := make(map[hypervisor.Type]hypervisor.VMStarter, len(platformStarters)) for hvType, starter := range platformStarters { - vmStarters[hvType] = starter + vmStarters[hvType] = hypervisor.WrapVMStarter(hvType, starter) } m := &manager{ diff --git a/lib/instances/restore.go b/lib/instances/restore.go index d9b97be6..09110a11 100644 --- a/lib/instances/restore.go +++ b/lib/instances/restore.go @@ -14,7 +14,7 @@ import ( "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/logger" "github.com/kernel/hypeman/lib/network" - "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/attribute" ) // RestoreInstance restores an instance from standby @@ -23,17 +23,16 @@ func (m *manager) restoreInstance( ctx context.Context, id string, -) (*Instance, error) { +) (_ *Instance, retErr error) { start := time.Now() log := logger.FromContext(ctx) log.InfoContext(ctx, "restoring instance from standby", "instance_id", id) - // Start tracing span if tracer is available - if m.metrics != nil && m.metrics.tracer != nil { - var span trace.Span - ctx, span = m.metrics.tracer.Start(ctx, "RestoreInstance") - defer span.End() - } + ctx, span := m.startLifecycleSpan(ctx, "instances.restore", + attribute.String("instance_id", id), + attribute.String("operation", "restore"), + ) + defer func() { finishInstancesSpan(span, retErr) }() // 1. Load instance meta, err := m.loadMetadata(id) @@ -44,6 +43,7 @@ func (m *manager) restoreInstance( inst := m.toInstance(ctx, meta) stored := &meta.StoredMetadata + ctx = enrichInstancesTrace(ctx, attribute.String("hypervisor", string(stored.HypervisorType))) log.DebugContext(ctx, "loaded instance", "instance_id", id, "state", inst.State, "has_snapshot", inst.HasSnapshot) // 2. Validate state @@ -69,14 +69,13 @@ func (m *manager) restoreInstance( // 3. Get snapshot directory snapshotDir := m.paths.InstanceSnapshotLatest(id) - var prepareSnapshotSpan trace.Span - if m.metrics != nil && m.metrics.tracer != nil { - ctx, prepareSnapshotSpan = m.metrics.tracer.Start(ctx, "PrepareSnapshotMemory") - } - err = m.ensureSnapshotMemoryReady(ctx, snapshotDir, m.snapshotJobKeyForInstance(id), stored.HypervisorType) - if prepareSnapshotSpan != nil { - prepareSnapshotSpan.End() - } + prepareSnapshotCtx, prepareSnapshotSpanEnd := m.startLifecycleStep(ctx, "prepare_snapshot_memory", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "prepare_snapshot_memory"), + ) + err = m.ensureSnapshotMemoryReady(prepareSnapshotCtx, snapshotDir, m.snapshotJobKeyForInstance(id), stored.HypervisorType) + prepareSnapshotSpanEnd(err) if err != nil { return nil, fmt.Errorf("prepare standby snapshot memory: %w", err) } @@ -109,17 +108,19 @@ func (m *manager) restoreInstance( // 4. Recreate or allocate network if network enabled if stored.NetworkEnabled { - var networkSpan trace.Span - if m.metrics != nil && m.metrics.tracer != nil { - ctx, networkSpan = m.metrics.tracer.Start(ctx, "RestoreNetwork") - } + networkCtx, networkSpanEnd := m.startLifecycleStep(ctx, "restore_network", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "restore_network"), + attribute.Bool("network_enabled", true), + ) // If IP/MAC is empty (forked standby flow), allocate a fresh identity and // patch the copied snapshot config before restore. if stored.IP == "" || stored.MAC == "" { log.InfoContext(ctx, "allocating fresh network identity for standby restore", "instance_id", id, "network", "default", "download_bps", stored.NetworkBandwidthDownload, "upload_bps", stored.NetworkBandwidthUpload) - netConfig, err := m.networkManager.CreateAllocation(ctx, network.AllocateRequest{ + netConfig, err := m.networkManager.CreateAllocation(networkCtx, network.AllocateRequest{ InstanceID: id, InstanceName: stored.Name, DownloadBps: stored.NetworkBandwidthDownload, @@ -127,9 +128,7 @@ func (m *manager) restoreInstance( UploadCeilBps: stored.NetworkBandwidthUpload * int64(m.networkManager.GetUploadBurstMultiplier()), }) if err != nil { - if networkSpan != nil { - networkSpan.End() - } + networkSpanEnd(err) log.ErrorContext(ctx, "failed to allocate network", "instance_id", id, "error", err) return nil, fmt.Errorf("allocate network: %w", err) } @@ -147,7 +146,7 @@ func (m *manager) restoreInstance( stored.IP = netConfig.IP stored.MAC = netConfig.MAC - if _, err := starter.PrepareFork(ctx, hypervisor.ForkPrepareRequest{ + if _, err := starter.PrepareFork(networkCtx, hypervisor.ForkPrepareRequest{ SnapshotConfigPath: m.paths.InstanceSnapshotConfig(id), VsockCID: stored.VsockCID, VsockSocket: stored.VsockSocket, @@ -158,9 +157,7 @@ func (m *manager) restoreInstance( Netmask: netConfig.Netmask, }, }); err != nil { - if networkSpan != nil { - networkSpan.End() - } + networkSpanEnd(err) if errors.Is(err, hypervisor.ErrNotSupported) { log.ErrorContext(ctx, "forked standby network rewrite not supported for hypervisor", "instance_id", id, "hypervisor", stored.HypervisorType) releaseNetwork() @@ -173,17 +170,13 @@ func (m *manager) restoreInstance( } else { log.InfoContext(ctx, "recreating network for restore", "instance_id", id, "network", "default", "download_bps", stored.NetworkBandwidthDownload, "upload_bps", stored.NetworkBandwidthUpload) - if err := m.networkManager.RecreateAllocation(ctx, id, stored.NetworkBandwidthDownload, stored.NetworkBandwidthUpload); err != nil { - if networkSpan != nil { - networkSpan.End() - } + if err := m.networkManager.RecreateAllocation(networkCtx, id, stored.NetworkBandwidthDownload, stored.NetworkBandwidthUpload); err != nil { + networkSpanEnd(err) log.ErrorContext(ctx, "failed to recreate network", "instance_id", id, "error", err) return nil, fmt.Errorf("recreate network: %w", err) } } - if networkSpan != nil { - networkSpan.End() - } + networkSpanEnd(nil) } // 4b. Register proxy/enforcement once network identity is active. @@ -224,15 +217,14 @@ func (m *manager) restoreInstance( } // 5. Transition: Standby → Paused (start hypervisor + restore) - var restoreSpan trace.Span - if m.metrics != nil && m.metrics.tracer != nil { - ctx, restoreSpan = m.metrics.tracer.Start(ctx, "RestoreFromSnapshot") - } + restoreCtx, restoreSpanEnd := m.startLifecycleStep(ctx, "restore_from_snapshot", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "restore_from_snapshot"), + ) log.InfoContext(ctx, "restoring from snapshot", "instance_id", id, "snapshot_dir", snapshotDir, "hypervisor", stored.HypervisorType) - pid, hv, err := m.restoreFromSnapshot(ctx, stored, snapshotDir) - if restoreSpan != nil { - restoreSpan.End() - } + pid, hv, err := m.restoreFromSnapshot(restoreCtx, stored, snapshotDir) + restoreSpanEnd(err) if err != nil { log.ErrorContext(ctx, "failed to restore from snapshot", "instance_id", id, "error", err) // Cleanup network on failure @@ -244,35 +236,39 @@ func (m *manager) restoreInstance( stored.HypervisorPID = &pid // 6. Transition: Paused → Running (resume) - var resumeSpan trace.Span - if m.metrics != nil && m.metrics.tracer != nil { - ctx, resumeSpan = m.metrics.tracer.Start(ctx, "ResumeVM") - } + resumeCtx, resumeSpanEnd := m.startLifecycleStep(ctx, "resume_vm", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "resume_vm"), + ) log.InfoContext(ctx, "resuming VM", "instance_id", id) - if err := hv.Resume(ctx); err != nil { - if resumeSpan != nil { - resumeSpan.End() - } + if err := hv.Resume(resumeCtx); err != nil { + resumeSpanEnd(err) log.ErrorContext(ctx, "failed to resume VM", "instance_id", id, "error", err) // Cleanup on failure hv.Shutdown(ctx) releaseNetwork() return nil, fmt.Errorf("resume vm failed: %w", err) } - if resumeSpan != nil { - resumeSpan.End() - } + resumeSpanEnd(nil) // Forked standby restores may allocate a fresh identity while the guest memory snapshot // still has the source VM's old IP configuration. Reconfigure guest networking after // resume so host ingress to the new private IP works reliably. if allocatedNet != nil && !stored.SkipGuestAgent { - if err := reconfigureGuestNetwork(ctx, stored, allocatedNet); err != nil { + reconfigureCtx, reconfigureSpanEnd := m.startLifecycleStep(ctx, "reconfigure_guest_network", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "reconfigure_guest_network"), + ) + if err := reconfigureGuestNetwork(reconfigureCtx, stored, allocatedNet); err != nil { + reconfigureSpanEnd(err) log.ErrorContext(ctx, "failed to configure guest network after restore", "instance_id", id, "error", err) _ = hv.Shutdown(ctx) releaseNetwork() return nil, fmt.Errorf("configure guest network after restore: %w", err) } + reconfigureSpanEnd(nil) } // 8. Delete snapshot after successful restore unless the hypervisor is keeping it diff --git a/lib/instances/snapshot.go b/lib/instances/snapshot.go index 5cf1f6c6..be982f74 100644 --- a/lib/instances/snapshot.go +++ b/lib/instances/snapshot.go @@ -14,6 +14,7 @@ import ( snapshotstore "github.com/kernel/hypeman/lib/snapshot" "github.com/kernel/hypeman/lib/tags" "github.com/nrednav/cuid2" + "go.opentelemetry.io/otel/attribute" "gvisor.dev/gvisor/pkg/cleanup" ) @@ -43,9 +44,15 @@ func (m *manager) getSnapshot(ctx context.Context, snapshotID string) (*Snapshot return snapshot, nil } -func (m *manager) createSnapshot(ctx context.Context, id string, req CreateSnapshotRequest) (*Snapshot, error) { +func (m *manager) createSnapshot(ctx context.Context, id string, req CreateSnapshotRequest) (_ *Snapshot, retErr error) { log := logger.FromContext(ctx) log.InfoContext(ctx, "creating snapshot", "instance_id", id, "kind", req.Kind, "name", req.Name) + ctx, span := m.startLifecycleSpan(ctx, "instances.create_snapshot", + attribute.String("instance_id", id), + attribute.String("operation", "create_snapshot"), + attribute.String("snapshot_kind", string(req.Kind)), + ) + defer func() { finishInstancesSpan(span, retErr) }() if err := validateCreateSnapshotRequest(req); err != nil { return nil, err @@ -229,8 +236,14 @@ func (m *manager) deleteSnapshot(ctx context.Context, snapshotID string) error { return nil } -func (m *manager) restoreSnapshot(ctx context.Context, id string, snapshotID string, req RestoreSnapshotRequest) (*Instance, error) { +func (m *manager) restoreSnapshot(ctx context.Context, id string, snapshotID string, req RestoreSnapshotRequest) (_ *Instance, retErr error) { log := logger.FromContext(ctx) + ctx, span := m.startLifecycleSpan(ctx, "instances.restore_snapshot", + attribute.String("instance_id", id), + attribute.String("operation", "restore_snapshot"), + attribute.String("snapshot_id", snapshotID), + ) + defer func() { finishInstancesSpan(span, retErr) }() rec, err := m.loadSnapshotRecord(snapshotID) if err != nil { return nil, err diff --git a/lib/instances/snapshot_compression.go b/lib/instances/snapshot_compression.go index 680cd96a..5c930002 100644 --- a/lib/instances/snapshot_compression.go +++ b/lib/instances/snapshot_compression.go @@ -19,6 +19,8 @@ import ( snapshotstore "github.com/kernel/hypeman/lib/snapshot" "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) const ( @@ -311,6 +313,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar done: make(chan struct{}), target: target, } + parentSpanContext := trace.SpanContextFromContext(ctx) m.compressionJobs[target.Key] = job m.compressionMu.Unlock() @@ -319,11 +322,28 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar result := snapshotCompressionResultSuccess var uncompressedSize int64 var compressedSize int64 + var spanErr error metricsCtx := context.Background() + spanOptions := []trace.SpanStartOption{ + trace.WithNewRoot(), + trace.WithAttributes( + attribute.String("operation", "snapshot_compression"), + attribute.String("owner_id", target.OwnerID), + attribute.String("snapshot_id", target.SnapshotID), + attribute.String("hypervisor", string(target.HypervisorType)), + attribute.String("source", string(target.Source)), + attribute.String("algorithm", string(target.Policy.Algorithm)), + ), + } + if parentSpanContext.IsValid() { + spanOptions = append(spanOptions, trace.WithLinks(trace.Link{SpanContext: parentSpanContext})) + } + metricsCtx, span := m.tracerOrDefault().Start(metricsCtx, "instances.snapshot_compression", spanOptions...) log := logger.FromContext(ctx) defer func() { m.recordSnapshotCompressionJob(metricsCtx, target, result, start, uncompressedSize, compressedSize) + finishInstancesSpan(span, spanErr) m.compressionMu.Lock() delete(m.compressionJobs, target.Key) m.compressionMu.Unlock() @@ -351,6 +371,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar if err != nil { if errors.Is(err, context.Canceled) { result = snapshotCompressionResultCanceled + spanErr = err if target.SnapshotID != "" { if err := m.updateSnapshotCompressionMetadata(target.SnapshotID, snapshotstore.SnapshotCompressionStateNone, "", nil, nil, nil); err != nil { log.ErrorContext(jobCtx, "failed to update snapshot compression metadata", "snapshot_id", target.SnapshotID, "snapshot_dir", target.SnapshotDir, "state", snapshotstore.SnapshotCompressionStateNone, "error", err) @@ -359,6 +380,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar return } result = snapshotCompressionResultFailed + spanErr = err if target.SnapshotID != "" { if metadataErr := m.updateSnapshotCompressionMetadata(target.SnapshotID, snapshotstore.SnapshotCompressionStateError, err.Error(), &target.Policy, nil, nil); metadataErr != nil { log.ErrorContext(jobCtx, "failed to update snapshot compression metadata", "snapshot_id", target.SnapshotID, "snapshot_dir", target.SnapshotDir, "state", snapshotstore.SnapshotCompressionStateError, "error", metadataErr) diff --git a/lib/instances/standby.go b/lib/instances/standby.go index 94edc1ea..193ed16a 100644 --- a/lib/instances/standby.go +++ b/lib/instances/standby.go @@ -13,7 +13,7 @@ import ( "github.com/kernel/hypeman/lib/logger" "github.com/kernel/hypeman/lib/network" snapshotstore "github.com/kernel/hypeman/lib/snapshot" - "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/attribute" ) // StandbyInstance puts an instance in standby state @@ -24,17 +24,16 @@ func (m *manager) standbyInstance( id string, req StandbyInstanceRequest, skipCompression bool, -) (*Instance, error) { +) (_ *Instance, retErr error) { start := time.Now() log := logger.FromContext(ctx) log.InfoContext(ctx, "putting instance in standby", "instance_id", id) - // Start tracing span if tracer is available - if m.metrics != nil && m.metrics.tracer != nil { - var span trace.Span - ctx, span = m.metrics.tracer.Start(ctx, "StandbyInstance") - defer span.End() - } + ctx, span := m.startLifecycleSpan(ctx, "instances.standby", + attribute.String("instance_id", id), + attribute.String("operation", "standby"), + ) + defer func() { finishInstancesSpan(span, retErr) }() // 1. Load instance meta, err := m.loadMetadata(id) @@ -45,6 +44,7 @@ func (m *manager) standbyInstance( inst := m.toInstance(ctx, meta) stored := &meta.StoredMetadata + ctx = enrichInstancesTrace(ctx, attribute.String("hypervisor", string(stored.HypervisorType))) log.DebugContext(ctx, "loaded instance", "instance_id", id, "state", inst.State) // 2. Validate state transition (must be Running to start standby flow) @@ -99,10 +99,17 @@ func (m *manager) standbyInstance( // 6. Transition: Running → Paused log.DebugContext(ctx, "pausing VM", "instance_id", id) - if err := hv.Pause(ctx); err != nil { + pauseCtx, pauseSpanEnd := m.startLifecycleStep(ctx, "pause_vm", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "pause_vm"), + ) + if err := hv.Pause(pauseCtx); err != nil { + pauseSpanEnd(err) log.ErrorContext(ctx, "failed to pause VM", "instance_id", id, "error", err) return nil, fmt.Errorf("pause vm failed: %w", err) } + pauseSpanEnd(nil) // 7. Create snapshot snapshotDir := m.paths.InstanceSnapshotLatest(id) @@ -120,7 +127,14 @@ func (m *manager) standbyInstance( } } log.DebugContext(ctx, "creating snapshot", "instance_id", id, "snapshot_dir", snapshotDir) - if err := createSnapshot(ctx, hv, snapshotDir, reuseSnapshotBase); err != nil { + snapshotCtx, snapshotSpanEnd := m.startLifecycleStep(ctx, "create_snapshot", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "create_snapshot"), + attribute.Bool("reuse_snapshot_base", reuseSnapshotBase), + ) + if err := createSnapshot(snapshotCtx, hv, snapshotDir, reuseSnapshotBase); err != nil { + snapshotSpanEnd(err) // Snapshot failed - try to resume VM log.ErrorContext(ctx, "snapshot failed, attempting to resume VM", "instance_id", id, "error", err) if resumeErr := hv.Resume(ctx); resumeErr != nil { @@ -133,12 +147,21 @@ func (m *manager) standbyInstance( } return nil, fmt.Errorf("create snapshot: %w", err) } + snapshotSpanEnd(nil) // 8. Stop VMM gracefully (snapshot is complete) log.DebugContext(ctx, "shutting down hypervisor", "instance_id", id) - if err := m.shutdownHypervisor(ctx, &inst); err != nil { + shutdownCtx, shutdownSpanEnd := m.startLifecycleStep(ctx, "shutdown_hypervisor", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "shutdown_hypervisor"), + ) + if err := m.shutdownHypervisor(shutdownCtx, &inst); err != nil { + shutdownSpanEnd(err) // Log but continue - snapshot was created successfully log.WarnContext(ctx, "failed to shutdown hypervisor gracefully, snapshot still valid", "instance_id", id, "error", err) + } else { + shutdownSpanEnd(nil) } // Firecracker vsock sockets can persist across standby/restore if the process @@ -156,9 +179,17 @@ func (m *manager) standbyInstance( if inst.NetworkEnabled { m.unregisterEgressProxyInstance(ctx, id) log.DebugContext(ctx, "releasing network", "instance_id", id, "network", "default") - if err := m.networkManager.ReleaseAllocation(ctx, networkAlloc); err != nil { + releaseNetworkCtx, releaseNetworkSpanEnd := m.startLifecycleStep(ctx, "release_network", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "release_network"), + ) + if err := m.networkManager.ReleaseAllocation(releaseNetworkCtx, networkAlloc); err != nil { + releaseNetworkSpanEnd(err) // Log error but continue - snapshot was created successfully log.WarnContext(ctx, "failed to release network, continuing with standby", "instance_id", id, "error", err) + } else { + releaseNetworkSpanEnd(nil) } } @@ -183,7 +214,12 @@ func (m *manager) standbyInstance( finalInst := m.toInstance(ctx, meta) if compressionPolicy != nil { - m.startCompressionJob(ctx, compressionTarget{ + compressionCtx, compressionSpanEnd := m.startLifecycleStep(ctx, "enqueue_snapshot_compression", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "enqueue_snapshot_compression"), + ) + m.startCompressionJob(compressionCtx, compressionTarget{ Key: m.snapshotJobKeyForInstance(stored.Id), OwnerID: stored.Id, SnapshotDir: snapshotDir, @@ -191,6 +227,7 @@ func (m *manager) standbyInstance( Source: snapshotCompressionSourceStandby, Policy: *compressionPolicy, }) + compressionSpanEnd(nil) } log.InfoContext(ctx, "instance put in standby successfully", "instance_id", id, "state", finalInst.State) diff --git a/lib/instances/start.go b/lib/instances/start.go index 0345e395..bf0e385f 100644 --- a/lib/instances/start.go +++ b/lib/instances/start.go @@ -9,7 +9,7 @@ import ( "github.com/kernel/hypeman/lib/egressproxy" "github.com/kernel/hypeman/lib/logger" "github.com/kernel/hypeman/lib/network" - "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/attribute" "gvisor.dev/gvisor/pkg/cleanup" ) @@ -19,17 +19,16 @@ func (m *manager) startInstance( ctx context.Context, id string, req StartInstanceRequest, -) (*Instance, error) { +) (_ *Instance, retErr error) { start := time.Now() log := logger.FromContext(ctx) log.InfoContext(ctx, "starting instance", "instance_id", id) - // Start tracing span if tracer is available - if m.metrics != nil && m.metrics.tracer != nil { - var span trace.Span - ctx, span = m.metrics.tracer.Start(ctx, "StartInstance") - defer span.End() - } + ctx, span := m.startLifecycleSpan(ctx, "instances.start", + attribute.String("instance_id", id), + attribute.String("operation", "start"), + ) + defer func() { finishInstancesSpan(span, retErr) }() // 1. Load instance meta, err := m.loadMetadata(id) @@ -40,6 +39,7 @@ func (m *manager) startInstance( inst := m.toInstance(ctx, meta) stored := &meta.StoredMetadata + ctx = enrichInstancesTrace(ctx, attribute.String("hypervisor", string(stored.HypervisorType))) log.DebugContext(ctx, "loaded instance", "instance_id", id, "state", inst.State) // 2. Validate state (must be Stopped to start) @@ -72,7 +72,13 @@ func (m *manager) startInstance( // 3. Get image info (needed for buildHypervisorConfig) log.DebugContext(ctx, "getting image info", "instance_id", id, "image", stored.Image) - imageInfo, err := m.imageManager.GetImage(ctx, stored.Image) + imageCtx, imageSpanEnd := m.startLifecycleStep(ctx, "resolve_image", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "resolve_image"), + ) + imageInfo, err := m.imageManager.GetImage(imageCtx, stored.Image) + imageSpanEnd(err) if err != nil { log.ErrorContext(ctx, "failed to get image", "instance_id", id, "image", stored.Image, "error", err) return nil, fmt.Errorf("get image: %w", err) @@ -86,10 +92,17 @@ func (m *manager) startInstance( var netConfig *network.NetworkConfig if stored.NetworkEnabled { log.DebugContext(ctx, "allocating network for start", "instance_id", id, "network", "default") - netConfig, err = m.networkManager.CreateAllocation(ctx, network.AllocateRequest{ + networkCtx, networkSpanEnd := m.startLifecycleStep(ctx, "allocate_network", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "allocate_network"), + attribute.Bool("network_enabled", true), + ) + netConfig, err = m.networkManager.CreateAllocation(networkCtx, network.AllocateRequest{ InstanceID: id, InstanceName: stored.Name, }) + networkSpanEnd(err) if err != nil { log.ErrorContext(ctx, "failed to allocate network", "instance_id", id, "error", err) return nil, fmt.Errorf("allocate network: %w", err) @@ -143,10 +156,17 @@ func (m *manager) startInstance( // 5. Regenerate config disk with new network configuration instForConfig := &Instance{StoredMetadata: *stored} log.DebugContext(ctx, "regenerating config disk", "instance_id", id) - if err := m.createConfigDisk(ctx, instForConfig, imageInfo, netConfig, proxyGuestConfig); err != nil { + configDiskCtx, configDiskSpanEnd := m.startLifecycleStep(ctx, "create_config_disk", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "create_config_disk"), + ) + if err := m.createConfigDisk(configDiskCtx, instForConfig, imageInfo, netConfig, proxyGuestConfig); err != nil { + configDiskSpanEnd(err) log.ErrorContext(ctx, "failed to create config disk", "instance_id", id, "error", err) return nil, fmt.Errorf("create config disk: %w", err) } + configDiskSpanEnd(nil) if err := m.archiveAppLogForBoot(id); err != nil { log.WarnContext(ctx, "failed to archive app log before start", "instance_id", id, "error", err) @@ -157,10 +177,17 @@ func (m *manager) startInstance( stored.StartedAt = &bootStart log.InfoContext(ctx, "starting hypervisor and booting VM", "instance_id", id) - if err := m.startAndBootVM(ctx, stored, imageInfo, netConfig); err != nil { + startVMCtx, startVMSpanEnd := m.startLifecycleStep(ctx, "start_vm", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "start_vm"), + ) + if err := m.startAndBootVM(startVMCtx, stored, imageInfo, netConfig); err != nil { + startVMSpanEnd(err) log.ErrorContext(ctx, "failed to start and boot VM", "instance_id", id, "error", err) return nil, err } + startVMSpanEnd(nil) // Success - release cleanup stack (prevent cleanup) cu.Release() diff --git a/lib/instances/stop.go b/lib/instances/stop.go index 812c6db6..133e7156 100644 --- a/lib/instances/stop.go +++ b/lib/instances/stop.go @@ -2,6 +2,7 @@ package instances import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -13,7 +14,7 @@ import ( "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/logger" "github.com/kernel/hypeman/lib/network" - "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/attribute" ) // DefaultStopTimeout is the default grace period for graceful shutdown (seconds). @@ -21,6 +22,8 @@ const DefaultStopTimeout = 5 const shutdownRPCDeadline = 1500 * time.Millisecond const shutdownFailureFallbackWait = 500 * time.Millisecond +var errGracefulShutdownFailed = errors.New("graceful guest shutdown did not complete") + // resolveStopTimeout returns the configured stop timeout in seconds, // falling back to the package default when unset/invalid. func resolveStopTimeout(stored *StoredMetadata) int { @@ -139,17 +142,16 @@ func (m *manager) forceKillHypervisorProcess(ctx context.Context, inst *Instance func (m *manager) stopInstance( ctx context.Context, id string, -) (*Instance, error) { +) (_ *Instance, retErr error) { start := time.Now() log := logger.FromContext(ctx) log.InfoContext(ctx, "stopping instance", "instance_id", id) - // Start tracing span if tracer is available - if m.metrics != nil && m.metrics.tracer != nil { - var span trace.Span - ctx, span = m.metrics.tracer.Start(ctx, "StopInstance") - defer span.End() - } + ctx, span := m.startLifecycleSpan(ctx, "instances.stop", + attribute.String("instance_id", id), + attribute.String("operation", "stop"), + ) + defer func() { finishInstancesSpan(span, retErr) }() // 1. Load instance meta, err := m.loadMetadata(id) @@ -160,6 +162,7 @@ func (m *manager) stopInstance( inst := m.toInstance(ctx, meta) stored := &meta.StoredMetadata + ctx = enrichInstancesTrace(ctx, attribute.String("hypervisor", string(stored.HypervisorType))) log.DebugContext(ctx, "loaded instance", "instance_id", id, "state", inst.State) // 2. Validate state transition (must be active to stop) @@ -181,21 +184,46 @@ func (m *manager) stopInstance( // 4. Graceful shutdown: send signal to guest init via Shutdown RPC, // then wait for VM to power off cleanly. Fall back to hypervisor shutdown on timeout. stopTimeout := resolveStopTimeout(stored) - gracefulShutdown := m.tryGracefulGuestShutdown(ctx, &inst, stopTimeout) + gracefulCtx, gracefulSpanEnd := m.startLifecycleStep(ctx, "graceful_guest_shutdown", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "graceful_guest_shutdown"), + ) + gracefulShutdown := m.tryGracefulGuestShutdown(gracefulCtx, &inst, stopTimeout) + if gracefulShutdown { + gracefulSpanEnd(nil) + } else { + gracefulSpanEnd(errGracefulShutdownFailed) + } // 5. Fallback hypervisor shutdown if guest graceful shutdown didn't work if !gracefulShutdown { log.DebugContext(ctx, "shutting down hypervisor (fallback)", "instance_id", id) - if err := m.shutdownHypervisor(ctx, &inst); err != nil { + shutdownCtx, shutdownSpanEnd := m.startLifecycleStep(ctx, "shutdown_hypervisor", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "shutdown_hypervisor"), + ) + if err := m.shutdownHypervisor(shutdownCtx, &inst); err != nil { + shutdownSpanEnd(err) // Continue to final SIGKILL fallback if graceful shutdown API fails. log.WarnContext(ctx, "failed to shutdown hypervisor", "instance_id", id, "error", err) + } else { + shutdownSpanEnd(nil) } // Final fallback: force-kill the process if it's still alive. - if err := m.forceKillHypervisorProcess(ctx, &inst); err != nil { + killCtx, killSpanEnd := m.startLifecycleStep(ctx, "force_kill_hypervisor", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "force_kill_hypervisor"), + ) + if err := m.forceKillHypervisorProcess(killCtx, &inst); err != nil { + killSpanEnd(err) log.ErrorContext(ctx, "failed to force-kill hypervisor process", "instance_id", id, "error", err) return nil, err } + killSpanEnd(nil) } // 6. Release network allocation (delete TAP device) @@ -204,9 +232,17 @@ func (m *manager) stopInstance( } if inst.NetworkEnabled && networkAlloc != nil { log.DebugContext(ctx, "releasing network", "instance_id", id, "network", "default") - if err := m.networkManager.ReleaseAllocation(ctx, networkAlloc); err != nil { + releaseNetworkCtx, releaseNetworkSpanEnd := m.startLifecycleStep(ctx, "release_network", + attribute.String("instance_id", id), + attribute.String("hypervisor", string(stored.HypervisorType)), + attribute.String("operation", "release_network"), + ) + if err := m.networkManager.ReleaseAllocation(releaseNetworkCtx, networkAlloc); err != nil { + releaseNetworkSpanEnd(err) // Log error but continue log.WarnContext(ctx, "failed to release network, continuing", "instance_id", id, "error", err) + } else { + releaseNetworkSpanEnd(nil) } } diff --git a/lib/instances/tracing.go b/lib/instances/tracing.go new file mode 100644 index 00000000..c9b1e8df --- /dev/null +++ b/lib/instances/tracing.go @@ -0,0 +1,64 @@ +package instances + +import ( + "context" + + "github.com/kernel/hypeman/lib/hypervisor" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +func (m *manager) tracerOrDefault() trace.Tracer { + if m != nil && m.tracer != nil { + return m.tracer + } + return otel.Tracer("hypeman/instances") +} + +func (m *manager) startLifecycleSpan(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + ctx = hypervisor.WithTraceAttributes(ctx, propagatedTraceAttributes(attrs...)...) + return startInstancesSpan(ctx, m.tracerOrDefault(), name, attrs...) +} + +func (m *manager) startLifecycleStep(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, func(error)) { + ctx, span := startInstancesSpan(ctx, m.tracerOrDefault(), name, attrs...) + return ctx, func(err error) { + finishInstancesSpan(span, err) + } +} + +func enrichInstancesTrace(ctx context.Context, attrs ...attribute.KeyValue) context.Context { + ctx = hypervisor.WithTraceAttributes(ctx, attrs...) + trace.SpanFromContext(ctx).SetAttributes(attrs...) + return ctx +} + +func startInstancesSpan(ctx context.Context, tracer trace.Tracer, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + if len(attrs) == 0 { + return tracer.Start(ctx, name) + } + return tracer.Start(ctx, name, trace.WithAttributes(attrs...)) +} + +func propagatedTraceAttributes(attrs ...attribute.KeyValue) []attribute.KeyValue { + out := make([]attribute.KeyValue, 0, len(attrs)) + for _, attr := range attrs { + switch string(attr.Key) { + case "instance_id", "hypervisor", "snapshot_id": + out = append(out, attr) + } + } + return out +} + +func finishInstancesSpan(span trace.Span, err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + span.End() +} diff --git a/lib/providers/providers.go b/lib/providers/providers.go index 566e0077..4c112915 100644 --- a/lib/providers/providers.go +++ b/lib/providers/providers.go @@ -125,7 +125,7 @@ func ProvideInstanceManager(p *paths.Paths, cfg *config.Config, imageManager ima } meter := otel.GetMeterProvider().Meter("hypeman") - tracer := otel.GetTracerProvider().Tracer("hypeman") + tracer := otel.GetTracerProvider().Tracer("hypeman/instances") defaultHypervisor := hypervisor.Type(cfg.Hypervisor.Default) snapshotDefaults := snapshotDefaultsFromConfig(cfg) memoryPolicy := guestmemory.Policy{ diff --git a/lib/vmm/client.go b/lib/vmm/client.go index b4697955..70c1e849 100644 --- a/lib/vmm/client.go +++ b/lib/vmm/client.go @@ -11,9 +11,13 @@ import ( "syscall" "time" + "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/paths" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" ) const cloudHypervisorSocketReadyTimeout = 10 * time.Second @@ -26,12 +30,33 @@ type VMM struct { // metricsRoundTripper wraps an http.RoundTripper to record metrics type metricsRoundTripper struct { - base http.RoundTripper + base http.RoundTripper + tracer trace.Tracer } func (m *metricsRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { start := time.Now() + attrs := hypervisor.TraceAttributesFromContext(req.Context()) + attrs = append(attrs, + attribute.String("operation", req.Method+" "+req.URL.Path), + attribute.String("http.method", req.Method), + attribute.String("http.route", req.URL.Path), + ) + ctx, span := m.tracer.Start(req.Context(), "hypervisor.http "+req.Method+" "+req.URL.Path, trace.WithAttributes(attrs...)) + req = req.WithContext(ctx) resp, err := m.base.RoundTrip(req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) + if resp.StatusCode >= 400 { + span.SetStatus(codes.Error, resp.Status) + } else { + span.SetStatus(codes.Ok, "") + } + } + span.End() // Record metrics using global VMMMetrics if VMMMetrics != nil { @@ -66,7 +91,7 @@ func NewVMM(socketPath string) (*VMM, error) { } httpClient := &http.Client{ - Transport: &metricsRoundTripper{base: transport}, + Transport: &metricsRoundTripper{base: transport, tracer: otel.Tracer("hypeman/vmm")}, Timeout: 120 * time.Second, } diff --git a/lib/vmm/client_tracing_test.go b/lib/vmm/client_tracing_test.go new file mode 100644 index 00000000..35f8bb11 --- /dev/null +++ b/lib/vmm/client_tracing_test.go @@ -0,0 +1,74 @@ +package vmm + +import ( + "context" + "io" + "net/http" + "strings" + "testing" + + "github.com/kernel/hypeman/lib/hypervisor" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + +func TestMetricsRoundTripperCreatesTraceSpan(t *testing.T) { + recorder := tracetest.NewSpanRecorder() + provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder)) + previous := otel.GetTracerProvider() + otel.SetTracerProvider(provider) + t.Cleanup(func() { + otel.SetTracerProvider(previous) + _ = provider.Shutdown(context.Background()) + }) + + rt := &metricsRoundTripper{ + base: roundTripFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusNoContent, + Status: "204 No Content", + Body: io.NopCloser(strings.NewReader("")), + }, nil + }), + tracer: otel.Tracer("hypeman/vmm"), + } + + ctx := hypervisor.WithTraceAttributes(context.Background(), + attribute.String("instance_id", "inst_789"), + attribute.String("hypervisor", string(hypervisor.TypeCloudHypervisor)), + ) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, "http://localhost/api/v1/vm.resume", nil) + require.NoError(t, err) + + _, err = rt.RoundTrip(req) + require.NoError(t, err) + + var found sdktrace.ReadOnlySpan + for _, span := range recorder.Ended() { + if span.Name() == "hypervisor.http PUT /api/v1/vm.resume" { + found = span + break + } + } + require.NotNil(t, found) + + attrs := make(map[string]string, len(found.Attributes())) + for _, attr := range found.Attributes() { + attrs[string(attr.Key)] = attr.Value.Emit() + } + + assert.Equal(t, "inst_789", attrs["instance_id"]) + assert.Equal(t, string(hypervisor.TypeCloudHypervisor), attrs["hypervisor"]) + assert.Equal(t, "PUT /api/v1/vm.resume", attrs["operation"]) + assert.Equal(t, "204", attrs["http.status_code"]) +}