diff --git a/internal/process/manager.go b/internal/process/manager.go index 1970b7c..f7863c7 100644 --- a/internal/process/manager.go +++ b/internal/process/manager.go @@ -66,6 +66,7 @@ type Manager struct { // Configurable timeouts and limits (initialized from global config or defaults) dependencyTimeout time.Duration + processStartTimeout time.Duration processStopTimeout time.Duration maxProcessScale int } @@ -122,6 +123,11 @@ func NewManager(cfg *config.Config, logger *slog.Logger, auditLogger *audit.Logg dependencyTimeout = cfg.Global.DependencyTimeout } + processStartTimeout := DefaultProcessStartTimeout + if cfg.Global.ProcessStartTimeout > 0 { + processStartTimeout = cfg.Global.ProcessStartTimeout + } + processStopTimeout := DefaultProcessStopTimeout if cfg.Global.ProcessStopTimeout > 0 { processStopTimeout = cfg.Global.ProcessStopTimeout @@ -148,6 +154,7 @@ func NewManager(cfg *config.Config, logger *slog.Logger, auditLogger *audit.Logg startTime: startTime, logBroadcaster: logpkg.NewLogBroadcaster(), dependencyTimeout: dependencyTimeout, + processStartTimeout: processStartTimeout, processStopTimeout: processStopTimeout, maxProcessScale: maxProcessScale, } diff --git a/internal/process/manager_config.go b/internal/process/manager_config.go index 4e18eff..6c185ee 100644 --- a/internal/process/manager_config.go +++ b/internal/process/manager_config.go @@ -49,7 +49,7 @@ func (m *Manager) AddProcess(ctx context.Context, name string, procCfg *config.P supervisor.SetOneshotHistory(m.oneshotHistory) supervisor.SetLogBroadcaster(m.logBroadcaster) // Use background context for supervisor lifetime (independent of API request) - if err := supervisor.Start(context.Background()); err != nil { + if err := m.startSupervisor(ctx, supervisor); err != nil { // Remove from config on failure delete(m.config.Processes, name) return fmt.Errorf("failed to start process: %w", err) @@ -151,7 +151,7 @@ func (m *Manager) updateProcessLocked(ctx context.Context, name string, procCfg newSupervisor.SetOneshotHistory(m.oneshotHistory) newSupervisor.SetLogBroadcaster(m.logBroadcaster) // Use background context for supervisor lifetime (independent of API request) - if err := newSupervisor.Start(context.Background()); err != nil { + if err := m.startSupervisor(ctx, newSupervisor); err != nil { // Rollback config change on error m.config.Processes[name] = oldCfg return fmt.Errorf("failed to start process with new config: %w", err) @@ -172,7 +172,7 @@ func (m *Manager) updateProcessLocked(ctx context.Context, name string, procCfg supervisor.SetOneshotHistory(m.oneshotHistory) supervisor.SetLogBroadcaster(m.logBroadcaster) // Use background context for supervisor lifetime (independent of API request) - if err := supervisor.Start(context.Background()); err != nil { + if err := m.startSupervisor(ctx, supervisor); err != nil { // Rollback config change on error m.config.Processes[name] = oldCfg return fmt.Errorf("failed to start process: %w", err) @@ -291,7 +291,7 @@ func (m *Manager) ReloadConfig(ctx context.Context) error { m.config = newCfg // Start new processes - m.startNewProcesses(newCfg, toStart) + m.startNewProcesses(ctx, newCfg, toStart) // Update changed processes m.updateChangedProcesses(ctx, newCfg, toUpdate) @@ -318,7 +318,7 @@ func (m *Manager) stopRemovedProcesses(ctx context.Context, names []string) { } // startNewProcesses starts newly added processes -func (m *Manager) startNewProcesses(cfg *config.Config, names []string) { +func (m *Manager) startNewProcesses(ctx context.Context, cfg *config.Config, names []string) { for _, name := range names { procCfg := cfg.Processes[name] if procCfg.Enabled { @@ -327,7 +327,7 @@ func (m *Manager) startNewProcesses(cfg *config.Config, names []string) { supervisor.SetOneshotHistory(m.oneshotHistory) supervisor.SetLogBroadcaster(m.logBroadcaster) // Use background context for supervisor lifetime (independent of reload request) - if err := supervisor.Start(context.Background()); err != nil { + if err := m.startSupervisor(ctx, supervisor); err != nil { m.logger.Error("Failed to start new process during reload", "name", name, "error", err) continue } @@ -354,7 +354,7 @@ func (m *Manager) updateChangedProcesses(ctx context.Context, cfg *config.Config newSupervisor.SetOneshotHistory(m.oneshotHistory) newSupervisor.SetLogBroadcaster(m.logBroadcaster) // Use background context for supervisor lifetime (independent of reload request) - if err := newSupervisor.Start(context.Background()); err != nil { + if err := m.startSupervisor(ctx, newSupervisor); err != nil { m.logger.Error("Failed to start updated process", "name", name, "error", err) continue } @@ -368,7 +368,7 @@ func (m *Manager) updateChangedProcesses(ctx context.Context, cfg *config.Config supervisor.SetOneshotHistory(m.oneshotHistory) supervisor.SetLogBroadcaster(m.logBroadcaster) // Use background context for supervisor lifetime (independent of reload request) - if err := supervisor.Start(context.Background()); err != nil { + if err := m.startSupervisor(ctx, supervisor); err != nil { m.logger.Error("Failed to start process during reload", "name", name, "error", err) continue } diff --git a/internal/process/manager_lifecycle.go b/internal/process/manager_lifecycle.go index 42b78af..7a82a53 100644 --- a/internal/process/manager_lifecycle.go +++ b/internal/process/manager_lifecycle.go @@ -225,7 +225,7 @@ func (m *Manager) startRegularProcess(ctx context.Context, name string, procCfg attribute.String("process_name", name), attribute.Int("scale", procCfg.Scale)) - if err := sup.Start(processCtx); err != nil { + if err := m.startSupervisor(processCtx, sup); err != nil { tracing.RecordError(processSpan, err, "Failed to start process") processSpan.End() return fmt.Errorf("failed to start process %s: %w", name, err) @@ -412,7 +412,7 @@ func (m *Manager) StartProcess(ctx context.Context, name string) error { // Use background context for supervisor lifetime (not the request context) // The supervisor should live independently of the API request that started it - if err := sup.Start(context.Background()); err != nil { + if err := m.startSupervisor(ctx, sup); err != nil { m.logger.Error("Failed to start process", "name", name, "error", err, @@ -512,7 +512,7 @@ func (m *Manager) RestartProcess(ctx context.Context, name string) error { stopCancel() // Start process with fresh background context so it isn't tied to API request lifetime - if err := sup.Start(context.Background()); err != nil { + if err := m.startSupervisor(ctx, sup); err != nil { m.logger.Error("Failed to start process after restart", "name", name, "error", err, diff --git a/internal/process/manager_scaling.go b/internal/process/manager_scaling.go index f05d40d..e90801b 100644 --- a/internal/process/manager_scaling.go +++ b/internal/process/manager_scaling.go @@ -122,7 +122,7 @@ func (m *Manager) scaleFromZero(ctx context.Context, name string, sup *Superviso if cfg := m.config.Processes[name]; cfg != nil { cfg.Scale = desiredScale } - if err := sup.Start(context.Background()); err != nil { + if err := m.startSupervisor(ctx, sup); err != nil { return fmt.Errorf("failed to start process %s for scale %d: %w", name, desiredScale, err) } metrics.SetDesiredScale(name, desiredScale) diff --git a/internal/process/manager_start.go b/internal/process/manager_start.go new file mode 100644 index 0000000..b5144dc --- /dev/null +++ b/internal/process/manager_start.go @@ -0,0 +1,47 @@ +package process + +import ( + "context" + "fmt" + "time" +) + +// startSupervisor starts a supervisor with a bounded startup wait while keeping +// the child process lifetime independent from the timeout context. +func (m *Manager) startSupervisor(ctx context.Context, sup *Supervisor) error { + if ctx == nil { + ctx = context.Background() + } + + timeout := m.processStartTimeout + if timeout <= 0 { + timeout = DefaultProcessStartTimeout + } + + errCh := make(chan error, 1) + go func() { + errCh <- sup.Start(context.Background()) + }() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + m.stopAfterFailedStart(sup) + return fmt.Errorf("process start cancelled: %w", ctx.Err()) + case <-timer.C: + m.stopAfterFailedStart(sup) + return fmt.Errorf("process start timed out after %v", timeout) + } +} + +func (m *Manager) stopAfterFailedStart(sup *Supervisor) { + stopCtx, cancel := context.WithTimeout(context.Background(), m.processStopTimeout) + defer cancel() + if err := sup.Stop(stopCtx); err != nil { + m.logger.Warn("Failed to stop supervisor after unsuccessful start", "error", err) + } +} diff --git a/internal/process/manager_test.go b/internal/process/manager_test.go index 2bd0885..adba433 100644 --- a/internal/process/manager_test.go +++ b/internal/process/manager_test.go @@ -12,6 +12,23 @@ import ( "github.com/cboxdk/init/internal/testutil" ) +func TestNewManager_UsesConfiguredProcessStartTimeout(t *testing.T) { + cfg := &config.Config{ + Global: config.GlobalConfig{ + ProcessStartTimeout: 2 * time.Second, + }, + Processes: map[string]*config.Process{}, + } + + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) + auditLogger := audit.NewLogger(logger, false) + manager := NewManager(cfg, logger, auditLogger) + + if manager.processStartTimeout != 2*time.Second { + t.Fatalf("processStartTimeout = %v, want 2s", manager.processStartTimeout) + } +} + // TestManager_GracefulShutdown tests graceful shutdown with timeout func TestManager_GracefulShutdown(t *testing.T) { cfg := &config.Config{ diff --git a/internal/process/supervisor.go b/internal/process/supervisor.go index 7892bf0..3691e8e 100644 --- a/internal/process/supervisor.go +++ b/internal/process/supervisor.go @@ -926,9 +926,8 @@ func (s *Supervisor) stopInstance(ctx context.Context, instance *Instance) error "timeout", timeout, ) - // Force kill - if err := instance.cmd.Process.Kill(); err != nil { - return fmt.Errorf("failed to kill process: %w", err) + if err := s.signalProcessGroup(instance, syscall.SIGKILL, "force kill"); err != nil { + return fmt.Errorf("failed to force kill process group: %w", err) } // Wait for monitorInstance to detect the exit and close doneCh @@ -967,12 +966,21 @@ func (s *Supervisor) sendShutdownSignal(instance *Instance) error { sig = parseSignal(s.config.Shutdown.Signal) } - // Since we use Setpgid, we need to send signal to the process group + return s.signalProcessGroup(instance, sig, "shutdown") +} + +// signalProcessGroup sends a signal to the instance's process group, falling +// back to the parent process if the process group cannot be addressed. +func (s *Supervisor) signalProcessGroup(instance *Instance, sig syscall.Signal, reason string) error { + if instance == nil || instance.cmd == nil || instance.cmd.Process == nil { + return fmt.Errorf("process is not available for signal") + } + pgid, err := syscall.Getpgid(instance.pid) if err != nil { - // Fallback to single process signal if we can't get pgid s.logger.Warn("Failed to get process group, sending signal to process only", "instance_id", instance.id, + "reason", reason, "error", err, ) if err := instance.cmd.Process.Signal(sig); err != nil { @@ -981,12 +989,11 @@ func (s *Supervisor) sendShutdownSignal(instance *Instance) error { return nil } - // Send signal to entire process group (negative PID) if err := syscall.Kill(-pgid, sig); err != nil { - // Fallback to single process signal if process group signal fails s.logger.Warn("Failed to send signal to process group, falling back to direct signal", "instance_id", instance.id, "pgid", pgid, + "reason", reason, "error", err, ) if err := instance.cmd.Process.Signal(sig); err != nil { @@ -1215,9 +1222,11 @@ func (s *Supervisor) handleHealthStatus(ctx context.Context) { "pid", instance.pid, ) - // Kill the unhealthy instance - if instance.cmd.Process != nil { - _ = instance.cmd.Process.Kill() + if err := s.signalProcessGroup(instance, syscall.SIGKILL, "health check restart"); err != nil { + s.logger.Warn("Failed to kill unhealthy process group", + "instance_id", instance.id, + "error", err, + ) } // The monitorInstance goroutine will handle the restart @@ -1396,12 +1405,16 @@ func (s *Supervisor) collectInstanceMetrics() { "memory_mb", memoryMB, "max_memory_mb", maxMemoryMB, ) - // Kill the process - monitorInstance will handle restart based on policy + inst.mu.Lock() if inst.state == StateRunning && inst.cmd != nil && inst.cmd.Process != nil { - // Record memory restart metric metrics.RecordProcessRestart(s.name, "memory_limit") - _ = inst.cmd.Process.Kill() + if err := s.signalProcessGroup(inst, syscall.SIGKILL, "memory limit restart"); err != nil { + s.logger.Warn("Failed to kill memory-limited process group", + "instance_id", instanceID, + "error", err, + ) + } } inst.mu.Unlock() }