From 5cbb9e8aac2a63c1a44aeaa9f7009fa189a3d6a2 Mon Sep 17 00:00:00 2001 From: Sylvester Damgaard Date: Fri, 19 Jun 2026 19:09:57 +0200 Subject: [PATCH] fix health readiness semantics --- cmd/cbox-init/serve.go | 1 + internal/process/healthcheck.go | 3 ++ internal/process/healthcheck_test.go | 30 ++++++++++++++++++ internal/process/manager_lifecycle.go | 4 +++ internal/process/manager_readiness.go | 17 +++++++--- internal/process/supervisor.go | 34 ++++++++++++++++++-- internal/process/supervisor_test.go | 45 +++++++++++++++++++++++++++ 7 files changed, 127 insertions(+), 7 deletions(-) diff --git a/cmd/cbox-init/serve.go b/cmd/cbox-init/serve.go index a6ba005..0dad649 100644 --- a/cmd/cbox-init/serve.go +++ b/cmd/cbox-init/serve.go @@ -173,6 +173,7 @@ func runServe(cmd *cobra.Command, args []string) { // Monitor process health pm.MonitorProcessHealth(ctx) + pm.StartReadinessMonitor(ctx) // Always start Unix socket for local management (TUI, CLI commands) socketPath := resolveSocketPath(cfg.Global.APISocket) diff --git a/internal/process/healthcheck.go b/internal/process/healthcheck.go index 682f5f1..842951b 100644 --- a/internal/process/healthcheck.go +++ b/internal/process/healthcheck.go @@ -173,6 +173,9 @@ func (hm *HealthMonitor) Start(ctx context.Context) <-chan HealthStatus { } } + status := hm.performCheck(ctx) + statusCh <- status + ticker := time.NewTicker(time.Duration(hm.config.Period) * time.Second) defer ticker.Stop() diff --git a/internal/process/healthcheck_test.go b/internal/process/healthcheck_test.go index d8b8468..03c046d 100644 --- a/internal/process/healthcheck_test.go +++ b/internal/process/healthcheck_test.go @@ -245,6 +245,36 @@ func TestExecHealthChecker(t *testing.T) { func TestHealthMonitor(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + t.Run("first check runs immediately after initial delay", func(t *testing.T) { + cfg := &config.HealthCheck{ + Type: "exec", + Command: []string{"echo", "healthy"}, + InitialDelay: 0, + Period: 30, + Timeout: 1, + FailureThreshold: 1, + } + + monitor, err := NewHealthMonitor("test-process", cfg, logger) + if err != nil { + t.Fatalf("NewHealthMonitor() unexpected error: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + statusCh := monitor.Start(ctx) + + select { + case status := <-statusCh: + if !status.LastCheckSucceeded { + t.Fatalf("Expected first health check to succeed, got error: %v", status.Error) + } + case <-time.After(500 * time.Millisecond): + t.Fatal("Expected first health check before first period elapsed") + } + }) + t.Run("successful monitoring", func(t *testing.T) { cfg := &config.HealthCheck{ Type: "tcp", diff --git a/internal/process/manager_lifecycle.go b/internal/process/manager_lifecycle.go index 42b78af..010bd89 100644 --- a/internal/process/manager_lifecycle.go +++ b/internal/process/manager_lifecycle.go @@ -271,6 +271,10 @@ func (m *Manager) Shutdown(ctx context.Context) error { close(m.shutdownCh) }) + if err := m.StopReadinessManager(); err != nil { + m.logger.Warn("Failed to stop readiness manager", "error", err) + } + // Stop the scheduler first (prevents new job executions) if m.scheduler.IsStarted() { m.logger.Info("Stopping scheduler") diff --git a/internal/process/manager_readiness.go b/internal/process/manager_readiness.go index 0ea427a..301f8c5 100644 --- a/internal/process/manager_readiness.go +++ b/internal/process/manager_readiness.go @@ -77,19 +77,26 @@ func (m *Manager) updateReadinessStates() { defer m.mu.RUnlock() for name, sup := range m.processes { - // Use the supervisor's overall state (includes health check status) supState := sup.GetState() instances := sup.GetInstances() + healthStatus, lastCheckSucceeded, hasHealthCheck := sup.HealthSnapshot() var processState readiness.ProcessState var health string switch supState { case StateRunning: - // Process is running - consider it healthy for readiness purposes - // (health checks are evaluated separately by the readiness manager modes) - processState = readiness.StateHealthy - health = "healthy" + if hasHealthCheck { + health = healthStatus + if lastCheckSucceeded { + processState = readiness.StateHealthy + } else { + processState = readiness.StateRunning + } + } else { + processState = readiness.StateHealthy + health = "healthy" + } case StateStopped: processState = readiness.StateStopped health = "unknown" diff --git a/internal/process/supervisor.go b/internal/process/supervisor.go index 7892bf0..0d0186d 100644 --- a/internal/process/supervisor.go +++ b/internal/process/supervisor.go @@ -114,6 +114,9 @@ type Supervisor struct { readinessCh chan struct{} // Closed when service becomes ready readinessOnce sync.Once // CRITICAL: Ensures readinessCh closed exactly once isReady bool // Track readiness state + healthKnown bool // Whether at least one health check result has been observed + healthHealthy bool // Liveness health after thresholds/hysteresis + lastCheckSucceeded bool // Raw result from the most recent health check goroutines sync.WaitGroup // CRITICAL: Track all goroutines for clean shutdown mu sync.RWMutex operationMu sync.Mutex // Serializes lifecycle/scale operations so reads can proceed @@ -1189,8 +1192,16 @@ func (s *Supervisor) handleHealthStatus(ctx context.Context) { return } - if status.Healthy { - // Signal readiness on first successful health check + s.mu.Lock() + s.healthKnown = true + s.healthHealthy = status.Healthy + s.lastCheckSucceeded = status.LastCheckSucceeded + s.mu.Unlock() + + if status.LastCheckSucceeded { + // Signal readiness on first successful health check. + // Liveness can stay optimistic across transient failures, but + // readiness must only pass after a real probe success. s.markReady("health check passed") } else if !status.Healthy { s.logger.Error("Process unhealthy, triggering restart", @@ -1255,6 +1266,25 @@ func (s *Supervisor) GetState() ProcessState { return s.state } +// HealthSnapshot returns the latest health check state for readiness reporting. +// Health is "healthy", "unhealthy", or "unknown". For processes without a +// health check, running is treated as healthy by callers that need a value. +func (s *Supervisor) HealthSnapshot() (health string, lastCheckSucceeded bool, hasHealthCheck bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + if s.config.HealthCheck == nil { + return "healthy", true, false + } + if !s.healthKnown { + return "unknown", false, true + } + if s.lastCheckSucceeded { + return "healthy", true, true + } + return "unhealthy", false, true +} + // InstanceInfo represents exported instance information type InstanceInfo struct { ID string diff --git a/internal/process/supervisor_test.go b/internal/process/supervisor_test.go index f47f06c..ec62293 100644 --- a/internal/process/supervisor_test.go +++ b/internal/process/supervisor_test.go @@ -123,6 +123,51 @@ func TestSupervisor_WaitForReadiness(t *testing.T) { } } +func TestSupervisor_ReadinessRequiresSuccessfulProbe(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) + auditLogger := audit.NewLogger(logger, false) + + cfg := &config.Process{ + Enabled: true, + Command: []string{"sleep", "60"}, + Restart: "never", + Scale: 1, + HealthCheck: &config.HealthCheck{ + Type: "tcp", + Address: "127.0.0.1:65535", + Mode: "readiness", + }, + } + + globalCfg := &config.GlobalConfig{ + LogLevel: "error", + MaxRestartAttempts: 3, + RestartBackoff: 5, + } + + sup := NewSupervisor("test-proc", cfg, globalCfg, logger, auditLogger, nil) + statusCh := make(chan HealthStatus, 2) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sup.healthStatus = statusCh + + go sup.handleHealthStatus(ctx) + + statusCh <- HealthStatus{Healthy: true, LastCheckSucceeded: false} + + waitCtx, waitCancel := context.WithTimeout(context.Background(), time.Second) + defer waitCancel() + if err := sup.WaitForReadiness(waitCtx, 50*time.Millisecond); err == nil { + t.Fatal("Expected readiness wait to time out after failed probe") + } + + statusCh <- HealthStatus{Healthy: true, LastCheckSucceeded: true} + + if err := sup.WaitForReadiness(waitCtx, time.Second); err != nil { + t.Fatalf("Expected successful probe to mark ready: %v", err) + } +} + func TestSupervisor_StreamEnabled(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) auditLogger := audit.NewLogger(logger, false)