Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 61 additions & 57 deletions internal/process/manager_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,18 @@ func (m *Manager) ReloadConfig(ctx context.Context) error {
"to_update", toUpdate,
)

// Stop removed processes
m.stopRemovedProcesses(ctx, toStop)
stopSet := namesToSet(append(toStop, toUpdate...))
startSet := namesToSet(append(toStart, toUpdate...))

// Stop removed and changed processes in old shutdown order.
m.stopReloadProcesses(ctx, stopSet)

// Update config
m.config = newCfg

// Start new processes
m.startNewProcesses(newCfg, toStart)

// Update changed processes
m.updateChangedProcesses(ctx, newCfg, toUpdate)
if err := m.startReloadProcesses(ctx, newCfg, startSet); err != nil {
return err
}

m.logger.Info("Configuration reloaded successfully")

Expand All @@ -304,77 +305,80 @@ func (m *Manager) ReloadConfig(ctx context.Context) error {
return nil
}

// stopRemovedProcesses stops processes that were removed from config
func (m *Manager) stopRemovedProcesses(ctx context.Context, names []string) {
func namesToSet(names []string) map[string]bool {
set := make(map[string]bool, len(names))
for _, name := range names {
set[name] = true
}
return set
}

// stopReloadProcesses stops removed and changed processes in reverse dependency order.
func (m *Manager) stopReloadProcesses(ctx context.Context, names map[string]bool) {
for _, name := range m.getShutdownOrder() {
if !names[name] {
continue
}
m.unregisterScheduledProcess(name)
if supervisor, running := m.processes[name]; running {
m.logger.Info("Stopping removed process", "name", name)
m.logger.Info("Stopping process during reload", "name", name)
if err := supervisor.Stop(ctx); err != nil {
m.logger.Error("Failed to stop process during reload", "name", name, "error", err)
continue
}
delete(m.processes, name)
}
}
}

// startNewProcesses starts newly added processes
func (m *Manager) startNewProcesses(cfg *config.Config, names []string) {
for _, name := range names {
procCfg := cfg.Processes[name]
if procCfg.Enabled {
m.logger.Info("Starting new process", "name", name)
supervisor := NewSupervisor(name, procCfg, &cfg.Global, m.logger, m.auditLogger, m.resourceCollector)
supervisor.SetOneshotHistory(m.oneshotHistory)
supervisor.SetLogBroadcaster(m.logBroadcaster)
// Use background context for supervisor lifetime (independent of reload request)
if err := supervisor.Start(context.Background()); err != nil {
m.logger.Error("Failed to start new process during reload", "name", name, "error", err)
continue
}
m.processes[name] = supervisor
func (m *Manager) unregisterScheduledProcess(name string) {
if _, exists := m.scheduler.GetJob(name); exists {
if err := m.scheduler.RemoveJob(name); err != nil {
m.logger.Error("Failed to remove scheduled job during reload", "name", name, "error", err)
}
}
m.scheduleExecutor.UnregisterProcess(name)
}

// updateChangedProcesses restarts processes whose config changed
func (m *Manager) updateChangedProcesses(ctx context.Context, cfg *config.Config, names []string) {
for _, name := range names {
// startReloadProcesses starts new and changed processes in dependency order.
func (m *Manager) startReloadProcesses(ctx context.Context, cfg *config.Config, names map[string]bool) error {
startupOrder, err := m.getStartupOrder()
if err != nil {
return fmt.Errorf("failed to determine reload startup order: %w", err)
}

for _, name := range startupOrder {
if !names[name] {
continue
}

procCfg := cfg.Processes[name]
if procCfg == nil || !procCfg.Enabled {
delete(m.processes, name)
continue
}

if supervisor, running := m.processes[name]; running {
m.logger.Info("Restarting updated process", "name", name)
if err := m.waitForDependencies(ctx, name, procCfg.DependsOn); err != nil {
return err
}

if err := supervisor.Stop(ctx); err != nil {
m.logger.Error("Failed to stop process during update", "name", name, "error", err)
continue
if procCfg.Schedule != "" {
if err := m.registerScheduledProcess(name, procCfg); err != nil {
return err
}
continue
}

if procCfg.Enabled {
newSupervisor := NewSupervisor(name, procCfg, &cfg.Global, m.logger, m.auditLogger, m.resourceCollector)
newSupervisor.SetOneshotHistory(m.oneshotHistory)
newSupervisor.SetLogBroadcaster(m.logBroadcaster)
// Use background context for supervisor lifetime (independent of reload request)
if err := newSupervisor.Start(context.Background()); err != nil {
m.logger.Error("Failed to start updated process", "name", name, "error", err)
continue
}
m.processes[name] = newSupervisor
} else {
delete(m.processes, name)
}
} else if procCfg.Enabled {
m.logger.Info("Starting previously disabled process", "name", name)
supervisor := NewSupervisor(name, procCfg, &cfg.Global, m.logger, m.auditLogger, m.resourceCollector)
supervisor.SetOneshotHistory(m.oneshotHistory)
supervisor.SetLogBroadcaster(m.logBroadcaster)
// Use background context for supervisor lifetime (independent of reload request)
if err := supervisor.Start(context.Background()); err != nil {
m.logger.Error("Failed to start process during reload", "name", name, "error", err)
continue
}
m.processes[name] = supervisor
if err := m.startRegularProcess(ctx, name, procCfg); err != nil {
return err
}
}

if stats := m.scheduler.Stats(); stats.TotalJobs > 0 && !stats.Started {
m.scheduler.Start()
}

return nil
}

// GetConfig returns a copy of the current configuration.
Expand Down
65 changes: 65 additions & 0 deletions internal/process/manager_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,71 @@ func TestManager_ReloadConfig_AddNewProcess(t *testing.T) {
}
}

func TestManager_StartReloadProcesses_UsesDependencyOrder(t *testing.T) {
tmpDir := t.TempDir()
orderPath := filepath.Join(tmpDir, "startup-order.txt")
readyPath := filepath.Join(tmpDir, "php-fpm-ready")

cfg := &config.Config{
Global: config.GlobalConfig{
ShutdownTimeout: 10,
LogLevel: "error",
MaxRestartAttempts: 3,
RestartBackoff: 1,
},
Processes: map[string]*config.Process{
"nginx": {
Enabled: true,
InitialState: "running",
Command: []string{"sh", "-c", "echo nginx >> " + orderPath + "; sleep 300"},
Restart: "never",
Scale: 1,
DependsOn: []string{"php-fpm"},
},
"php-fpm": {
Enabled: true,
InitialState: "running",
Command: []string{"sh", "-c", "echo php-fpm >> " + orderPath + "; touch " + readyPath + "; sleep 300"},
Restart: "never",
Scale: 1,
HealthCheck: &config.HealthCheck{
Type: "exec",
Command: []string{"test", "-f", readyPath},
Period: 1,
Timeout: 1,
InitialDelay: 0,
Mode: "readiness",
},
},
},
}

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
auditLogger := audit.NewLogger(logger, false)
manager := NewManager(cfg, logger, auditLogger)

ctx := context.Background()
if err := manager.startReloadProcesses(ctx, cfg, map[string]bool{
"nginx": true,
"php-fpm": true,
}); err != nil {
t.Fatalf("startReloadProcesses() error = %v", err)
}
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = manager.Shutdown(shutdownCtx)
}()

testutil.Eventually(t, func() bool {
data, err := os.ReadFile(orderPath)
if err != nil {
return false
}
return strings.TrimSpace(string(data)) == "php-fpm\nnginx"
}, "reload processes to start in dependency order")
}

// TestManager_ReloadConfig_RemoveProcess tests removing a process during reload
func TestManager_ReloadConfig_RemoveProcess(t *testing.T) {
tmpDir := t.TempDir()
Expand Down
Loading