From 43fddb809c2919daaa0f729783fb8d17185bbc26 Mon Sep 17 00:00:00 2001 From: abhinavuser Date: Sat, 16 May 2026 20:09:32 +0530 Subject: [PATCH 1/6] feat(export): add slack, pagerduty, and discord webhook sinks fixes #29 Signed-off-by: abhinavuser --- internal/cli/doctor.go | 102 +++++++++++++++++--- internal/metrics/metrics.go | 19 ++++ internal/sinks/dedupe.go | 69 ++++++++++++++ internal/sinks/discord.go | 33 +++++++ internal/sinks/pagerduty.go | 127 +++++++++++++++++++++++++ internal/sinks/sink.go | 147 +++++++++++++++++++++++++++++ internal/sinks/sinks_test.go | 176 +++++++++++++++++++++++++++++++++++ internal/sinks/slack.go | 128 +++++++++++++++++++++++++ 8 files changed, 788 insertions(+), 13 deletions(-) create mode 100644 internal/sinks/dedupe.go create mode 100644 internal/sinks/discord.go create mode 100644 internal/sinks/pagerduty.go create mode 100644 internal/sinks/sink.go create mode 100644 internal/sinks/sinks_test.go create mode 100644 internal/sinks/slack.go diff --git a/internal/cli/doctor.go b/internal/cli/doctor.go index aeecc8f..0cf62cf 100644 --- a/internal/cli/doctor.go +++ b/internal/cli/doctor.go @@ -21,6 +21,7 @@ import ( "github.com/optiqor/kerno/internal/collector" "github.com/optiqor/kerno/internal/config" "github.com/optiqor/kerno/internal/doctor" + "github.com/optiqor/kerno/internal/sinks" ) func newDoctorCmd() *cobra.Command { @@ -34,6 +35,10 @@ func newDoctorCmd() *cobra.Command { noAI bool quiet bool noBanner bool + sinkURLs []string + sevFloor string + sinkDedupe time.Duration + dryRunSink bool ) cmd := &cobra.Command{ @@ -79,10 +84,14 @@ Add --ai to enrich findings with AI-powered analysis (requires API key).`, exitCode: exitCode, continuous: continuous, interval: interval, - output: output, - aiEnabled: aiEnabled, - quiet: quiet, - noBanner: noBanner, + output: output, + aiEnabled: aiEnabled, + quiet: quiet, + noBanner: noBanner, + sinkURLs: sinkURLs, + severityFloor: sevFloor, + sinkDedupe: sinkDedupe, + dryRunSinks: dryRunSink, }) }, } @@ -102,19 +111,27 @@ Add --ai to enrich findings with AI-powered analysis (requires API key).`, _ = cmd.RegisterFlagCompletionFunc("output", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { return []string{"pretty", "json"}, cobra.ShellCompDirectiveNoFileComp }) + flags.StringSliceVar(&sinkURLs, "sink", nil, "webhook sinks (e.g. slack://..., pagerduty://...)") + flags.StringVar(&sevFloor, "severity-floor", "warning", "minimum severity to send to sinks (info, warning, critical)") + flags.DurationVar(&sinkDedupe, "sink-dedupe", 5*time.Minute, "deduplication window to prevent alert flapping") + flags.BoolVar(&dryRunSink, "dry-run-sinks", false, "print payloads instead of sending (useful for debugging sinks)") return cmd } type doctorOpts struct { - duration time.Duration - exitCode bool - continuous bool - interval time.Duration - output string - aiEnabled bool - quiet bool - noBanner bool + duration time.Duration + exitCode bool + continuous bool + interval time.Duration + output string + aiEnabled bool + quiet bool + noBanner bool + sinkURLs []string + severityFloor string + sinkDedupe time.Duration + dryRunSinks bool } func runDoctor(ctx context.Context, opts doctorOpts) error { @@ -171,9 +188,20 @@ func runDoctor(ctx context.Context, opts doctorOpts) error { } }() + // Build sinks + var activeSinks []sinks.Sink + if len(opts.sinkURLs) > 0 { + var err error + activeSinks, err = sinks.BuildSinks(opts.sinkURLs, logger) + if err != nil { + return fmt.Errorf("building sinks: %w", err) + } + } + deduper := sinks.NewDeduper(opts.sinkDedupe) + // Run the diagnostic loop (once, or continuous). for { - if err := runDiagnosticCycle(ctx, engine, build, renderer, opts, logger); err != nil { + if err := runDiagnosticCycle(ctx, engine, build, renderer, opts, activeSinks, deduper, logger); err != nil { return err } @@ -458,6 +486,8 @@ func runDiagnosticCycle( build collectorBuildResult, renderer doctor.Renderer, opts doctorOpts, + activeSinks []sinks.Sink, + deduper *sinks.Deduper, logger *slog.Logger, ) error { registry := build.registry @@ -567,6 +597,52 @@ func runDiagnosticCycle( return fmt.Errorf("rendering report: %w", err) } + // Phase 4.5: Sink export + if len(activeSinks) > 0 { + // Filter by severity floor + var minSev doctor.Severity + switch strings.ToLower(opts.severityFloor) { + case "info": + minSev = doctor.SeverityInfo + case "warning": + minSev = doctor.SeverityWarning + case "critical": + minSev = doctor.SeverityCritical + default: + minSev = doctor.SeverityWarning + } + + var filtered []doctor.Finding + for _, f := range report.Findings { + if f.Severity >= minSev { + filtered = append(filtered, f) + } + } + + deduped := deduper.Filter(filtered) + for _, sink := range activeSinks { + var err error + if sink.Name() == "pagerduty" { + // PagerDuty maintains its own state and needs full list to resolve. + if opts.dryRunSinks { + logger.Info("dry-run sink (pagerduty)", "findings", len(filtered)) + } else { + err = sink.Send(ctx, filtered) + } + } else { + // Slack/Discord use deduped list to prevent flapping. + if opts.dryRunSinks { + logger.Info("dry-run sink", "name", sink.Name(), "findings", len(deduped)) + } else { + err = sink.Send(ctx, deduped) + } + } + if err != nil { + logger.Error("failed to send to sink", "sink", sink.Name(), "error", err) + } + } + } + // Phase 5: Exit code handling for CI/CD. if opts.exitCode && report.HasCritical() { return &exitError{code: 1} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8b41ac3..5aaae8c 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -155,6 +155,22 @@ var InfoMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Help: "Kerno build information.", }, []string{"version"}) +// ─── Sink Metrics ───────────────────────────────────────────────────────── + +// SinksDedupedTotal counts findings dropped by the deduplicator. +var SinksDedupedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "sinks_deduped_total", + Help: "Total findings dropped by the deduplicator.", +}, []string{"sink"}) + +// SinksFailedTotal counts findings that failed to send after retries. +var SinksFailedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "sinks_failed_total", + Help: "Total findings that failed to send after retries.", +}, []string{"sink"}) + func init() { Registry.MustRegister( // Syscall @@ -181,5 +197,8 @@ func init() { CollectorErrorsTotal, BPFProgramsLoaded, InfoMetric, + // Sinks + SinksDedupedTotal, + SinksFailedTotal, ) } diff --git a/internal/sinks/dedupe.go b/internal/sinks/dedupe.go new file mode 100644 index 0000000..4a9472c --- /dev/null +++ b/internal/sinks/dedupe.go @@ -0,0 +1,69 @@ +package sinks + +import ( + "crypto/sha256" + "fmt" + "sync" + "time" + + "github.com/optiqor/kerno/internal/doctor" + "github.com/optiqor/kerno/internal/metrics" +) + +// Deduper filters out repeated findings to prevent alert fatigue. +type Deduper struct { + ttl time.Duration + mu sync.RWMutex + cache map[string]time.Time +} + +// NewDeduper creates a new Deduper with the given time-to-live. +func NewDeduper(ttl time.Duration) *Deduper { + return &Deduper{ + ttl: ttl, + cache: make(map[string]time.Time), + } +} + +// Filter returns only the findings that are novel (not seen within TTL). +// It increments the deduplication metric for any dropped findings. +func (d *Deduper) Filter(findings []doctor.Finding) []doctor.Finding { + if d.ttl <= 0 || len(findings) == 0 { + return findings + } + + now := time.Now() + var novel []doctor.Finding + + d.mu.Lock() + defer d.mu.Unlock() + + // Clean up stale entries on every filter to prevent unbounded memory growth. + // In a high-throughput system, this would be a background goroutine, but + // kerno doctor runs relatively infrequently. + for k, v := range d.cache { + if now.Sub(v) > d.ttl { + delete(d.cache, k) + } + } + + for _, f := range findings { + key := fingerprint(f) + if lastSeen, exists := d.cache[key]; exists && now.Sub(lastSeen) <= d.ttl { + metrics.SinksDedupedTotal.WithLabelValues("all").Inc() + continue + } + d.cache[key] = now + novel = append(novel, f) + } + + return novel +} + +// fingerprint creates a unique key for deduplication. +func fingerprint(f doctor.Finding) string { + // Include Rule, Severity, and Process. + h := sha256.New() + h.Write([]byte(fmt.Sprintf("%s:%d:%s", f.Rule, f.Severity, f.Process))) + return fmt.Sprintf("%x", h.Sum(nil)) +} diff --git a/internal/sinks/discord.go b/internal/sinks/discord.go new file mode 100644 index 0000000..465a44b --- /dev/null +++ b/internal/sinks/discord.go @@ -0,0 +1,33 @@ +package sinks + +import ( + "context" + "log/slog" + "strings" + + "github.com/optiqor/kerno/internal/doctor" +) + +// DiscordSink implements Sink for Discord using its built-in Slack compatibility. +type DiscordSink struct { + slack *SlackSink +} + +// NewDiscordSink creates a new DiscordSink. +func NewDiscordSink(url string, logger *slog.Logger) *DiscordSink { + // Discord natively supports Slack payloads if you append /slack to the webhook URL. + if !strings.HasSuffix(url, "/slack") { + url = strings.TrimRight(url, "/") + "/slack" + } + + return &DiscordSink{ + slack: NewSlackSink(url, logger), + } +} + +func (s *DiscordSink) Name() string { return "discord" } + +func (s *DiscordSink) Send(ctx context.Context, findings []doctor.Finding) error { + // Delegate to the Slack sink implementation since the endpoint accepts Slack payloads. + return s.slack.Send(ctx, findings) +} diff --git a/internal/sinks/pagerduty.go b/internal/sinks/pagerduty.go new file mode 100644 index 0000000..09c71c2 --- /dev/null +++ b/internal/sinks/pagerduty.go @@ -0,0 +1,127 @@ +package sinks + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/optiqor/kerno/internal/doctor" +) + +// PagerDutySink implements Sink for PagerDuty Events API v2. +type PagerDutySink struct { + routingKey string + logger *slog.Logger + client *http.Client + + mu sync.Mutex + active map[string]struct{} +} + +// NewPagerDutySink creates a new PagerDutySink. +func NewPagerDutySink(routingKey string, logger *slog.Logger) *PagerDutySink { + return &PagerDutySink{ + routingKey: routingKey, + logger: logger, + client: &http.Client{Timeout: 10 * time.Second}, + active: make(map[string]struct{}), + } +} + +func (s *PagerDutySink) Name() string { return "pagerduty" } + +// Send delivers findings to PagerDuty. It triggers incidents for active findings +// and resolves incidents for findings that have cleared since the last cycle. +func (s *PagerDutySink) Send(ctx context.Context, findings []doctor.Finding) error { + s.mu.Lock() + defer s.mu.Unlock() + + current := make(map[string]doctor.Finding) + for _, f := range findings { + key := fingerprint(f) + current[key] = f + } + + var errs []error + + // Trigger or update currently active findings. + for key, f := range current { + if err := s.sendEvent(ctx, "trigger", key, f); err != nil { + errs = append(errs, fmt.Errorf("triggering %s: %w", key, err)) + } else { + s.active[key] = struct{}{} + } + } + + // Resolve findings that are no longer present. + for key := range s.active { + if _, exists := current[key]; !exists { + // Create a dummy finding just to hold the key for resolution. + // The Events API v2 only strictly needs the routing_key and dedup_key for a resolve. + dummy := doctor.Finding{Rule: "cleared"} + if err := s.sendEvent(ctx, "resolve", key, dummy); err != nil { + errs = append(errs, fmt.Errorf("resolving %s: %w", key, err)) + } else { + delete(s.active, key) + } + } + } + + if len(errs) > 0 { + return fmt.Errorf("pagerduty sink encountered %d errors, first: %v", len(errs), errs[0]) + } + return nil +} + +func (s *PagerDutySink) sendEvent(ctx context.Context, action, dedupKey string, f doctor.Finding) error { + payload := map[string]interface{}{ + "routing_key": s.routingKey, + "event_action": action, + "dedup_key": dedupKey, + } + + if action == "trigger" { + severity := "warning" + if f.Severity == doctor.SeverityCritical { + severity = "critical" + } else if f.Severity == doctor.SeverityInfo { + severity = "info" + } + + payload["payload"] = map[string]interface{}{ + "summary": fmt.Sprintf("[%s] %s", f.Severity.String(), f.Title), + "source": "kerno", + "severity": severity, + "custom_details": map[string]string{ + "rule": f.Rule, + "process": f.Process, + "signal": f.Signal, + "cause": f.Cause, + "evidence": f.Evidence, + }, + } + } + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshaling pagerduty payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://events.pagerduty.com/v2/enqueue", bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("creating pagerduty request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + if err := sendWithRetry(ctx, s.client, req, s.Name()); err != nil { + return err + } + + s.logger.Debug("sent pagerduty event", "action", action, "dedup_key", dedupKey) + return nil +} diff --git a/internal/sinks/sink.go b/internal/sinks/sink.go new file mode 100644 index 0000000..9764cc6 --- /dev/null +++ b/internal/sinks/sink.go @@ -0,0 +1,147 @@ +package sinks + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "net/http" + "os" + "strings" + "time" + + "github.com/optiqor/kerno/internal/doctor" + "github.com/optiqor/kerno/internal/metrics" +) + +// Sink represents a destination for doctor findings. +type Sink interface { + // Name returns the identifier of the sink (e.g., "slack", "pagerduty"). + Name() string + // Send delivers the findings to the destination. It may be called concurrently. + Send(ctx context.Context, findings []doctor.Finding) error +} + +// BuildSinks parses sink URLs and returns a list of configured Sinks. +// It resolves environment variable references (e.g., slack://$KERNO_SLACK_WEBHOOK_URL). +func BuildSinks(urls []string, logger *slog.Logger) ([]Sink, error) { + var sinks []Sink + for _, rawURL := range urls { + u := expandEnvInURL(rawURL) + + switch { + case strings.HasPrefix(u, "slack://"): + webhookURL := strings.TrimPrefix(u, "slack://") + if webhookURL == "" { + return nil, errors.New("slack sink requires a webhook URL") + } + // Re-add https if they stripped it in the slack:// prefix, assuming https:// for hooks. + if !strings.HasPrefix(webhookURL, "http") { + webhookURL = "https://" + webhookURL + } + sinks = append(sinks, NewSlackSink(webhookURL, logger)) + + case strings.HasPrefix(u, "pagerduty://"): + routingKey := strings.TrimPrefix(u, "pagerduty://") + if routingKey == "" { + return nil, errors.New("pagerduty sink requires a routing key") + } + sinks = append(sinks, NewPagerDutySink(routingKey, logger)) + + case strings.HasPrefix(u, "discord://"): + webhookURL := strings.TrimPrefix(u, "discord://") + if webhookURL == "" { + return nil, errors.New("discord sink requires a webhook URL") + } + if !strings.HasPrefix(webhookURL, "http") { + webhookURL = "https://" + webhookURL + } + sinks = append(sinks, NewDiscordSink(webhookURL, logger)) + + default: + return nil, fmt.Errorf("unsupported sink type: %s", rawURL) + } + } + return sinks, nil +} + +// expandEnvInURL allows users to pass "slack://$KERNO_SLACK_WEBHOOK_URL" +// to avoid putting secrets in shell history. +func expandEnvInURL(u string) string { + return os.Expand(u, func(key string) string { + return os.Getenv(key) + }) +} + +// sendWithRetry executes an HTTP request with exponential backoff. +// Max 3 attempts. +func sendWithRetry(ctx context.Context, client *http.Client, req *http.Request, sinkName string) error { + const maxRetries = 3 + baseDelay := 1 * time.Second + + var lastErr error + for attempt := 0; attempt < maxRetries; attempt++ { + // We must clone the request body for retries + var bodyCopy []byte + if req.Body != nil { + var err error + bodyCopy, err = readAllAndClose(req.Body) + if err != nil { + return fmt.Errorf("reading request body: %w", err) + } + req.Body = ioNopCloser(bytes.NewReader(bodyCopy)) + } + + resp, err := client.Do(req) + if err != nil { + lastErr = err + } else { + resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + lastErr = fmt.Errorf("HTTP %d", resp.StatusCode) + if resp.StatusCode >= 400 && resp.StatusCode < 500 && resp.StatusCode != 429 { + // Client error (not rate limit) - don't retry. + metrics.SinksFailedTotal.WithLabelValues(sinkName).Inc() + return lastErr + } + } + + if attempt < maxRetries-1 { + select { + case <-time.After(baseDelay * time.Duration(1< should resolve + actions = nil + err = sink.Send(context.Background(), []doctor.Finding{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(actions) != 1 || actions[0] != "resolve" { + t.Fatalf("expected 1 resolve, got %v", actions) + } +} + +func TestRetryBackoff(t *testing.T) { + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + req, _ := http.NewRequestWithContext(context.Background(), "POST", srv.URL, nil) + err := sendWithRetry(context.Background(), srv.Client(), req, "test") + + if err == nil { + t.Fatal("expected error after retries, got nil") + } + + if atomic.LoadInt32(&calls) != 3 { + t.Errorf("expected 3 calls, got %d", calls) + } +} + +type nopCloser struct{} + +func (nopCloser) Read(p []byte) (n int, err error) { return 0, nil } +func (nopCloser) Close() error { return nil } + +type mockTransport struct { + fn func(*http.Request) (*http.Response, error) +} + +func (m *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { + return m.fn(req) +} diff --git a/internal/sinks/slack.go b/internal/sinks/slack.go new file mode 100644 index 0000000..c338e7d --- /dev/null +++ b/internal/sinks/slack.go @@ -0,0 +1,128 @@ +package sinks + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "time" + + "github.com/optiqor/kerno/internal/doctor" +) + +// SlackSink implements Sink for Slack Incoming Webhooks using Block Kit. +type SlackSink struct { + url string + logger *slog.Logger + client *http.Client +} + +// NewSlackSink creates a new SlackSink. +func NewSlackSink(url string, logger *slog.Logger) *SlackSink { + return &SlackSink{ + url: url, + logger: logger, + client: &http.Client{Timeout: 10 * time.Second}, + } +} + +func (s *SlackSink) Name() string { return "slack" } + +func (s *SlackSink) Send(ctx context.Context, findings []doctor.Finding) error { + if len(findings) == 0 { + return nil + } + + payload := s.buildPayload(findings) + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshaling slack payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("creating slack request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + if err := sendWithRetry(ctx, s.client, req, s.Name()); err != nil { + return fmt.Errorf("slack sink failed: %w", err) + } + + s.logger.Debug("sent findings to slack", "count", len(findings)) + return nil +} + +func (s *SlackSink) buildPayload(findings []doctor.Finding) map[string]interface{} { + var attachments []map[string]interface{} + + for _, f := range findings { + color := "#f2c744" // yellow for WARNING + if f.Severity == doctor.SeverityCritical { + color = "#e01e5a" // red for CRITICAL + } + + blocks := []map[string]interface{}{ + { + "type": "header", + "text": map[string]interface{}{ + "type": "plain_text", + "text": fmt.Sprintf("[%s] %s", f.Severity.String(), f.Title), + }, + }, + { + "type": "section", + "text": map[string]interface{}{ + "type": "mrkdwn", + "text": fmt.Sprintf("*Cause:*\n%s\n\n*Impact:*\n%s", f.Cause, f.Impact), + }, + }, + } + + if f.Process != "" { + blocks = append(blocks, map[string]interface{}{ + "type": "section", + "text": map[string]interface{}{ + "type": "mrkdwn", + "text": fmt.Sprintf("*Process:* `%s` | *Signal:* `%s`", f.Process, f.Signal), + }, + }) + } + + if f.Evidence != "" { + blocks = append(blocks, map[string]interface{}{ + "type": "section", + "text": map[string]interface{}{ + "type": "mrkdwn", + "text": fmt.Sprintf("*Evidence:*\n```%s```", f.Evidence), + }, + }) + } + + if len(f.Fix) > 0 { + var fixText string + for _, fix := range f.Fix { + fixText += fmt.Sprintf("• %s\n", fix) + } + blocks = append(blocks, map[string]interface{}{ + "type": "section", + "text": map[string]interface{}{ + "type": "mrkdwn", + "text": fmt.Sprintf("*Fix:*\n%s", fixText), + }, + }) + } + + attachments = append(attachments, map[string]interface{}{ + "color": color, + "blocks": blocks, + }) + } + + return map[string]interface{}{ + "text": "Kerno Diagnostic Findings", + "attachments": attachments, + } +} From fa6c43c64247a737a8026d1742b56aa398e5f60a Mon Sep 17 00:00:00 2001 From: abhinavuser Date: Sun, 31 May 2026 17:57:40 +0530 Subject: [PATCH 2/6] fix: resolve compilation errors in sink.go (unused import, missing io package) Signed-off-by: abhinavuser --- internal/sinks/sink.go | 28 +++++----------------------- 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/internal/sinks/sink.go b/internal/sinks/sink.go index 9764cc6..878ed2d 100644 --- a/internal/sinks/sink.go +++ b/internal/sinks/sink.go @@ -3,9 +3,9 @@ package sinks import ( "bytes" "context" - "encoding/json" "errors" "fmt" + "io" "log/slog" "net/http" "os" @@ -87,11 +87,12 @@ func sendWithRetry(ctx context.Context, client *http.Client, req *http.Request, var bodyCopy []byte if req.Body != nil { var err error - bodyCopy, err = readAllAndClose(req.Body) + bodyCopy, err = io.ReadAll(req.Body) + req.Body.Close() if err != nil { return fmt.Errorf("reading request body: %w", err) } - req.Body = ioNopCloser(bytes.NewReader(bodyCopy)) + req.Body = io.NopCloser(bytes.NewReader(bodyCopy)) } resp, err := client.Do(req) @@ -118,7 +119,7 @@ func sendWithRetry(ctx context.Context, client *http.Client, req *http.Request, } // Restore body for the next attempt if bodyCopy != nil { - req.Body = ioNopCloser(bytes.NewReader(bodyCopy)) + req.Body = io.NopCloser(bytes.NewReader(bodyCopy)) } } } @@ -126,22 +127,3 @@ func sendWithRetry(ctx context.Context, client *http.Client, req *http.Request, metrics.SinksFailedTotal.WithLabelValues(sinkName).Inc() return fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr) } - -// Helper to read and close body -func readAllAndClose(rc interface{ Read(p []byte) (n int, err error); Close() error }) ([]byte, error) { - defer rc.Close() - buf := new(bytes.Buffer) - _, err := buf.ReadFrom(rc) - return buf.Bytes(), err -} - -// Helper to create a ReadCloser -type nopCloser struct { - *bytes.Reader -} - -func (nopCloser) Close() error { return nil } - -func ioNopCloser(r *bytes.Reader) interface{ Read(p []byte) (n int, err error); Close() error } { - return nopCloser{r} -} From 47c32de1a7108e213acab07342d9696518a87679 Mon Sep 17 00:00:00 2001 From: abhinavuser Date: Sun, 31 May 2026 18:03:16 +0530 Subject: [PATCH 3/6] test: clean up unused server and custom nopCloser to fix lint errors Signed-off-by: abhinavuser --- internal/sinks/sinks_test.go | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/internal/sinks/sinks_test.go b/internal/sinks/sinks_test.go index 9007641..3a09f8c 100644 --- a/internal/sinks/sinks_test.go +++ b/internal/sinks/sinks_test.go @@ -1,8 +1,10 @@ package sinks import ( + "bytes" "context" "encoding/json" + "io" "log/slog" "net/http" "net/http/httptest" @@ -84,16 +86,8 @@ func TestSlackSink(t *testing.T) { func TestPagerDutySink(t *testing.T) { var actions []string - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var payload map[string]interface{} - json.NewDecoder(r.Body).Decode(&payload) - actions = append(actions, payload["event_action"].(string)) - w.WriteHeader(http.StatusAccepted) - })) - defer srv.Close() sink := NewPagerDutySink("dummy-key", slog.Default()) - sink.client.Transport = srv.Client().Transport // use same transport // We override the URL in the sink to point to our test server // Note: since the URL is hardcoded in pagerduty.go, we would normally make it configurable. @@ -105,7 +99,7 @@ func TestPagerDutySink(t *testing.T) { var payload map[string]interface{} json.NewDecoder(req.Body).Decode(&payload) actions = append(actions, payload["event_action"].(string)) - return &http.Response{StatusCode: 202, Body: nopCloser{}}, nil + return &http.Response{StatusCode: 202, Body: io.NopCloser(bytes.NewReader(nil))}, nil }, } @@ -162,11 +156,6 @@ func TestRetryBackoff(t *testing.T) { } } -type nopCloser struct{} - -func (nopCloser) Read(p []byte) (n int, err error) { return 0, nil } -func (nopCloser) Close() error { return nil } - type mockTransport struct { fn func(*http.Request) (*http.Response, error) } From cc7307a55ad15e360f26f1bb857dd614d4293db7 Mon Sep 17 00:00:00 2001 From: abhinavuser Date: Sun, 31 May 2026 18:14:10 +0530 Subject: [PATCH 4/6] fix: address golangci-lint failures (errorlint, gocritic, gofmt, prealloc) Signed-off-by: abhinavuser --- internal/cli/doctor.go | 8 ++++---- internal/sinks/dedupe.go | 4 ++-- internal/sinks/discord.go | 2 +- internal/sinks/pagerduty.go | 2 +- internal/sinks/sink.go | 4 +--- internal/sinks/sinks_test.go | 2 +- internal/sinks/slack.go | 2 +- 7 files changed, 11 insertions(+), 13 deletions(-) diff --git a/internal/cli/doctor.go b/internal/cli/doctor.go index 0cf62cf..5d56390 100644 --- a/internal/cli/doctor.go +++ b/internal/cli/doctor.go @@ -80,10 +80,10 @@ Add --ai to enrich findings with AI-powered analysis (requires API key).`, } return runDoctor(cmd.Context(), doctorOpts{ - duration: duration, - exitCode: exitCode, - continuous: continuous, - interval: interval, + duration: duration, + exitCode: exitCode, + continuous: continuous, + interval: interval, output: output, aiEnabled: aiEnabled, quiet: quiet, diff --git a/internal/sinks/dedupe.go b/internal/sinks/dedupe.go index 4a9472c..1cb719f 100644 --- a/internal/sinks/dedupe.go +++ b/internal/sinks/dedupe.go @@ -33,7 +33,7 @@ func (d *Deduper) Filter(findings []doctor.Finding) []doctor.Finding { } now := time.Now() - var novel []doctor.Finding + novel := make([]doctor.Finding, 0, len(findings)) d.mu.Lock() defer d.mu.Unlock() @@ -64,6 +64,6 @@ func (d *Deduper) Filter(findings []doctor.Finding) []doctor.Finding { func fingerprint(f doctor.Finding) string { // Include Rule, Severity, and Process. h := sha256.New() - h.Write([]byte(fmt.Sprintf("%s:%d:%s", f.Rule, f.Severity, f.Process))) + fmt.Fprintf(h, "%s:%d:%s", f.Rule, f.Severity, f.Process) return fmt.Sprintf("%x", h.Sum(nil)) } diff --git a/internal/sinks/discord.go b/internal/sinks/discord.go index 465a44b..e712cde 100644 --- a/internal/sinks/discord.go +++ b/internal/sinks/discord.go @@ -19,7 +19,7 @@ func NewDiscordSink(url string, logger *slog.Logger) *DiscordSink { if !strings.HasSuffix(url, "/slack") { url = strings.TrimRight(url, "/") + "/slack" } - + return &DiscordSink{ slack: NewSlackSink(url, logger), } diff --git a/internal/sinks/pagerduty.go b/internal/sinks/pagerduty.go index 09c71c2..a9c7152 100644 --- a/internal/sinks/pagerduty.go +++ b/internal/sinks/pagerduty.go @@ -73,7 +73,7 @@ func (s *PagerDutySink) Send(ctx context.Context, findings []doctor.Finding) err } if len(errs) > 0 { - return fmt.Errorf("pagerduty sink encountered %d errors, first: %v", len(errs), errs[0]) + return fmt.Errorf("pagerduty sink encountered %d errors, first: %w", len(errs), errs[0]) } return nil } diff --git a/internal/sinks/sink.go b/internal/sinks/sink.go index 878ed2d..6998e44 100644 --- a/internal/sinks/sink.go +++ b/internal/sinks/sink.go @@ -70,9 +70,7 @@ func BuildSinks(urls []string, logger *slog.Logger) ([]Sink, error) { // expandEnvInURL allows users to pass "slack://$KERNO_SLACK_WEBHOOK_URL" // to avoid putting secrets in shell history. func expandEnvInURL(u string) string { - return os.Expand(u, func(key string) string { - return os.Getenv(key) - }) + return os.Expand(u, os.Getenv) } // sendWithRetry executes an HTTP request with exponential backoff. diff --git a/internal/sinks/sinks_test.go b/internal/sinks/sinks_test.go index 3a09f8c..c7dd27c 100644 --- a/internal/sinks/sinks_test.go +++ b/internal/sinks/sinks_test.go @@ -77,7 +77,7 @@ func TestSlackSink(t *testing.T) { if !ok || len(attachments) != 1 { t.Fatalf("expected 1 attachment, got %v", attachments) } - + att := attachments[0].(map[string]interface{}) if att["color"] != "#e01e5a" { t.Errorf("expected critical color #e01e5a, got %v", att["color"]) diff --git a/internal/sinks/slack.go b/internal/sinks/slack.go index c338e7d..a2a9378 100644 --- a/internal/sinks/slack.go +++ b/internal/sinks/slack.go @@ -56,7 +56,7 @@ func (s *SlackSink) Send(ctx context.Context, findings []doctor.Finding) error { } func (s *SlackSink) buildPayload(findings []doctor.Finding) map[string]interface{} { - var attachments []map[string]interface{} + attachments := make([]map[string]interface{}, 0, len(findings)) for _, f := range findings { color := "#f2c744" // yellow for WARNING From 13e9bbd12dcd981c6c6c8e95ab65fd1fddf622b0 Mon Sep 17 00:00:00 2001 From: abhinavuser Date: Sun, 31 May 2026 18:24:52 +0530 Subject: [PATCH 5/6] fix: resolve remaining gofmt trailing whitespace errors Signed-off-by: abhinavuser --- internal/sinks/pagerduty.go | 2 +- internal/sinks/sinks_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/sinks/pagerduty.go b/internal/sinks/pagerduty.go index a9c7152..1ab963f 100644 --- a/internal/sinks/pagerduty.go +++ b/internal/sinks/pagerduty.go @@ -63,7 +63,7 @@ func (s *PagerDutySink) Send(ctx context.Context, findings []doctor.Finding) err if _, exists := current[key]; !exists { // Create a dummy finding just to hold the key for resolution. // The Events API v2 only strictly needs the routing_key and dedup_key for a resolve. - dummy := doctor.Finding{Rule: "cleared"} + dummy := doctor.Finding{Rule: "cleared"} if err := s.sendEvent(ctx, "resolve", key, dummy); err != nil { errs = append(errs, fmt.Errorf("resolving %s: %w", key, err)) } else { diff --git a/internal/sinks/sinks_test.go b/internal/sinks/sinks_test.go index c7dd27c..5fb46e1 100644 --- a/internal/sinks/sinks_test.go +++ b/internal/sinks/sinks_test.go @@ -92,7 +92,7 @@ func TestPagerDutySink(t *testing.T) { // We override the URL in the sink to point to our test server // Note: since the URL is hardcoded in pagerduty.go, we would normally make it configurable. // For this test, we'll just test the Send loop with a mocked http.Client or RoundTripper. - + // Let's mock the RoundTripper to intercept the hardcoded URL. sink.client.Transport = &mockTransport{ fn: func(req *http.Request) (*http.Response, error) { From 2c49c7728ecf686910e6f96b6bb08aee048f857c Mon Sep 17 00:00:00 2001 From: abhinavuser Date: Sun, 31 May 2026 18:43:14 +0530 Subject: [PATCH 6/6] fix: resolve final gofmt trailing whitespace error in sinks_test.go Signed-off-by: abhinavuser --- internal/sinks/sinks_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sinks/sinks_test.go b/internal/sinks/sinks_test.go index 5fb46e1..cccad54 100644 --- a/internal/sinks/sinks_test.go +++ b/internal/sinks/sinks_test.go @@ -104,7 +104,7 @@ func TestPagerDutySink(t *testing.T) { } f1 := doctor.Finding{Rule: "rule1", Severity: doctor.SeverityWarning} - + // Cycle 1: trigger err := sink.Send(context.Background(), []doctor.Finding{f1}) if err != nil {