Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/cbox-init/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func runServe(cmd *cobra.Command, args []string) {

// Monitor process health
pm.MonitorProcessHealth(ctx)
pm.StartReadinessMonitor(ctx)

// Always start Unix socket for local management (TUI, CLI commands)
socketPath := resolveSocketPath(cfg.Global.APISocket)
Expand Down
3 changes: 3 additions & 0 deletions internal/process/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ func (hm *HealthMonitor) Start(ctx context.Context) <-chan HealthStatus {
}
}

status := hm.performCheck(ctx)
statusCh <- status

ticker := time.NewTicker(time.Duration(hm.config.Period) * time.Second)
defer ticker.Stop()

Expand Down
30 changes: 30 additions & 0 deletions internal/process/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,36 @@ func TestExecHealthChecker(t *testing.T) {
func TestHealthMonitor(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

t.Run("first check runs immediately after initial delay", func(t *testing.T) {
cfg := &config.HealthCheck{
Type: "exec",
Command: []string{"echo", "healthy"},
InitialDelay: 0,
Period: 30,
Timeout: 1,
FailureThreshold: 1,
}

monitor, err := NewHealthMonitor("test-process", cfg, logger)
if err != nil {
t.Fatalf("NewHealthMonitor() unexpected error: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

statusCh := monitor.Start(ctx)

select {
case status := <-statusCh:
if !status.LastCheckSucceeded {
t.Fatalf("Expected first health check to succeed, got error: %v", status.Error)
}
case <-time.After(500 * time.Millisecond):
t.Fatal("Expected first health check before first period elapsed")
}
})

t.Run("successful monitoring", func(t *testing.T) {
cfg := &config.HealthCheck{
Type: "tcp",
Expand Down
4 changes: 4 additions & 0 deletions internal/process/manager_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ func (m *Manager) Shutdown(ctx context.Context) error {
close(m.shutdownCh)
})

if err := m.StopReadinessManager(); err != nil {
m.logger.Warn("Failed to stop readiness manager", "error", err)
}

// Stop the scheduler first (prevents new job executions)
if m.scheduler.IsStarted() {
m.logger.Info("Stopping scheduler")
Expand Down
17 changes: 12 additions & 5 deletions internal/process/manager_readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,26 @@ func (m *Manager) updateReadinessStates() {
defer m.mu.RUnlock()

for name, sup := range m.processes {
// Use the supervisor's overall state (includes health check status)
supState := sup.GetState()
instances := sup.GetInstances()
healthStatus, lastCheckSucceeded, hasHealthCheck := sup.HealthSnapshot()

var processState readiness.ProcessState
var health string

switch supState {
case StateRunning:
// Process is running - consider it healthy for readiness purposes
// (health checks are evaluated separately by the readiness manager modes)
processState = readiness.StateHealthy
health = "healthy"
if hasHealthCheck {
health = healthStatus
if lastCheckSucceeded {
processState = readiness.StateHealthy
} else {
processState = readiness.StateRunning
}
} else {
processState = readiness.StateHealthy
health = "healthy"
}
case StateStopped:
processState = readiness.StateStopped
health = "unknown"
Expand Down
34 changes: 32 additions & 2 deletions internal/process/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ type Supervisor struct {
readinessCh chan struct{} // Closed when service becomes ready
readinessOnce sync.Once // CRITICAL: Ensures readinessCh closed exactly once
isReady bool // Track readiness state
healthKnown bool // Whether at least one health check result has been observed
healthHealthy bool // Liveness health after thresholds/hysteresis
lastCheckSucceeded bool // Raw result from the most recent health check
goroutines sync.WaitGroup // CRITICAL: Track all goroutines for clean shutdown
mu sync.RWMutex
operationMu sync.Mutex // Serializes lifecycle/scale operations so reads can proceed
Expand Down Expand Up @@ -1189,8 +1192,16 @@ func (s *Supervisor) handleHealthStatus(ctx context.Context) {
return
}

if status.Healthy {
// Signal readiness on first successful health check
s.mu.Lock()
s.healthKnown = true
s.healthHealthy = status.Healthy
s.lastCheckSucceeded = status.LastCheckSucceeded
s.mu.Unlock()

if status.LastCheckSucceeded {
// Signal readiness on first successful health check.
// Liveness can stay optimistic across transient failures, but
// readiness must only pass after a real probe success.
s.markReady("health check passed")
} else if !status.Healthy {
s.logger.Error("Process unhealthy, triggering restart",
Expand Down Expand Up @@ -1255,6 +1266,25 @@ func (s *Supervisor) GetState() ProcessState {
return s.state
}

// HealthSnapshot reports the supervisor's latest health-check state for
// readiness reporting. It holds the supervisor's read lock while reading.
//
// Results:
//   - health: one of "healthy", "unhealthy", or "unknown".
//   - lastCheckSucceeded: the recorded result of the most recent health check.
//   - hasHealthCheck: false when the process config has no HealthCheck.
//
// When no health check is configured the method answers ("healthy", true,
// false), leaving it to callers to decide how to treat such a process. While
// no health check result has been recorded yet it answers ("unknown", false,
// true).
func (s *Supervisor) HealthSnapshot() (health string, lastCheckSucceeded bool, hasHealthCheck bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	switch {
	case s.config.HealthCheck == nil:
		// No probe configured: nothing to contradict a healthy verdict.
		return "healthy", true, false
	case !s.healthKnown:
		// A probe exists but no result has been recorded so far.
		return "unknown", false, true
	case s.lastCheckSucceeded:
		return "healthy", true, true
	default:
		return "unhealthy", false, true
	}
}

// InstanceInfo represents exported instance information
type InstanceInfo struct {
ID string
Expand Down
45 changes: 45 additions & 0 deletions internal/process/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,51 @@ func TestSupervisor_WaitForReadiness(t *testing.T) {
}
}

// TestSupervisor_ReadinessRequiresSuccessfulProbe feeds health statuses
// straight into the supervisor's status handler and checks that a status with
// Healthy=true but LastCheckSucceeded=false does not make WaitForReadiness
// succeed, while a following status with LastCheckSucceeded=true does.
func TestSupervisor_ReadinessRequiresSuccessfulProbe(t *testing.T) {
	handlerOpts := &slog.HandlerOptions{Level: slog.LevelError}
	logger := slog.New(slog.NewTextHandler(os.Stdout, handlerOpts))
	auditLogger := audit.NewLogger(logger, false)

	probeCfg := &config.HealthCheck{
		Type:    "tcp",
		Address: "127.0.0.1:65535",
		Mode:    "readiness",
	}
	procCfg := &config.Process{
		Enabled:     true,
		Command:     []string{"sleep", "60"},
		Restart:     "never",
		Scale:       1,
		HealthCheck: probeCfg,
	}
	globalCfg := &config.GlobalConfig{
		LogLevel:           "error",
		MaxRestartAttempts: 3,
		RestartBackoff:     5,
	}

	sup := NewSupervisor("test-proc", procCfg, globalCfg, logger, auditLogger, nil)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Buffered (capacity 2) so the sends below do not have to wait for the
	// handler goroutine to receive.
	statuses := make(chan HealthStatus, 2)
	sup.healthStatus = statuses
	go sup.handleHealthStatus(ctx)

	// Liveness reports healthy, but the probe itself failed: must not be ready.
	statuses <- HealthStatus{Healthy: true, LastCheckSucceeded: false}

	waitCtx, waitCancel := context.WithTimeout(context.Background(), time.Second)
	defer waitCancel()

	notReadyErr := sup.WaitForReadiness(waitCtx, 50*time.Millisecond)
	if notReadyErr == nil {
		t.Fatal("Expected readiness wait to time out after failed probe")
	}

	// A real probe success is what flips the supervisor to ready.
	statuses <- HealthStatus{Healthy: true, LastCheckSucceeded: true}

	readyErr := sup.WaitForReadiness(waitCtx, time.Second)
	if readyErr != nil {
		t.Fatalf("Expected successful probe to mark ready: %v", readyErr)
	}
}

func TestSupervisor_StreamEnabled(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
auditLogger := audit.NewLogger(logger, false)
Expand Down
Loading