-
Notifications
You must be signed in to change notification settings - Fork 78
feat(collector): implement crash-loop safety to keep the daemon alive #95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,9 @@ import ( | |
| "log/slog" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/optiqor/kerno/internal/metrics" | ||
| "github.com/optiqor/kerno/internal/observability" | ||
| ) | ||
|
|
||
| // Collector reads raw eBPF events, aggregates them, and produces typed | ||
|
|
@@ -31,7 +34,7 @@ type Collector interface { | |
|
|
||
| // Snapshot returns a point-in-time copy of the aggregated signals. | ||
| // The returned value is safe for concurrent read by other goroutines. | ||
| Snapshot() any | ||
| Snapshot() interface{} | ||
| } | ||
|
|
||
| // Registry manages the lifecycle of multiple collectors. | ||
|
|
@@ -142,10 +145,66 @@ func (r *Registry) Signals(duration time.Duration) *Signals { | |
| s.FD = v | ||
| case *MemorySnapshot: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the |
||
| s.Memory = v | ||
| case *CgroupMemorySnapshot: | ||
| s.CgroupMemory = v | ||
| } | ||
| } | ||
|
|
||
| return s | ||
| } | ||
|
|
||
| // RunSafeCollectorGoroutine wraps a collector's core processing loop with panic recovery, | ||
| // exponential backoff, and crash-loop safety. | ||
| func RunSafeCollectorGoroutine(ctx context.Context, name string, logger *slog.Logger, fn func()) { | ||
| go func() { | ||
| backoff := 1 * time.Second | ||
| for { | ||
| if ctx.Err() != nil { | ||
| return | ||
| } | ||
|
|
||
| panicked := true | ||
| disabled := false | ||
|
|
||
| func() { | ||
| defer func() { | ||
| if r := recover(); r != nil { | ||
| disabled = observability.GlobalHandler.HandlePanic(name, r, logger) | ||
| reason := "unknown" | ||
| if err, ok := r.(error); ok { | ||
| reason = err.Error() | ||
| } else if s, ok := r.(string); ok { | ||
| reason = s | ||
| } | ||
| metrics.CollectorPanicsTotal.WithLabelValues(name, reason).Inc() | ||
| } | ||
| }() | ||
|
|
||
| // Run the actual collector loop | ||
| fn() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. restarting |
||
| panicked = false // If it returned normally, it didn't panic | ||
| }() | ||
|
|
||
| if !panicked { | ||
| return // Normal exit | ||
| } | ||
|
|
||
| if disabled { | ||
| logger.Error("collector permanently disabled due to crash-looping", "name", name) | ||
| metrics.CollectorDisabled.WithLabelValues(name).Set(1) | ||
| return // Exit goroutine permanently | ||
| } | ||
|
|
||
| // Backoff before restarting | ||
| logger.Warn("collector panicked, restarting after backoff", "name", name, "backoff", backoff) | ||
| select { | ||
| case <-time.After(backoff): | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
|
|
||
| backoff *= 2 | ||
| if backoff > 60*time.Second { | ||
| backoff = 60 * time.Second | ||
| } | ||
| } | ||
| }() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| package collector | ||
|
|
||
| import ( | ||
| "context" | ||
| "log/slog" | ||
| "os" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/prometheus/client_golang/prometheus/testutil" | ||
|
|
||
| "github.com/optiqor/kerno/internal/metrics" | ||
| ) | ||
|
|
||
// faultInjectingCollector is a test collector whose run function panics on
// its first five invocations and then blocks until its context is cancelled.
type faultInjectingCollector struct {
	// Identity and logging, passed through to RunSafeCollectorGoroutine.
	name   string
	logger *slog.Logger

	// panicCounts is incremented each time the run function is entered.
	panicCounts int

	// cancelFn is set by Start and invoked by Stop.
	cancelFn context.CancelFunc

	// done is populated by the test; none of the methods shown here read it.
	done chan struct{}
}
|
|
||
| func (c *faultInjectingCollector) Name() string { return c.name } | ||
|
|
||
| func (c *faultInjectingCollector) Start(ctx context.Context) error { | ||
| runCtx, cancel := context.WithCancel(ctx) | ||
| c.cancelFn = cancel | ||
|
|
||
| RunSafeCollectorGoroutine(runCtx, c.name, c.logger, func() { | ||
| c.panicCounts++ | ||
| if c.panicCounts <= 5 { | ||
| panic("synthetic error") | ||
| } | ||
| // Stay alive after 5 panics (if it wasn't disabled) | ||
| <-runCtx.Done() | ||
| }) | ||
| return nil | ||
| } | ||
|
|
||
| func (c *faultInjectingCollector) Stop() { | ||
| if c.cancelFn != nil { | ||
| c.cancelFn() | ||
| } | ||
| } | ||
|
|
||
| func (c *faultInjectingCollector) Snapshot() interface{} { return nil } | ||
|
|
||
| func TestCollectorPanicRecovery(t *testing.T) { | ||
| logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) | ||
| ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) | ||
| defer cancel() | ||
|
|
||
| coll := &faultInjectingCollector{ | ||
| logger: logger, | ||
| name: "faulty_collector", | ||
| done: make(chan struct{}), | ||
| } | ||
|
|
||
| metrics.CollectorPanicsTotal.Reset() | ||
| metrics.CollectorDisabled.Reset() | ||
|
|
||
| err := coll.Start(ctx) | ||
| if err != nil { | ||
| t.Fatalf("expected nil error, got %v", err) | ||
| } | ||
|
|
||
| // Wait for the crash loop backoff to hit the max count and disable | ||
| // Note: in actual implementation, the backoff delays this. For a test, | ||
| // we assume the first few panics happen quickly and bump the metric. | ||
| // Since backoff is 1s, 2s, 4s, etc., hitting 5 panics takes time. | ||
| // We'll just verify it panicked at least once. | ||
| time.Sleep(1500 * time.Millisecond) | ||
|
|
||
| count := testutil.ToFloat64(metrics.CollectorPanicsTotal.WithLabelValues("faulty_collector", "synthetic error")) | ||
| if count < 1 { | ||
| t.Errorf("expected at least 1 panic logged in metrics, got %v", count) | ||
| } | ||
|
|
||
| coll.Stop() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`os.Exit(2)` here violates invariant 4: no `os.Exit` outside `main.go`. `runStart` is library-ish CLI code. Let the panic propagate or return an error, and let `cmd/kerno/main.go` decide the exit code. Otherwise deferred cleanup (the HTTP server shutdown, closers) is skipped on a daemon panic.