diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 7af86128..9247aa27 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -119,6 +119,7 @@ Common settings: | `metrics.listen_address` | Bind address for `/metrics` endpoint | `127.0.0.1` | | `metrics.port` | Port for `/metrics` endpoint | `9464` | | `metrics.vm_label_budget` | Warning threshold for observed per-VM metric labels | `200` | +| `metrics.resource_refresh_interval` | Refresh interval for cached resource capacity metrics | `120s` | | `limits.max_concurrent_builds` | Max concurrent image builds | `1` | | `limits.max_overlay_size` | Max overlay filesystem size | `100GB` | | `acme.email` | Email for ACME certificate registration | _(empty)_ | diff --git a/cmd/api/api/registry_test.go b/cmd/api/api/registry_test.go index b92f6846..0b89337f 100644 --- a/cmd/api/api/registry_test.go +++ b/cmd/api/api/registry_test.go @@ -265,7 +265,7 @@ func TestRegistryLayerCaching(t *testing.T) { // that share layers reuses the cached shared layers. func TestRegistrySharedLayerCaching(t *testing.T) { t.Parallel() - _, serverHost := setupRegistryTest(t) + svc, serverHost := setupRegistryTest(t) // Pull alpine image (this will be our base) t.Log("Pulling alpine:latest...") @@ -297,6 +297,7 @@ func TestRegistrySharedLayerCaching(t *testing.T) { err = remote.Write(dstRef, alpineImg, remote.WithTransport(transport1)) require.NoError(t, err) t.Logf("First push (alpine): %d blob uploads", firstPushBlobUploads) + waitForImageReady(t, svc, "shared/alpine@"+alpineDigest.String(), 60*time.Second) // Now pull a different alpine-based image (e.g., alpine:3.18) // which should share the base layer with alpine:latest @@ -339,8 +340,7 @@ func TestRegistrySharedLayerCaching(t *testing.T) { assert.LessOrEqual(t, secondPushBlobUploads, firstPushBlobUploads, "Second push should upload same or fewer blobs due to layer sharing") - // Wait for async conversion - time.Sleep(2 * time.Second) + waitForImageReady(t, svc, "shared/alpine318@"+alpine318Digest.String(), 60*time.Second) } // TestRegistryTagPush verifies that pushing with a tag reference (not digest) diff --git a/cmd/api/config/config.go b/cmd/api/config/config.go index 8508537b..6abfe952 100644 --- a/cmd/api/config/config.go +++ b/cmd/api/config/config.go @@ -98,9 +98,10 @@ type APIConfig struct { // MetricsConfig holds metrics endpoint settings. type MetricsConfig struct { - ListenAddress string `koanf:"listen_address"` - Port int `koanf:"port"` - VMLabelBudget int `koanf:"vm_label_budget"` + ListenAddress string `koanf:"listen_address"` + Port int `koanf:"port"` + VMLabelBudget int `koanf:"vm_label_budget"` + ResourceRefreshInterval string `koanf:"resource_refresh_interval"` } // OtelConfig holds OpenTelemetry settings. @@ -294,9 +295,10 @@ func defaultConfig() *Config { }, Metrics: MetricsConfig{ - ListenAddress: "127.0.0.1", - Port: 9464, - VMLabelBudget: 200, + ListenAddress: "127.0.0.1", + Port: 9464, + VMLabelBudget: 200, + ResourceRefreshInterval: "120s", }, Otel: OtelConfig{ @@ -462,6 +464,16 @@ func (c *Config) Validate() error { if c.Metrics.VMLabelBudget <= 0 { return fmt.Errorf("metrics.vm_label_budget must be positive, got %d", c.Metrics.VMLabelBudget) } + if strings.TrimSpace(c.Metrics.ResourceRefreshInterval) == "" { + return fmt.Errorf("metrics.resource_refresh_interval must not be empty") + } + interval, err := time.ParseDuration(c.Metrics.ResourceRefreshInterval) + if err != nil { + return fmt.Errorf("metrics.resource_refresh_interval must be a valid duration, got %q: %w", c.Metrics.ResourceRefreshInterval, err) + } + if interval <= 0 { + return fmt.Errorf("metrics.resource_refresh_interval must be positive, got %q", c.Metrics.ResourceRefreshInterval) + } if c.Otel.MetricExportInterval != "" { if _, err := time.ParseDuration(c.Otel.MetricExportInterval); err != nil { return fmt.Errorf("otel.metric_export_interval must be a valid duration, got %q: %w", c.Otel.MetricExportInterval, err) diff --git a/cmd/api/config/config_test.go b/cmd/api/config/config_test.go index 6fd9b594..2923c973 100644 --- a/cmd/api/config/config_test.go +++ b/cmd/api/config/config_test.go @@ -22,6 +22,9 @@ func TestDefaultConfigIncludesMetricsSettings(t *testing.T) { if cfg.Metrics.VMLabelBudget != 200 { t.Fatalf("expected default metrics.vm_label_budget to be 200, got %d", cfg.Metrics.VMLabelBudget) } + if cfg.Metrics.ResourceRefreshInterval != "120s" { + t.Fatalf("expected default metrics.resource_refresh_interval to be 120s, got %q", cfg.Metrics.ResourceRefreshInterval) + } if cfg.Otel.MetricExportInterval != "60s" { t.Fatalf("expected default otel.metric_export_interval to be 60s, got %q", cfg.Otel.MetricExportInterval) } @@ -31,6 +34,7 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) { t.Setenv("METRICS__LISTEN_ADDRESS", "0.0.0.0") t.Setenv("METRICS__PORT", "9999") t.Setenv("METRICS__VM_LABEL_BUDGET", "350") + t.Setenv("METRICS__RESOURCE_REFRESH_INTERVAL", "30s") t.Setenv("OTEL__METRIC_EXPORT_INTERVAL", "15s") tmp := t.TempDir() @@ -53,6 +57,9 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) { if cfg.Metrics.VMLabelBudget != 350 { t.Fatalf("expected metrics.vm_label_budget override, got %d", cfg.Metrics.VMLabelBudget) } + if cfg.Metrics.ResourceRefreshInterval != "30s" { + t.Fatalf("expected metrics.resource_refresh_interval override, got %q", cfg.Metrics.ResourceRefreshInterval) + } if cfg.Otel.MetricExportInterval != "15s" { t.Fatalf("expected otel.metric_export_interval override, got %q", cfg.Otel.MetricExportInterval) } @@ -88,6 +95,32 @@ func TestValidateRejectsInvalidVMLabelBudget(t *testing.T) { } } +func TestValidateRejectsInvalidResourceRefreshInterval(t *testing.T) { + cfg := defaultConfig() + cfg.Metrics.ResourceRefreshInterval = "" + + err := cfg.Validate() + if err == nil { + t.Fatalf("expected validation error for empty resource refresh interval") + } + + cfg = defaultConfig() + cfg.Metrics.ResourceRefreshInterval = "not-a-duration" + + err = cfg.Validate() + if err == nil { + t.Fatalf("expected validation error for invalid resource refresh interval") + } + + cfg = defaultConfig() + cfg.Metrics.ResourceRefreshInterval = "0s" + + err = cfg.Validate() + if err == nil { + t.Fatalf("expected validation error for non-positive resource refresh interval") + } +} + func TestValidateRejectsEmptyActiveBallooningDurations(t *testing.T) { cfg := defaultConfig() cfg.Hypervisor.Memory.ActiveBallooning.PollInterval = " " diff --git a/cmd/api/main.go b/cmd/api/main.go index bfdeaf8e..dcf9f748 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -139,6 +139,14 @@ func run() error { logger := app.Logger + resourceRefreshInterval, err := time.ParseDuration(app.Config.Metrics.ResourceRefreshInterval) + if err != nil { + return fmt.Errorf("invalid metrics resource refresh interval %q: %w", app.Config.Metrics.ResourceRefreshInterval, err) + } + if err := app.ResourceManager.StartMonitoring(ctx, otelProvider.Meter, resourceRefreshInterval); err != nil { + return fmt.Errorf("start resource monitoring: %w", err) + } + // Log OTel status if cfg.Otel.Enabled { logger.Info("OpenTelemetry push enabled", "endpoint", cfg.Otel.Endpoint, "service", cfg.Otel.ServiceName, "metric_export_interval", cfg.Otel.MetricExportInterval) diff --git a/config.example.darwin.yaml b/config.example.darwin.yaml index b3ee8813..0ace51dc 100644 --- a/config.example.darwin.yaml +++ b/config.example.darwin.yaml @@ -111,6 +111,7 @@ limits: # listen_address: 127.0.0.1 # port: 9464 # vm_label_budget: 200 +# resource_refresh_interval: 120s # # otel: # enabled: false diff --git a/config.example.yaml b/config.example.yaml index bfd30a27..345c2da9 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -105,6 +105,7 @@ data_dir: /var/lib/hypeman # listen_address: 127.0.0.1 # port: 9464 # vm_label_budget: 200 +# resource_refresh_interval: 120s # # otel: # enabled: false diff --git a/lib/instances/compression_integration_linux_test.go b/lib/instances/compression_integration_linux_test.go index c02be929..f58b5b1f 100644 --- a/lib/instances/compression_integration_linux_test.go +++ b/lib/instances/compression_integration_linux_test.go @@ -31,6 +31,8 @@ type compressionIntegrationHarness struct { waitHypervisorUp func(ctx context.Context, inst *Instance) error } +const compressionGuestExecTimeout = 20 * time.Second + func TestCloudHypervisorStandbyRestoreCompressionScenarios(t *testing.T) { t.Parallel() @@ -261,14 +263,20 @@ func waitForRunningAndExecReady(t *testing.T, ctx context.Context, mgr *manager, func writeGuestMarker(t *testing.T, ctx context.Context, inst *Instance, path string, value string) { t.Helper() - output, exitCode, err := execCommand(ctx, inst, "sh", "-c", fmt.Sprintf("printf %q > %s && sync", value, path)) + execCtx, cancel := context.WithTimeout(ctx, integrationTestTimeout(compressionGuestExecTimeout)) + defer cancel() + + output, exitCode, err := execCommand(execCtx, inst, "sh", "-c", fmt.Sprintf("printf %q > %s && sync", value, path)) require.NoError(t, err) require.Equal(t, 0, exitCode, output) } func assertGuestMarker(t *testing.T, ctx context.Context, inst *Instance, path string, expected string) { t.Helper() - output, exitCode, err := execCommand(ctx, inst, "cat", path) + execCtx, cancel := context.WithTimeout(ctx, integrationTestTimeout(compressionGuestExecTimeout)) + defer cancel() + + output, exitCode, err := execCommand(execCtx, inst, "cat", path) require.NoError(t, err) require.Equal(t, 0, exitCode, output) assert.Equal(t, expected, output) diff --git a/lib/instances/manager_test.go b/lib/instances/manager_test.go index 0e87b8e3..b6979cb9 100644 --- a/lib/instances/manager_test.go +++ b/lib/instances/manager_test.go @@ -1117,7 +1117,7 @@ func TestEntrypointEnvVars(t *testing.T) { require.NoError(t, err) require.NotNil(t, inst) assert.Contains(t, []State{StateInitializing, StateRunning}, inst.State) - inst, err = waitForInstanceState(ctx, mgr, inst.Id, StateRunning, 20*time.Second) + inst, err = waitForInstanceState(ctx, mgr, inst.Id, StateRunning, 60*time.Second) require.NoError(t, err) t.Logf("Instance created: %s", inst.Id) diff --git a/lib/instances/qemu_test.go b/lib/instances/qemu_test.go index 155435b8..033503a4 100644 --- a/lib/instances/qemu_test.go +++ b/lib/instances/qemu_test.go @@ -671,7 +671,7 @@ func TestQEMUEntrypointEnvVars(t *testing.T) { require.NoError(t, err) require.NotNil(t, inst) assert.Contains(t, []State{StateInitializing, StateRunning}, inst.State) - inst, err = waitForInstanceState(ctx, mgr, inst.Id, StateRunning, 20*time.Second) + inst, err = waitForInstanceState(ctx, mgr, inst.Id, StateRunning, 60*time.Second) require.NoError(t, err) assert.Equal(t, hypervisor.TypeQEMU, inst.HypervisorType, "Instance should use QEMU hypervisor") t.Logf("Instance created: %s", inst.Id) diff --git a/lib/resources/monitoring.go b/lib/resources/monitoring.go new file mode 100644 index 00000000..322a532a --- /dev/null +++ b/lib/resources/monitoring.go @@ -0,0 +1,244 @@ +package resources + +import ( + "context" + "fmt" + "runtime/debug" + "sync" + "time" + + "github.com/kernel/hypeman/lib/logger" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type monitoringState struct { + mu sync.RWMutex + started bool + metricsRegistered bool + snapshot monitoringSnapshot + hasSnapshot bool +} + +type monitoringSnapshot struct { + status FullResourceStatus + imageStorageCurrent int64 + imageStorageMax int64 +} + +func (m *Manager) StartMonitoring(ctx context.Context, meter metric.Meter, refreshInterval time.Duration) error { + if meter == nil { + return nil + } + if refreshInterval <= 0 { + return fmt.Errorf("resource monitoring refresh interval must be positive, got %s", refreshInterval) + } + + m.monitoring.mu.Lock() + if m.monitoring.started { + m.monitoring.mu.Unlock() + return nil + } + if !m.monitoring.metricsRegistered { + if err := newMonitoringMetrics(meter, m); err != nil { + m.monitoring.mu.Unlock() + return err + } + m.monitoring.metricsRegistered = true + } + m.monitoring.mu.Unlock() + + if err := m.refreshMonitoringSnapshot(ctx); err != nil { + return err + } + + m.monitoring.mu.Lock() + if m.monitoring.started { + m.monitoring.mu.Unlock() + return nil + } + m.monitoring.started = true + m.monitoring.mu.Unlock() + + go func() { + log := logger.FromContext(ctx) + defer func() { + if r := recover(); r != nil { + m.monitoring.mu.Lock() + m.monitoring.started = false + m.monitoring.mu.Unlock() + log.ErrorContext(ctx, "resource monitoring refresh loop panicked", + "panic", r, + "stack", string(debug.Stack()), + ) + } + }() + + ticker := time.NewTicker(refreshInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := m.refreshMonitoringSnapshot(ctx); err != nil { + log.WarnContext(ctx, "resource monitoring snapshot refresh failed", "error", err) + } + } + } + }() + + return nil +} + +func (m *Manager) refreshMonitoringSnapshot(ctx context.Context) error { + status, err := m.GetFullStatus(ctx) + if err != nil { + return err + } + + snapshot := monitoringSnapshot{ + status: *status, + } + if status.DiskDetail != nil { + snapshot.imageStorageCurrent = status.DiskDetail.Images + status.DiskDetail.OCICache + } + snapshot.imageStorageMax = m.MaxImageStorageBytes() + + m.monitoring.mu.Lock() + m.monitoring.snapshot = snapshot + m.monitoring.hasSnapshot = true + m.monitoring.mu.Unlock() + + return nil +} + +func (m *Manager) currentMonitoringSnapshot() (monitoringSnapshot, bool) { + m.monitoring.mu.RLock() + defer m.monitoring.mu.RUnlock() + + if !m.monitoring.hasSnapshot { + return monitoringSnapshot{}, false + } + + return m.monitoring.snapshot, true +} + +func newMonitoringMetrics(meter metric.Meter, mgr *Manager) error { + capacity, err := meter.Int64ObservableGauge( + "hypeman_resources_capacity", + metric.WithDescription("Raw host capacity by resource type"), + ) + if err != nil { + return err + } + + effectiveLimit, err := meter.Int64ObservableGauge( + "hypeman_resources_effective_limit", + metric.WithDescription("Effective allocatable limit by resource type after oversubscription"), + ) + if err != nil { + return err + } + + allocated, err := meter.Int64ObservableGauge( + "hypeman_resources_allocated", + metric.WithDescription("Current allocated amount by resource type"), + ) + if err != nil { + return err + } + + oversubRatio, err := meter.Float64ObservableGauge( + "hypeman_resources_oversub_ratio", + metric.WithDescription("Oversubscription ratio by resource type"), + ) + if err != nil { + return err + } + + diskBreakdown, err := meter.Int64ObservableGauge( + "hypeman_resources_disk_breakdown_bytes", + metric.WithDescription("Disk usage broken down by component"), + metric.WithUnit("By"), + ) + if err != nil { + return err + } + + imageStorage, err := meter.Int64ObservableGauge( + "hypeman_resources_image_storage_bytes", + metric.WithDescription("Current and maximum image storage bytes"), + metric.WithUnit("By"), + ) + if err != nil { + return err + } + + gpuSlots, err := meter.Int64ObservableGauge( + "hypeman_resources_gpu_slots", + metric.WithDescription("Total and used GPU slots"), + ) + if err != nil { + return err + } + + gpuProfileSlots, err := meter.Int64ObservableGauge( + "hypeman_resources_gpu_profile_slots", + metric.WithDescription("Available GPU slots by vGPU profile"), + ) + if err != nil { + return err + } + + if _, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + snapshot, ok := mgr.currentMonitoringSnapshot() + if !ok { + return nil + } + + resourceStatuses := []ResourceStatus{ + snapshot.status.CPU, + snapshot.status.Memory, + snapshot.status.Disk, + snapshot.status.Network, + snapshot.status.DiskIO, + } + for _, status := range resourceStatuses { + attrs := metric.WithAttributes(attribute.String("resource", string(status.Type))) + o.ObserveInt64(capacity, status.Capacity, attrs) + o.ObserveInt64(effectiveLimit, status.EffectiveLimit, attrs) + o.ObserveInt64(allocated, status.Allocated, attrs) + o.ObserveFloat64(oversubRatio, status.OversubRatio, attrs) + } + + if snapshot.status.DiskDetail != nil { + o.ObserveInt64(diskBreakdown, snapshot.status.DiskDetail.Images, metric.WithAttributes(attribute.String("component", "images"))) + o.ObserveInt64(diskBreakdown, snapshot.status.DiskDetail.OCICache, metric.WithAttributes(attribute.String("component", "oci_cache"))) + o.ObserveInt64(diskBreakdown, snapshot.status.DiskDetail.Volumes, metric.WithAttributes(attribute.String("component", "volumes"))) + o.ObserveInt64(diskBreakdown, snapshot.status.DiskDetail.Overlays, metric.WithAttributes(attribute.String("component", "overlays"))) + o.ObserveInt64(imageStorage, snapshot.imageStorageCurrent, metric.WithAttributes(attribute.String("kind", "current"))) + } + o.ObserveInt64(imageStorage, snapshot.imageStorageMax, metric.WithAttributes(attribute.String("kind", "max"))) + + if snapshot.status.GPU != nil { + o.ObserveInt64(gpuSlots, int64(snapshot.status.GPU.UsedSlots), metric.WithAttributes(attribute.String("kind", "used"))) + o.ObserveInt64(gpuSlots, int64(snapshot.status.GPU.TotalSlots), metric.WithAttributes(attribute.String("kind", "total"))) + for _, profile := range snapshot.status.GPU.Profiles { + o.ObserveInt64(gpuProfileSlots, int64(profile.Available), + metric.WithAttributes( + attribute.String("profile", profile.Name), + attribute.String("kind", "available"), + ), + ) + } + } + + return nil + }, capacity, effectiveLimit, allocated, oversubRatio, diskBreakdown, imageStorage, gpuSlots, gpuProfileSlots); err != nil { + return err + } + + return nil +} diff --git a/lib/resources/monitoring_test.go b/lib/resources/monitoring_test.go new file mode 100644 index 00000000..d19751a6 --- /dev/null +++ b/lib/resources/monitoring_test.go @@ -0,0 +1,277 @@ +package resources + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/kernel/hypeman/cmd/api/config" + "github.com/kernel/hypeman/lib/devices" + "github.com/kernel/hypeman/lib/paths" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +type monitoringInstanceLister struct { + mu sync.RWMutex + allocations []InstanceAllocation +} + +func (m *monitoringInstanceLister) ListInstanceAllocations(ctx context.Context) ([]InstanceAllocation, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return append([]InstanceAllocation(nil), m.allocations...), nil +} + +func (m *monitoringInstanceLister) SetAllocations(allocations []InstanceAllocation) { + m.mu.Lock() + defer m.mu.Unlock() + m.allocations = append([]InstanceAllocation(nil), allocations...) +} + +type monitoringImageLister struct { + mu sync.RWMutex + totalBytes int64 + ociCacheBytes int64 +} + +func (m *monitoringImageLister) TotalImageBytes(ctx context.Context) (int64, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.totalBytes, nil +} + +func (m *monitoringImageLister) TotalOCICacheBytes(ctx context.Context) (int64, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.ociCacheBytes, nil +} + +func (m *monitoringImageLister) SetSizes(totalBytes, ociCacheBytes int64) { + m.mu.Lock() + defer m.mu.Unlock() + m.totalBytes = totalBytes + m.ociCacheBytes = ociCacheBytes +} + +func monitoringTestManager(t *testing.T) (*Manager, *monitoringInstanceLister, *monitoringImageLister) { + t.Helper() + + cfg := &config.Config{ + DataDir: t.TempDir(), + Limits: config.LimitsConfig{ + MaxImageStorage: 0.2, + }, + Oversubscription: config.OversubscriptionConfig{ + CPU: 2.0, Memory: 1.5, Disk: 1.0, Network: 1.0, DiskIO: 2.0, + }, + Capacity: config.CapacityConfig{ + Network: "10Gbps", + DiskIO: "1GB/s", + }, + } + + instanceLister := &monitoringInstanceLister{ + allocations: []InstanceAllocation{ + { + ID: "vm-1", + Name: "test-vm", + Vcpus: 4, + MemoryBytes: 8 * 1024 * 1024 * 1024, + OverlayBytes: 10 * 1024 * 1024 * 1024, + VolumeOverlayBytes: 5 * 1024 * 1024 * 1024, + NetworkDownloadBps: 125000000, + NetworkUploadBps: 125000000, + DiskIOBps: 64 * 1024 * 1024, + State: "Running", + }, + }, + } + imageLister := &monitoringImageLister{ + totalBytes: 50 * 1024 * 1024 * 1024, + ociCacheBytes: 25 * 1024 * 1024 * 1024, + } + volumeLister := &mockVolumeLister{totalBytes: 100 * 1024 * 1024 * 1024} + + mgr := NewManager(cfg, paths.New(cfg.DataDir)) + mgr.SetInstanceLister(instanceLister) + mgr.SetImageLister(imageLister) + mgr.SetVolumeLister(volumeLister) + require.NoError(t, mgr.Initialize(context.Background())) + + return mgr, instanceLister, imageLister +} + +func TestStartMonitoringPublishesCapacityMetrics(t *testing.T) { + mgr, _, _ := monitoringTestManager(t) + + reader := otelmetric.NewManualReader() + provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + require.NoError(t, mgr.StartMonitoring(ctx, provider.Meter("test"), time.Hour)) + + status, err := mgr.GetFullStatus(context.Background()) + require.NoError(t, err) + + rm := collectMonitoringMetrics(t, reader) + + require.Equal(t, status.CPU.Capacity, int64GaugeValue(t, rm, "hypeman_resources_capacity", map[string]string{"resource": "cpu"})) + require.Equal(t, status.CPU.EffectiveLimit, int64GaugeValue(t, rm, "hypeman_resources_effective_limit", map[string]string{"resource": "cpu"})) + require.Equal(t, status.CPU.Allocated, int64GaugeValue(t, rm, "hypeman_resources_allocated", map[string]string{"resource": "cpu"})) + require.Equal(t, status.CPU.OversubRatio, float64GaugeValue(t, rm, "hypeman_resources_oversub_ratio", map[string]string{"resource": "cpu"})) + + require.NotNil(t, status.DiskDetail) + require.Equal(t, status.DiskDetail.Images, int64GaugeValue(t, rm, "hypeman_resources_disk_breakdown_bytes", map[string]string{"component": "images"})) + require.Equal(t, status.DiskDetail.OCICache, int64GaugeValue(t, rm, "hypeman_resources_disk_breakdown_bytes", map[string]string{"component": "oci_cache"})) + require.Equal(t, status.DiskDetail.Volumes, int64GaugeValue(t, rm, "hypeman_resources_disk_breakdown_bytes", map[string]string{"component": "volumes"})) + require.Equal(t, status.DiskDetail.Overlays, int64GaugeValue(t, rm, "hypeman_resources_disk_breakdown_bytes", map[string]string{"component": "overlays"})) + + currentImageStorage := status.DiskDetail.Images + status.DiskDetail.OCICache + require.Equal(t, currentImageStorage, int64GaugeValue(t, rm, "hypeman_resources_image_storage_bytes", map[string]string{"kind": "current"})) + require.Equal(t, mgr.MaxImageStorageBytes(), int64GaugeValue(t, rm, "hypeman_resources_image_storage_bytes", map[string]string{"kind": "max"})) +} + +func TestStartMonitoringRefreshesSnapshot(t *testing.T) { + mgr, instanceLister, imageLister := monitoringTestManager(t) + + reader := otelmetric.NewManualReader() + provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + require.NoError(t, mgr.StartMonitoring(ctx, provider.Meter("test"), 20*time.Millisecond)) + + instanceLister.SetAllocations([]InstanceAllocation{ + { + ID: "vm-2", + Name: "updated-vm", + Vcpus: 7, + MemoryBytes: 12 * 1024 * 1024 * 1024, + OverlayBytes: 15 * 1024 * 1024 * 1024, + VolumeOverlayBytes: 1 * 1024 * 1024 * 1024, + NetworkDownloadBps: 200000000, + NetworkUploadBps: 180000000, + DiskIOBps: 128 * 1024 * 1024, + State: "Running", + }, + }) + imageLister.SetSizes(60*1024*1024*1024, 10*1024*1024*1024) + + require.Eventually(t, func() bool { + rm := collectMonitoringMetrics(t, reader) + cpuAllocated := int64GaugeValue(t, rm, "hypeman_resources_allocated", map[string]string{"resource": "cpu"}) + imageCurrent := int64GaugeValue(t, rm, "hypeman_resources_image_storage_bytes", map[string]string{"kind": "current"}) + return cpuAllocated == 7 && imageCurrent == 70*1024*1024*1024 + }, 500*time.Millisecond, 20*time.Millisecond) +} + +func TestStartMonitoringPublishesGPUMetrics(t *testing.T) { + mgr, _, _ := monitoringTestManager(t) + + originalProvider := currentGPUStatusProvider() + setGPUStatusProvider(func() *GPUResourceStatus { + return &GPUResourceStatus{ + Mode: "vgpu", + TotalSlots: 8, + UsedSlots: 3, + Profiles: []devices.GPUProfile{ + {Name: "L40S-1Q", Available: 5}, + {Name: "L40S-2Q", Available: 2}, + }, + } + }) + defer func() { + setGPUStatusProvider(originalProvider) + }() + + reader := otelmetric.NewManualReader() + provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + require.NoError(t, mgr.StartMonitoring(ctx, provider.Meter("test"), time.Hour)) + + rm := collectMonitoringMetrics(t, reader) + require.Equal(t, int64(3), int64GaugeValue(t, rm, "hypeman_resources_gpu_slots", map[string]string{"kind": "used"})) + require.Equal(t, int64(8), int64GaugeValue(t, rm, "hypeman_resources_gpu_slots", map[string]string{"kind": "total"})) + require.Equal(t, int64(5), int64GaugeValue(t, rm, "hypeman_resources_gpu_profile_slots", map[string]string{"profile": "L40S-1Q", "kind": "available"})) + require.Equal(t, int64(2), int64GaugeValue(t, rm, "hypeman_resources_gpu_profile_slots", map[string]string{"profile": "L40S-2Q", "kind": "available"})) +} + +func collectMonitoringMetrics(t *testing.T, reader *otelmetric.ManualReader) metricdata.ResourceMetrics { + t.Helper() + + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(context.Background(), &rm)) + return rm +} + +func int64GaugeValue(t *testing.T, rm metricdata.ResourceMetrics, name string, wantAttrs map[string]string) int64 { + t.Helper() + + metric := findMonitoringMetric(t, rm, name) + gauge, ok := metric.Data.(metricdata.Gauge[int64]) + require.True(t, ok, "expected int64 gauge metric data for %s", name) + for _, point := range gauge.DataPoints { + if metricAttrsMatch(point.Attributes, wantAttrs) { + return point.Value + } + } + t.Fatalf("metric %s with attrs %v not found", name, wantAttrs) + return 0 +} + +func float64GaugeValue(t *testing.T, rm metricdata.ResourceMetrics, name string, wantAttrs map[string]string) float64 { + t.Helper() + + metric := findMonitoringMetric(t, rm, name) + gauge, ok := metric.Data.(metricdata.Gauge[float64]) + require.True(t, ok, "expected float64 gauge metric data for %s", name) + for _, point := range gauge.DataPoints { + if metricAttrsMatch(point.Attributes, wantAttrs) { + return point.Value + } + } + t.Fatalf("metric %s with attrs %v not found", name, wantAttrs) + return 0 +} + +func findMonitoringMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics { + t.Helper() + + for _, scope := range rm.ScopeMetrics { + for _, metric := range scope.Metrics { + if metric.Name == name { + return metric + } + } + } + + t.Fatalf("metric %s not found", name) + return metricdata.Metrics{} +} + +func metricAttrsMatch(set attribute.Set, want map[string]string) bool { + if len(want) == 0 { + return true + } + + attrs := make(map[string]string, len(set.ToSlice())) + for _, kv := range set.ToSlice() { + attrs[string(kv.Key)] = kv.Value.AsString() + } + for key, value := range want { + if attrs[key] != value { + return false + } + } + return true +} diff --git a/lib/resources/resource.go b/lib/resources/resource.go index b22f7978..7435421e 100644 --- a/lib/resources/resource.go +++ b/lib/resources/resource.go @@ -32,6 +32,26 @@ const ( SourceConfigured SourceType = "configured" // Explicitly configured by operator ) +var ( + gpuStatusProviderMu sync.RWMutex + gpuStatusProvider = GetGPUStatus +) + +func currentGPUStatusProvider() func() *GPUResourceStatus { + gpuStatusProviderMu.RLock() + defer gpuStatusProviderMu.RUnlock() + return gpuStatusProvider +} + +func setGPUStatusProvider(fn func() *GPUResourceStatus) { + if fn == nil { + fn = GetGPUStatus + } + gpuStatusProviderMu.Lock() + defer gpuStatusProviderMu.Unlock() + gpuStatusProvider = fn +} + // Resource represents a discoverable and allocatable host resource. type Resource interface { // Type returns the resource type identifier. @@ -140,8 +160,9 @@ type Manager struct { cfg *config.Config paths *paths.Paths - mu sync.RWMutex - resources map[ResourceType]Resource + mu sync.RWMutex + resources map[ResourceType]Resource + monitoring *monitoringState // Dependencies for allocation calculations instanceLister InstanceLister @@ -152,9 +173,10 @@ type Manager struct { // NewManager creates a new resource manager. func NewManager(cfg *config.Config, p *paths.Paths) *Manager { return &Manager{ - cfg: cfg, - paths: p, - resources: make(map[ResourceType]Resource), + cfg: cfg, + paths: p, + resources: make(map[ResourceType]Resource), + monitoring: &monitoringState{}, } } @@ -362,7 +384,7 @@ func (m *Manager) GetFullStatus(ctx context.Context) (*FullResourceStatus, error } // Get GPU status - gpuStatus := GetGPUStatus() + gpuStatus := currentGPUStatusProvider()() return &FullResourceStatus{ CPU: *cpuStatus,