Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions cmd/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,13 @@ type MetricsConfig struct {

// OtelConfig holds OpenTelemetry settings.
type OtelConfig struct {
Enabled bool `koanf:"enabled"`
Endpoint string `koanf:"endpoint"`
ServiceName string `koanf:"service_name"`
ServiceInstanceID string `koanf:"service_instance_id"`
Insecure bool `koanf:"insecure"`
MetricExportInterval string `koanf:"metric_export_interval"`
Enabled bool `koanf:"enabled"`
Endpoint string `koanf:"endpoint"`
ServiceName string `koanf:"service_name"`
ServiceInstanceID string `koanf:"service_instance_id"`
Insecure bool `koanf:"insecure"`
MetricExportInterval string `koanf:"metric_export_interval"`
SuccessfulGetSampleRatio float64 `koanf:"successful_get_sample_ratio"`
}

// LoggingConfig holds log rotation and level settings.
Expand Down Expand Up @@ -302,12 +303,13 @@ func defaultConfig() *Config {
},

Otel: OtelConfig{
Enabled: false,
Endpoint: "127.0.0.1:4317",
ServiceName: "hypeman",
ServiceInstanceID: getHostname(),
Insecure: true,
MetricExportInterval: "60s",
Enabled: false,
Endpoint: "127.0.0.1:4317",
ServiceName: "hypeman",
ServiceInstanceID: getHostname(),
Insecure: true,
MetricExportInterval: "60s",
SuccessfulGetSampleRatio: 0.1,
},

Logging: LoggingConfig{
Expand Down Expand Up @@ -479,6 +481,9 @@ func (c *Config) Validate() error {
return fmt.Errorf("otel.metric_export_interval must be a valid duration, got %q: %w", c.Otel.MetricExportInterval, err)
}
}
if c.Otel.SuccessfulGetSampleRatio < 0 || c.Otel.SuccessfulGetSampleRatio > 1 {
return fmt.Errorf("otel.successful_get_sample_ratio must be between 0 and 1, got %v", c.Otel.SuccessfulGetSampleRatio)
}
if c.Oversubscription.CPU <= 0 {
return fmt.Errorf("oversubscription.cpu must be positive, got %v", c.Oversubscription.CPU)
}
Expand Down
17 changes: 17 additions & 0 deletions cmd/api/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func TestDefaultConfigIncludesMetricsSettings(t *testing.T) {
if cfg.Otel.MetricExportInterval != "60s" {
t.Fatalf("expected default otel.metric_export_interval to be 60s, got %q", cfg.Otel.MetricExportInterval)
}
if cfg.Otel.SuccessfulGetSampleRatio != 0.1 {
t.Fatalf("expected default otel.successful_get_sample_ratio to be 0.1, got %v", cfg.Otel.SuccessfulGetSampleRatio)
}
}

func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) {
Expand All @@ -36,6 +39,7 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) {
t.Setenv("METRICS__VM_LABEL_BUDGET", "350")
t.Setenv("METRICS__RESOURCE_REFRESH_INTERVAL", "30s")
t.Setenv("OTEL__METRIC_EXPORT_INTERVAL", "15s")
t.Setenv("OTEL__SUCCESSFUL_GET_SAMPLE_RATIO", "0.25")

tmp := t.TempDir()
cfgPath := filepath.Join(tmp, "config.yaml")
Expand Down Expand Up @@ -63,6 +67,9 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) {
if cfg.Otel.MetricExportInterval != "15s" {
t.Fatalf("expected otel.metric_export_interval override, got %q", cfg.Otel.MetricExportInterval)
}
if cfg.Otel.SuccessfulGetSampleRatio != 0.25 {
t.Fatalf("expected otel.successful_get_sample_ratio override, got %v", cfg.Otel.SuccessfulGetSampleRatio)
}
}

func TestValidateRejectsInvalidMetricsPort(t *testing.T) {
Expand All @@ -85,6 +92,16 @@ func TestValidateRejectsInvalidMetricExportInterval(t *testing.T) {
}
}

func TestValidateRejectsInvalidSuccessfulGetSampleRatio(t *testing.T) {
cfg := defaultConfig()
cfg.Otel.SuccessfulGetSampleRatio = 1.1

err := cfg.Validate()
if err == nil {
t.Fatalf("expected validation error for invalid successful get sample ratio")
}
}

func TestValidateRejectsInvalidVMLabelBudget(t *testing.T) {
cfg := defaultConfig()
cfg.Metrics.VMLabelBudget = 0
Expand Down
19 changes: 10 additions & 9 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,15 @@ func run() error {

// Initialize OpenTelemetry (before wire initialization)
otelCfg := otel.Config{
Enabled: cfg.Otel.Enabled,
Endpoint: cfg.Otel.Endpoint,
ServiceName: cfg.Otel.ServiceName,
ServiceInstanceID: cfg.Otel.ServiceInstanceID,
Insecure: cfg.Otel.Insecure,
MetricExportInterval: cfg.Otel.MetricExportInterval,
Version: cfg.Version,
Env: cfg.Env,
Enabled: cfg.Otel.Enabled,
Endpoint: cfg.Otel.Endpoint,
ServiceName: cfg.Otel.ServiceName,
ServiceInstanceID: cfg.Otel.ServiceInstanceID,
Insecure: cfg.Otel.Insecure,
MetricExportInterval: cfg.Otel.MetricExportInterval,
SuccessfulGetSampleRatio: cfg.Otel.SuccessfulGetSampleRatio,
Version: cfg.Version,
Env: cfg.Env,
}

otelProvider, otelShutdown, err := otel.Init(context.Background(), otelCfg)
Expand Down Expand Up @@ -149,7 +150,7 @@ func run() error {

// Log OTel status
if cfg.Otel.Enabled {
logger.Info("OpenTelemetry push enabled", "endpoint", cfg.Otel.Endpoint, "service", cfg.Otel.ServiceName, "metric_export_interval", cfg.Otel.MetricExportInterval)
logger.Info("OpenTelemetry push enabled", "endpoint", cfg.Otel.Endpoint, "service", cfg.Otel.ServiceName, "metric_export_interval", cfg.Otel.MetricExportInterval, "successful_get_sample_ratio", cfg.Otel.SuccessfulGetSampleRatio)
} else {
logger.Info("OpenTelemetry push disabled; Prometheus pull metrics remain available")
}
Expand Down
50 changes: 36 additions & 14 deletions lib/hypervisor/firecracker/firecracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,24 +283,38 @@ func (f *Firecracker) do(ctx context.Context, method, path string, reqBody any,
attribute.String("http.method", method),
attribute.String("http.route", path),
)
ctx, span := otel.Tracer("hypeman/hypervisor/firecracker").Start(ctx, "hypervisor.http "+method+" "+path, trace.WithAttributes(attrs...))
defer span.End()
tracer := otel.Tracer("hypeman/hypervisor/firecracker")
spanName := "hypervisor.http " + method + " " + path
shouldTrace := hypervisor.ShouldTraceHypervisorHTTPSpan(method, path)

var span trace.Span
if shouldTrace {
var spanCtx context.Context
spanCtx, span = tracer.Start(ctx, spanName, trace.WithAttributes(attrs...))
ctx = spanCtx
defer span.End()
}

recordError := func(err error) {
if shouldTrace {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
}

var bodyReader io.Reader
if reqBody != nil {
data, err := json.Marshal(reqBody)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
recordError(err)
return nil, fmt.Errorf("marshal request body: %w", err)
}
bodyReader = bytes.NewReader(data)
}

req, err := http.NewRequestWithContext(ctx, method, "http://localhost"+path, bodyReader)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
recordError(err)
return nil, fmt.Errorf("create request: %w", err)
}
req.Header.Set("Accept", "application/json")
Expand All @@ -310,35 +324,43 @@ func (f *Firecracker) do(ctx context.Context, method, path string, reqBody any,

resp, err := f.client.Do(req)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
recordError(err)
return nil, fmt.Errorf("request %s %s: %w", method, path, err)
}
defer resp.Body.Close()
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
if shouldTrace {
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
}

data, err := io.ReadAll(resp.Body)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
recordError(err)
return nil, fmt.Errorf("read response body: %w", err)
}

for _, status := range expectedStatus {
if resp.StatusCode == status {
span.SetStatus(codes.Ok, "")
if shouldTrace {
span.SetStatus(codes.Ok, "")
}
return data, nil
}
}

if len(data) > 0 {
var apiErr apiError
if err := json.Unmarshal(data, &apiErr); err == nil && apiErr.FaultMessage != "" {
span.SetStatus(codes.Error, apiErr.FaultMessage)
if shouldTrace {
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
span.SetStatus(codes.Error, apiErr.FaultMessage)
}
return nil, fmt.Errorf("status %d: %s", resp.StatusCode, apiErr.FaultMessage)
}
}
span.SetStatus(codes.Error, resp.Status)
if shouldTrace {
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
span.SetStatus(codes.Error, resp.Status)
}
return nil, fmt.Errorf("status %d: %s", resp.StatusCode, string(data))
}

Expand Down
42 changes: 39 additions & 3 deletions lib/hypervisor/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hypervisor

import (
"context"
"net/http"
"time"

"github.com/kernel/hypeman/lib/paths"
Expand Down Expand Up @@ -52,6 +53,18 @@ func TraceAttributesFromContext(ctx context.Context) []attribute.KeyValue {
return out
}

func ShouldTraceHypervisorHTTPSpan(method, path string) bool {
if method != http.MethodGet {
return true
}
switch path {
case "/", "/api/v1/vm.info":
return false
default:
return true
}
}

func WrapHypervisor(hvType Type, hv Hypervisor) Hypervisor {
if hv == nil {
return nil
Expand Down Expand Up @@ -115,6 +128,21 @@ func FinishTraceSpan(span trace.Span, err error) {
finishTraceSpan(span, err)
}

func StartDetachedTraceSpan(ctx context.Context, tracer trace.Tracer, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) {
allAttrs := TraceAttributesFromContext(ctx)
if len(attrs) > 0 {
allAttrs = append(allAttrs, attrs...)
}

spanOpts := []trace.SpanStartOption{
trace.WithNewRoot(),
}
if len(allAttrs) > 0 {
spanOpts = append(spanOpts, trace.WithAttributes(allAttrs...))
}
return tracer.Start(context.Background(), name, spanOpts...)
}

func startTraceSpan(ctx context.Context, tracer trace.Tracer, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) {
allAttrs := TraceAttributesFromContext(ctx)
if len(attrs) > 0 {
Expand Down Expand Up @@ -164,9 +192,17 @@ func (h *tracingHypervisor) Shutdown(ctx context.Context) (err error) {
}

func (h *tracingHypervisor) GetVMInfo(ctx context.Context) (_ *VMInfo, err error) {
ctx, span := startTraceSpan(ctx, h.tracer, "hypervisor.get_vm_info", h.spanAttrs(attribute.String("operation", "get_vm_info"))...)
defer func() { finishTraceSpan(span, err) }()
return h.next.GetVMInfo(ctx)
info, err := h.next.GetVMInfo(ctx)
if err != nil {
_, span := StartDetachedTraceSpan(ctx, h.tracer, "hypervisor.get_vm_info",
h.spanAttrs(
attribute.String("operation", "get_vm_info"),
attribute.String("sampled_from", "error_only"),
)...,
)
finishTraceSpan(span, err)
}
return info, err
}

func (h *tracingHypervisor) Pause(ctx context.Context) (err error) {
Expand Down
56 changes: 55 additions & 1 deletion lib/hypervisor/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hypervisor

import (
"context"
"errors"
"testing"
"time"

Expand All @@ -16,6 +17,7 @@ import (
)

type fakeHypervisor struct{}
type fakeHypervisorGetVMInfoError struct{}

func (fakeHypervisor) DeleteVM(context.Context) error { return nil }
func (fakeHypervisor) Shutdown(context.Context) error { return nil }
Expand All @@ -33,7 +35,28 @@ func (fakeHypervisor) SetTargetGuestMemoryBytes(context.Context, int64) error {
func (fakeHypervisor) GetTargetGuestMemoryBytes(context.Context) (int64, error) {
return 0, nil
}
func (fakeHypervisor) Capabilities() Capabilities { return Capabilities{} }
func (fakeHypervisor) Capabilities() Capabilities { return Capabilities{} }
func (fakeHypervisorGetVMInfoError) DeleteVM(context.Context) error { return nil }
func (fakeHypervisorGetVMInfoError) Shutdown(context.Context) error { return nil }
func (fakeHypervisorGetVMInfoError) GetVMInfo(context.Context) (*VMInfo, error) {
return nil, errors.New("vm info failed")
}
func (fakeHypervisorGetVMInfoError) Pause(context.Context) error { return nil }
func (fakeHypervisorGetVMInfoError) Resume(context.Context) error { return nil }
func (fakeHypervisorGetVMInfoError) Snapshot(context.Context, string) error { return nil }
func (fakeHypervisorGetVMInfoError) ResizeMemory(context.Context, int64) error {
return nil
}
func (fakeHypervisorGetVMInfoError) ResizeMemoryAndWait(context.Context, int64, time.Duration) error {
return nil
}
func (fakeHypervisorGetVMInfoError) SetTargetGuestMemoryBytes(context.Context, int64) error {
return nil
}
func (fakeHypervisorGetVMInfoError) GetTargetGuestMemoryBytes(context.Context) (int64, error) {
return 0, nil
}
func (fakeHypervisorGetVMInfoError) Capabilities() Capabilities { return Capabilities{} }

type fakeStarter struct {
returned Hypervisor
Expand Down Expand Up @@ -100,6 +123,37 @@ func TestWrapVMStarterWrapsReturnedHypervisor(t *testing.T) {
assert.Equal(t, string(TypeCloudHypervisor), attrs["hypervisor"])
}

func TestWrapHypervisorSkipsGetVMInfoTraceByDefault(t *testing.T) {
recorder, _ := newTestTracerProvider(t)

hv := WrapHypervisor(TypeQEMU, fakeHypervisor{})
_, err := hv.GetVMInfo(context.Background())
require.NoError(t, err)

for _, span := range recorder.Ended() {
if span.Name() == "hypervisor.get_vm_info" {
t.Fatalf("expected get vm info to be skipped by default")
}
}
}

func TestWrapHypervisorCreatesDetachedErrorSpanForGetVMInfoFailures(t *testing.T) {
recorder, _ := newTestTracerProvider(t)

ctx := WithTraceAttributes(context.Background(), attribute.String("instance_id", "inst_999"))
hv := WrapHypervisor(TypeQEMU, fakeHypervisorGetVMInfoError{})
_, err := hv.GetVMInfo(ctx)
require.Error(t, err)

span := findSpanByName(t, recorder.Ended(), "hypervisor.get_vm_info")
require.False(t, span.Parent().IsValid())

attrs := attrsToMap(span.Attributes())
assert.Equal(t, "inst_999", attrs["instance_id"])
assert.Equal(t, string(TypeQEMU), attrs["hypervisor"])
assert.Equal(t, "error_only", attrs["sampled_from"])
}

func newTestTracerProvider(t *testing.T) (*tracetest.SpanRecorder, *sdktrace.TracerProvider) {
t.Helper()

Expand Down
Loading
Loading