Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/process/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Manager struct {

// Configurable timeouts and limits (initialized from global config or defaults)
dependencyTimeout time.Duration
processStartTimeout time.Duration
processStopTimeout time.Duration
maxProcessScale int
}
Expand Down Expand Up @@ -122,6 +123,11 @@ func NewManager(cfg *config.Config, logger *slog.Logger, auditLogger *audit.Logg
dependencyTimeout = cfg.Global.DependencyTimeout
}

processStartTimeout := DefaultProcessStartTimeout
if cfg.Global.ProcessStartTimeout > 0 {
processStartTimeout = cfg.Global.ProcessStartTimeout
}

processStopTimeout := DefaultProcessStopTimeout
if cfg.Global.ProcessStopTimeout > 0 {
processStopTimeout = cfg.Global.ProcessStopTimeout
Expand All @@ -148,6 +154,7 @@ func NewManager(cfg *config.Config, logger *slog.Logger, auditLogger *audit.Logg
startTime: startTime,
logBroadcaster: logpkg.NewLogBroadcaster(),
dependencyTimeout: dependencyTimeout,
processStartTimeout: processStartTimeout,
processStopTimeout: processStopTimeout,
maxProcessScale: maxProcessScale,
}
Expand Down
16 changes: 8 additions & 8 deletions internal/process/manager_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (m *Manager) AddProcess(ctx context.Context, name string, procCfg *config.P
supervisor.SetOneshotHistory(m.oneshotHistory)
supervisor.SetLogBroadcaster(m.logBroadcaster)
// Use background context for supervisor lifetime (independent of API request)
if err := supervisor.Start(context.Background()); err != nil {
if err := m.startSupervisor(ctx, supervisor); err != nil {
// Remove from config on failure
delete(m.config.Processes, name)
return fmt.Errorf("failed to start process: %w", err)
Expand Down Expand Up @@ -151,7 +151,7 @@ func (m *Manager) updateProcessLocked(ctx context.Context, name string, procCfg
newSupervisor.SetOneshotHistory(m.oneshotHistory)
newSupervisor.SetLogBroadcaster(m.logBroadcaster)
// Use background context for supervisor lifetime (independent of API request)
if err := newSupervisor.Start(context.Background()); err != nil {
if err := m.startSupervisor(ctx, newSupervisor); err != nil {
// Rollback config change on error
m.config.Processes[name] = oldCfg
return fmt.Errorf("failed to start process with new config: %w", err)
Expand All @@ -172,7 +172,7 @@ func (m *Manager) updateProcessLocked(ctx context.Context, name string, procCfg
supervisor.SetOneshotHistory(m.oneshotHistory)
supervisor.SetLogBroadcaster(m.logBroadcaster)
// Use background context for supervisor lifetime (independent of API request)
if err := supervisor.Start(context.Background()); err != nil {
if err := m.startSupervisor(ctx, supervisor); err != nil {
// Rollback config change on error
m.config.Processes[name] = oldCfg
return fmt.Errorf("failed to start process: %w", err)
Expand Down Expand Up @@ -291,7 +291,7 @@ func (m *Manager) ReloadConfig(ctx context.Context) error {
m.config = newCfg

// Start new processes
m.startNewProcesses(newCfg, toStart)
m.startNewProcesses(ctx, newCfg, toStart)

// Update changed processes
m.updateChangedProcesses(ctx, newCfg, toUpdate)
Expand All @@ -318,7 +318,7 @@ func (m *Manager) stopRemovedProcesses(ctx context.Context, names []string) {
}

// startNewProcesses starts newly added processes
func (m *Manager) startNewProcesses(cfg *config.Config, names []string) {
func (m *Manager) startNewProcesses(ctx context.Context, cfg *config.Config, names []string) {
for _, name := range names {
procCfg := cfg.Processes[name]
if procCfg.Enabled {
Expand All @@ -327,7 +327,7 @@ func (m *Manager) startNewProcesses(cfg *config.Config, names []string) {
supervisor.SetOneshotHistory(m.oneshotHistory)
supervisor.SetLogBroadcaster(m.logBroadcaster)
// Use background context for supervisor lifetime (independent of reload request)
if err := supervisor.Start(context.Background()); err != nil {
if err := m.startSupervisor(ctx, supervisor); err != nil {
m.logger.Error("Failed to start new process during reload", "name", name, "error", err)
continue
}
Expand All @@ -354,7 +354,7 @@ func (m *Manager) updateChangedProcesses(ctx context.Context, cfg *config.Config
newSupervisor.SetOneshotHistory(m.oneshotHistory)
newSupervisor.SetLogBroadcaster(m.logBroadcaster)
// Use background context for supervisor lifetime (independent of reload request)
if err := newSupervisor.Start(context.Background()); err != nil {
if err := m.startSupervisor(ctx, newSupervisor); err != nil {
m.logger.Error("Failed to start updated process", "name", name, "error", err)
continue
}
Expand All @@ -368,7 +368,7 @@ func (m *Manager) updateChangedProcesses(ctx context.Context, cfg *config.Config
supervisor.SetOneshotHistory(m.oneshotHistory)
supervisor.SetLogBroadcaster(m.logBroadcaster)
// Use background context for supervisor lifetime (independent of reload request)
if err := supervisor.Start(context.Background()); err != nil {
if err := m.startSupervisor(ctx, supervisor); err != nil {
m.logger.Error("Failed to start process during reload", "name", name, "error", err)
continue
}
Expand Down
6 changes: 3 additions & 3 deletions internal/process/manager_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (m *Manager) startRegularProcess(ctx context.Context, name string, procCfg
attribute.String("process_name", name),
attribute.Int("scale", procCfg.Scale))

if err := sup.Start(processCtx); err != nil {
if err := m.startSupervisor(processCtx, sup); err != nil {
tracing.RecordError(processSpan, err, "Failed to start process")
processSpan.End()
return fmt.Errorf("failed to start process %s: %w", name, err)
Expand Down Expand Up @@ -412,7 +412,7 @@ func (m *Manager) StartProcess(ctx context.Context, name string) error {

// Use background context for supervisor lifetime (not the request context)
// The supervisor should live independently of the API request that started it
if err := sup.Start(context.Background()); err != nil {
if err := m.startSupervisor(ctx, sup); err != nil {
m.logger.Error("Failed to start process",
"name", name,
"error", err,
Expand Down Expand Up @@ -512,7 +512,7 @@ func (m *Manager) RestartProcess(ctx context.Context, name string) error {
stopCancel()

// Start process with fresh background context so it isn't tied to API request lifetime
if err := sup.Start(context.Background()); err != nil {
if err := m.startSupervisor(ctx, sup); err != nil {
m.logger.Error("Failed to start process after restart",
"name", name,
"error", err,
Expand Down
2 changes: 1 addition & 1 deletion internal/process/manager_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (m *Manager) scaleFromZero(ctx context.Context, name string, sup *Superviso
if cfg := m.config.Processes[name]; cfg != nil {
cfg.Scale = desiredScale
}
if err := sup.Start(context.Background()); err != nil {
if err := m.startSupervisor(ctx, sup); err != nil {
return fmt.Errorf("failed to start process %s for scale %d: %w", name, desiredScale, err)
}
metrics.SetDesiredScale(name, desiredScale)
Expand Down
47 changes: 47 additions & 0 deletions internal/process/manager_start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package process

import (
"context"
"fmt"
"time"
)

// startSupervisor starts a supervisor with a bounded startup wait while keeping
// the child process lifetime independent from the timeout context.
func (m *Manager) startSupervisor(ctx context.Context, sup *Supervisor) error {
if ctx == nil {
ctx = context.Background()
}

timeout := m.processStartTimeout
if timeout <= 0 {
timeout = DefaultProcessStartTimeout
}

errCh := make(chan error, 1)
go func() {
errCh <- sup.Start(context.Background())
}()

timer := time.NewTimer(timeout)
defer timer.Stop()

select {
case err := <-errCh:
return err
case <-ctx.Done():
m.stopAfterFailedStart(sup)
return fmt.Errorf("process start cancelled: %w", ctx.Err())
case <-timer.C:
m.stopAfterFailedStart(sup)
return fmt.Errorf("process start timed out after %v", timeout)
}
}

func (m *Manager) stopAfterFailedStart(sup *Supervisor) {
stopCtx, cancel := context.WithTimeout(context.Background(), m.processStopTimeout)
defer cancel()
if err := sup.Stop(stopCtx); err != nil {
m.logger.Warn("Failed to stop supervisor after unsuccessful start", "error", err)
}
}
17 changes: 17 additions & 0 deletions internal/process/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,23 @@ import (
"github.com/cboxdk/init/internal/testutil"
)

func TestNewManager_UsesConfiguredProcessStartTimeout(t *testing.T) {
cfg := &config.Config{
Global: config.GlobalConfig{
ProcessStartTimeout: 2 * time.Second,
},
Processes: map[string]*config.Process{},
}

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
auditLogger := audit.NewLogger(logger, false)
manager := NewManager(cfg, logger, auditLogger)

if manager.processStartTimeout != 2*time.Second {
t.Fatalf("processStartTimeout = %v, want 2s", manager.processStartTimeout)
}
}

// TestManager_GracefulShutdown tests graceful shutdown with timeout
func TestManager_GracefulShutdown(t *testing.T) {
cfg := &config.Config{
Expand Down
39 changes: 26 additions & 13 deletions internal/process/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,9 +926,8 @@ func (s *Supervisor) stopInstance(ctx context.Context, instance *Instance) error
"timeout", timeout,
)

// Force kill
if err := instance.cmd.Process.Kill(); err != nil {
return fmt.Errorf("failed to kill process: %w", err)
if err := s.signalProcessGroup(instance, syscall.SIGKILL, "force kill"); err != nil {
return fmt.Errorf("failed to force kill process group: %w", err)
}

// Wait for monitorInstance to detect the exit and close doneCh
Expand Down Expand Up @@ -967,12 +966,21 @@ func (s *Supervisor) sendShutdownSignal(instance *Instance) error {
sig = parseSignal(s.config.Shutdown.Signal)
}

// Since we use Setpgid, we need to send signal to the process group
return s.signalProcessGroup(instance, sig, "shutdown")
}

// signalProcessGroup sends a signal to the instance's process group, falling
// back to the parent process if the process group cannot be addressed.
func (s *Supervisor) signalProcessGroup(instance *Instance, sig syscall.Signal, reason string) error {
if instance == nil || instance.cmd == nil || instance.cmd.Process == nil {
return fmt.Errorf("process is not available for signal")
}

pgid, err := syscall.Getpgid(instance.pid)
if err != nil {
// Fallback to single process signal if we can't get pgid
s.logger.Warn("Failed to get process group, sending signal to process only",
"instance_id", instance.id,
"reason", reason,
"error", err,
)
if err := instance.cmd.Process.Signal(sig); err != nil {
Expand All @@ -981,12 +989,11 @@ func (s *Supervisor) sendShutdownSignal(instance *Instance) error {
return nil
}

// Send signal to entire process group (negative PID)
if err := syscall.Kill(-pgid, sig); err != nil {
// Fallback to single process signal if process group signal fails
s.logger.Warn("Failed to send signal to process group, falling back to direct signal",
"instance_id", instance.id,
"pgid", pgid,
"reason", reason,
"error", err,
)
if err := instance.cmd.Process.Signal(sig); err != nil {
Expand Down Expand Up @@ -1215,9 +1222,11 @@ func (s *Supervisor) handleHealthStatus(ctx context.Context) {
"pid", instance.pid,
)

// Kill the unhealthy instance
if instance.cmd.Process != nil {
_ = instance.cmd.Process.Kill()
if err := s.signalProcessGroup(instance, syscall.SIGKILL, "health check restart"); err != nil {
s.logger.Warn("Failed to kill unhealthy process group",
"instance_id", instance.id,
"error", err,
)
}

// The monitorInstance goroutine will handle the restart
Expand Down Expand Up @@ -1396,12 +1405,16 @@ func (s *Supervisor) collectInstanceMetrics() {
"memory_mb", memoryMB,
"max_memory_mb", maxMemoryMB,
)
// Kill the process - monitorInstance will handle restart based on policy

inst.mu.Lock()
if inst.state == StateRunning && inst.cmd != nil && inst.cmd.Process != nil {
// Record memory restart metric
metrics.RecordProcessRestart(s.name, "memory_limit")
_ = inst.cmd.Process.Kill()
if err := s.signalProcessGroup(inst, syscall.SIGKILL, "memory limit restart"); err != nil {
s.logger.Warn("Failed to kill memory-limited process group",
"instance_id", instanceID,
"error", err,
)
}
}
inst.mu.Unlock()
}
Expand Down
Loading