diff --git a/internal/cli/doctor.go b/internal/cli/doctor.go index aeecc8f..5d56390 100644 --- a/internal/cli/doctor.go +++ b/internal/cli/doctor.go @@ -21,6 +21,7 @@ import ( "github.com/optiqor/kerno/internal/collector" "github.com/optiqor/kerno/internal/config" "github.com/optiqor/kerno/internal/doctor" + "github.com/optiqor/kerno/internal/sinks" ) func newDoctorCmd() *cobra.Command { @@ -34,6 +35,10 @@ func newDoctorCmd() *cobra.Command { noAI bool quiet bool noBanner bool + sinkURLs []string + sevFloor string + sinkDedupe time.Duration + dryRunSink bool ) cmd := &cobra.Command{ @@ -75,14 +80,18 @@ Add --ai to enrich findings with AI-powered analysis (requires API key).`, } return runDoctor(cmd.Context(), doctorOpts{ - duration: duration, - exitCode: exitCode, - continuous: continuous, - interval: interval, - output: output, - aiEnabled: aiEnabled, - quiet: quiet, - noBanner: noBanner, + duration: duration, + exitCode: exitCode, + continuous: continuous, + interval: interval, + output: output, + aiEnabled: aiEnabled, + quiet: quiet, + noBanner: noBanner, + sinkURLs: sinkURLs, + severityFloor: sevFloor, + sinkDedupe: sinkDedupe, + dryRunSinks: dryRunSink, }) }, } @@ -102,19 +111,27 @@ Add --ai to enrich findings with AI-powered analysis (requires API key).`, _ = cmd.RegisterFlagCompletionFunc("output", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { return []string{"pretty", "json"}, cobra.ShellCompDirectiveNoFileComp }) + flags.StringSliceVar(&sinkURLs, "sink", nil, "webhook sinks (e.g. slack://..., pagerduty://...)") + flags.StringVar(&sevFloor, "severity-floor", "warning", "minimum severity to send to sinks (info, warning, critical)") + flags.DurationVar(&sinkDedupe, "sink-dedupe", 5*time.Minute, "deduplication window to prevent alert flapping") + flags.BoolVar(&dryRunSink, "dry-run-sinks", false, "print payloads instead of sending (useful for debugging sinks)") return cmd } type doctorOpts struct { - duration time.Duration - exitCode bool - continuous bool - interval time.Duration - output string - aiEnabled bool - quiet bool - noBanner bool + duration time.Duration + exitCode bool + continuous bool + interval time.Duration + output string + aiEnabled bool + quiet bool + noBanner bool + sinkURLs []string + severityFloor string + sinkDedupe time.Duration + dryRunSinks bool } func runDoctor(ctx context.Context, opts doctorOpts) error { @@ -171,9 +188,20 @@ func runDoctor(ctx context.Context, opts doctorOpts) error { } }() + // Build sinks + var activeSinks []sinks.Sink + if len(opts.sinkURLs) > 0 { + var err error + activeSinks, err = sinks.BuildSinks(opts.sinkURLs, logger) + if err != nil { + return fmt.Errorf("building sinks: %w", err) + } + } + deduper := sinks.NewDeduper(opts.sinkDedupe) + // Run the diagnostic loop (once, or continuous). for { - if err := runDiagnosticCycle(ctx, engine, build, renderer, opts, logger); err != nil { + if err := runDiagnosticCycle(ctx, engine, build, renderer, opts, activeSinks, deduper, logger); err != nil { return err } @@ -458,6 +486,8 @@ func runDiagnosticCycle( build collectorBuildResult, renderer doctor.Renderer, opts doctorOpts, + activeSinks []sinks.Sink, + deduper *sinks.Deduper, logger *slog.Logger, ) error { registry := build.registry @@ -567,6 +597,52 @@ func runDiagnosticCycle( return fmt.Errorf("rendering report: %w", err) } + // Phase 4.5: Sink export + if len(activeSinks) > 0 { + // Filter by severity floor + var minSev doctor.Severity + switch strings.ToLower(opts.severityFloor) { + case "info": + minSev = doctor.SeverityInfo + case "warning": + minSev = doctor.SeverityWarning + case "critical": + minSev = doctor.SeverityCritical + default: + minSev = doctor.SeverityWarning + } + + var filtered []doctor.Finding + for _, f := range report.Findings { + if f.Severity >= minSev { + filtered = append(filtered, f) + } + } + + deduped := deduper.Filter(filtered) + for _, sink := range activeSinks { + var err error + if sink.Name() == "pagerduty" { + // PagerDuty maintains its own state and needs full list to resolve. + if opts.dryRunSinks { + logger.Info("dry-run sink (pagerduty)", "findings", len(filtered)) + } else { + err = sink.Send(ctx, filtered) + } + } else { + // Slack/Discord use deduped list to prevent flapping. + if opts.dryRunSinks { + logger.Info("dry-run sink", "name", sink.Name(), "findings", len(deduped)) + } else { + err = sink.Send(ctx, deduped) + } + } + if err != nil { + logger.Error("failed to send to sink", "sink", sink.Name(), "error", err) + } + } + } + // Phase 5: Exit code handling for CI/CD. if opts.exitCode && report.HasCritical() { return &exitError{code: 1} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8b41ac3..5aaae8c 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -155,6 +155,22 @@ var InfoMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Help: "Kerno build information.", }, []string{"version"}) +// ─── Sink Metrics ───────────────────────────────────────────────────────── + +// SinksDedupedTotal counts findings dropped by the deduplicator. +var SinksDedupedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "sinks_deduped_total", + Help: "Total findings dropped by the deduplicator.", +}, []string{"sink"}) + +// SinksFailedTotal counts findings that failed to send after retries. +var SinksFailedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "sinks_failed_total", + Help: "Total findings that failed to send after retries.", +}, []string{"sink"}) + func init() { Registry.MustRegister( // Syscall @@ -181,5 +197,8 @@ func init() { CollectorErrorsTotal, BPFProgramsLoaded, InfoMetric, + // Sinks + SinksDedupedTotal, + SinksFailedTotal, ) } diff --git a/internal/sinks/dedupe.go b/internal/sinks/dedupe.go new file mode 100644 index 0000000..1cb719f --- /dev/null +++ b/internal/sinks/dedupe.go @@ -0,0 +1,69 @@ +package sinks + +import ( + "crypto/sha256" + "fmt" + "sync" + "time" + + "github.com/optiqor/kerno/internal/doctor" + "github.com/optiqor/kerno/internal/metrics" +) + +// Deduper filters out repeated findings to prevent alert fatigue. +type Deduper struct { + ttl time.Duration + mu sync.RWMutex + cache map[string]time.Time +} + +// NewDeduper creates a new Deduper with the given time-to-live. +func NewDeduper(ttl time.Duration) *Deduper { + return &Deduper{ + ttl: ttl, + cache: make(map[string]time.Time), + } +} + +// Filter returns only the findings that are novel (not seen within TTL). +// It increments the deduplication metric for any dropped findings. +func (d *Deduper) Filter(findings []doctor.Finding) []doctor.Finding { + if d.ttl <= 0 || len(findings) == 0 { + return findings + } + + now := time.Now() + novel := make([]doctor.Finding, 0, len(findings)) + + d.mu.Lock() + defer d.mu.Unlock() + + // Clean up stale entries on every filter to prevent unbounded memory growth. + // In a high-throughput system, this would be a background goroutine, but + // kerno doctor runs relatively infrequently. + for k, v := range d.cache { + if now.Sub(v) > d.ttl { + delete(d.cache, k) + } + } + + for _, f := range findings { + key := fingerprint(f) + if lastSeen, exists := d.cache[key]; exists && now.Sub(lastSeen) <= d.ttl { + metrics.SinksDedupedTotal.WithLabelValues("all").Inc() + continue + } + d.cache[key] = now + novel = append(novel, f) + } + + return novel +} + +// fingerprint creates a unique key for deduplication. +func fingerprint(f doctor.Finding) string { + // Include Rule, Severity, and Process. + h := sha256.New() + fmt.Fprintf(h, "%s:%d:%s", f.Rule, f.Severity, f.Process) + return fmt.Sprintf("%x", h.Sum(nil)) +} diff --git a/internal/sinks/discord.go b/internal/sinks/discord.go new file mode 100644 index 0000000..e712cde --- /dev/null +++ b/internal/sinks/discord.go @@ -0,0 +1,33 @@ +package sinks + +import ( + "context" + "log/slog" + "strings" + + "github.com/optiqor/kerno/internal/doctor" +) + +// DiscordSink implements Sink for Discord using its built-in Slack compatibility. +type DiscordSink struct { + slack *SlackSink +} + +// NewDiscordSink creates a new DiscordSink. +func NewDiscordSink(url string, logger *slog.Logger) *DiscordSink { + // Discord natively supports Slack payloads if you append /slack to the webhook URL. + if !strings.HasSuffix(url, "/slack") { + url = strings.TrimRight(url, "/") + "/slack" + } + + return &DiscordSink{ + slack: NewSlackSink(url, logger), + } +} + +func (s *DiscordSink) Name() string { return "discord" } + +func (s *DiscordSink) Send(ctx context.Context, findings []doctor.Finding) error { + // Delegate to the Slack sink implementation since the endpoint accepts Slack payloads. + return s.slack.Send(ctx, findings) +} diff --git a/internal/sinks/pagerduty.go b/internal/sinks/pagerduty.go new file mode 100644 index 0000000..1ab963f --- /dev/null +++ b/internal/sinks/pagerduty.go @@ -0,0 +1,127 @@ +package sinks + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/optiqor/kerno/internal/doctor" +) + +// PagerDutySink implements Sink for PagerDuty Events API v2. +type PagerDutySink struct { + routingKey string + logger *slog.Logger + client *http.Client + + mu sync.Mutex + active map[string]struct{} +} + +// NewPagerDutySink creates a new PagerDutySink. +func NewPagerDutySink(routingKey string, logger *slog.Logger) *PagerDutySink { + return &PagerDutySink{ + routingKey: routingKey, + logger: logger, + client: &http.Client{Timeout: 10 * time.Second}, + active: make(map[string]struct{}), + } +} + +func (s *PagerDutySink) Name() string { return "pagerduty" } + +// Send delivers findings to PagerDuty. It triggers incidents for active findings +// and resolves incidents for findings that have cleared since the last cycle. +func (s *PagerDutySink) Send(ctx context.Context, findings []doctor.Finding) error { + s.mu.Lock() + defer s.mu.Unlock() + + current := make(map[string]doctor.Finding) + for _, f := range findings { + key := fingerprint(f) + current[key] = f + } + + var errs []error + + // Trigger or update currently active findings. + for key, f := range current { + if err := s.sendEvent(ctx, "trigger", key, f); err != nil { + errs = append(errs, fmt.Errorf("triggering %s: %w", key, err)) + } else { + s.active[key] = struct{}{} + } + } + + // Resolve findings that are no longer present. + for key := range s.active { + if _, exists := current[key]; !exists { + // Create a dummy finding just to hold the key for resolution. + // The Events API v2 only strictly needs the routing_key and dedup_key for a resolve. + dummy := doctor.Finding{Rule: "cleared"} + if err := s.sendEvent(ctx, "resolve", key, dummy); err != nil { + errs = append(errs, fmt.Errorf("resolving %s: %w", key, err)) + } else { + delete(s.active, key) + } + } + } + + if len(errs) > 0 { + return fmt.Errorf("pagerduty sink encountered %d errors, first: %w", len(errs), errs[0]) + } + return nil +} + +func (s *PagerDutySink) sendEvent(ctx context.Context, action, dedupKey string, f doctor.Finding) error { + payload := map[string]interface{}{ + "routing_key": s.routingKey, + "event_action": action, + "dedup_key": dedupKey, + } + + if action == "trigger" { + severity := "warning" + if f.Severity == doctor.SeverityCritical { + severity = "critical" + } else if f.Severity == doctor.SeverityInfo { + severity = "info" + } + + payload["payload"] = map[string]interface{}{ + "summary": fmt.Sprintf("[%s] %s", f.Severity.String(), f.Title), + "source": "kerno", + "severity": severity, + "custom_details": map[string]string{ + "rule": f.Rule, + "process": f.Process, + "signal": f.Signal, + "cause": f.Cause, + "evidence": f.Evidence, + }, + } + } + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshaling pagerduty payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://events.pagerduty.com/v2/enqueue", bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("creating pagerduty request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + if err := sendWithRetry(ctx, s.client, req, s.Name()); err != nil { + return err + } + + s.logger.Debug("sent pagerduty event", "action", action, "dedup_key", dedupKey) + return nil +} diff --git a/internal/sinks/sink.go b/internal/sinks/sink.go new file mode 100644 index 0000000..6998e44 --- /dev/null +++ b/internal/sinks/sink.go @@ -0,0 +1,127 @@ +package sinks + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "strings" + "time" + + "github.com/optiqor/kerno/internal/doctor" + "github.com/optiqor/kerno/internal/metrics" +) + +// Sink represents a destination for doctor findings. +type Sink interface { + // Name returns the identifier of the sink (e.g., "slack", "pagerduty"). + Name() string + // Send delivers the findings to the destination. It may be called concurrently. + Send(ctx context.Context, findings []doctor.Finding) error +} + +// BuildSinks parses sink URLs and returns a list of configured Sinks. +// It resolves environment variable references (e.g., slack://$KERNO_SLACK_WEBHOOK_URL). +func BuildSinks(urls []string, logger *slog.Logger) ([]Sink, error) { + var sinks []Sink + for _, rawURL := range urls { + u := expandEnvInURL(rawURL) + + switch { + case strings.HasPrefix(u, "slack://"): + webhookURL := strings.TrimPrefix(u, "slack://") + if webhookURL == "" { + return nil, errors.New("slack sink requires a webhook URL") + } + // Re-add https if they stripped it in the slack:// prefix, assuming https:// for hooks. + if !strings.HasPrefix(webhookURL, "http") { + webhookURL = "https://" + webhookURL + } + sinks = append(sinks, NewSlackSink(webhookURL, logger)) + + case strings.HasPrefix(u, "pagerduty://"): + routingKey := strings.TrimPrefix(u, "pagerduty://") + if routingKey == "" { + return nil, errors.New("pagerduty sink requires a routing key") + } + sinks = append(sinks, NewPagerDutySink(routingKey, logger)) + + case strings.HasPrefix(u, "discord://"): + webhookURL := strings.TrimPrefix(u, "discord://") + if webhookURL == "" { + return nil, errors.New("discord sink requires a webhook URL") + } + if !strings.HasPrefix(webhookURL, "http") { + webhookURL = "https://" + webhookURL + } + sinks = append(sinks, NewDiscordSink(webhookURL, logger)) + + default: + return nil, fmt.Errorf("unsupported sink type: %s", rawURL) + } + } + return sinks, nil +} + +// expandEnvInURL allows users to pass "slack://$KERNO_SLACK_WEBHOOK_URL" +// to avoid putting secrets in shell history. +func expandEnvInURL(u string) string { + return os.Expand(u, os.Getenv) +} + +// sendWithRetry executes an HTTP request with exponential backoff. +// Max 3 attempts. +func sendWithRetry(ctx context.Context, client *http.Client, req *http.Request, sinkName string) error { + const maxRetries = 3 + baseDelay := 1 * time.Second + + var lastErr error + for attempt := 0; attempt < maxRetries; attempt++ { + // We must clone the request body for retries + var bodyCopy []byte + if req.Body != nil { + var err error + bodyCopy, err = io.ReadAll(req.Body) + req.Body.Close() + if err != nil { + return fmt.Errorf("reading request body: %w", err) + } + req.Body = io.NopCloser(bytes.NewReader(bodyCopy)) + } + + resp, err := client.Do(req) + if err != nil { + lastErr = err + } else { + resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + lastErr = fmt.Errorf("HTTP %d", resp.StatusCode) + if resp.StatusCode >= 400 && resp.StatusCode < 500 && resp.StatusCode != 429 { + // Client error (not rate limit) - don't retry. + metrics.SinksFailedTotal.WithLabelValues(sinkName).Inc() + return lastErr + } + } + + if attempt < maxRetries-1 { + select { + case <-time.After(baseDelay * time.Duration(1< should resolve + actions = nil + err = sink.Send(context.Background(), []doctor.Finding{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(actions) != 1 || actions[0] != "resolve" { + t.Fatalf("expected 1 resolve, got %v", actions) + } +} + +func TestRetryBackoff(t *testing.T) { + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + req, _ := http.NewRequestWithContext(context.Background(), "POST", srv.URL, nil) + err := sendWithRetry(context.Background(), srv.Client(), req, "test") + + if err == nil { + t.Fatal("expected error after retries, got nil") + } + + if atomic.LoadInt32(&calls) != 3 { + t.Errorf("expected 3 calls, got %d", calls) + } +} + +type mockTransport struct { + fn func(*http.Request) (*http.Response, error) +} + +func (m *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { + return m.fn(req) +} diff --git a/internal/sinks/slack.go b/internal/sinks/slack.go new file mode 100644 index 0000000..a2a9378 --- /dev/null +++ b/internal/sinks/slack.go @@ -0,0 +1,128 @@ +package sinks + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "time" + + "github.com/optiqor/kerno/internal/doctor" +) + +// SlackSink implements Sink for Slack Incoming Webhooks using Block Kit. +type SlackSink struct { + url string + logger *slog.Logger + client *http.Client +} + +// NewSlackSink creates a new SlackSink. +func NewSlackSink(url string, logger *slog.Logger) *SlackSink { + return &SlackSink{ + url: url, + logger: logger, + client: &http.Client{Timeout: 10 * time.Second}, + } +} + +func (s *SlackSink) Name() string { return "slack" } + +func (s *SlackSink) Send(ctx context.Context, findings []doctor.Finding) error { + if len(findings) == 0 { + return nil + } + + payload := s.buildPayload(findings) + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshaling slack payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("creating slack request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + if err := sendWithRetry(ctx, s.client, req, s.Name()); err != nil { + return fmt.Errorf("slack sink failed: %w", err) + } + + s.logger.Debug("sent findings to slack", "count", len(findings)) + return nil +} + +func (s *SlackSink) buildPayload(findings []doctor.Finding) map[string]interface{} { + attachments := make([]map[string]interface{}, 0, len(findings)) + + for _, f := range findings { + color := "#f2c744" // yellow for WARNING + if f.Severity == doctor.SeverityCritical { + color = "#e01e5a" // red for CRITICAL + } + + blocks := []map[string]interface{}{ + { + "type": "header", + "text": map[string]interface{}{ + "type": "plain_text", + "text": fmt.Sprintf("[%s] %s", f.Severity.String(), f.Title), + }, + }, + { + "type": "section", + "text": map[string]interface{}{ + "type": "mrkdwn", + "text": fmt.Sprintf("*Cause:*\n%s\n\n*Impact:*\n%s", f.Cause, f.Impact), + }, + }, + } + + if f.Process != "" { + blocks = append(blocks, map[string]interface{}{ + "type": "section", + "text": map[string]interface{}{ + "type": "mrkdwn", + "text": fmt.Sprintf("*Process:* `%s` | *Signal:* `%s`", f.Process, f.Signal), + }, + }) + } + + if f.Evidence != "" { + blocks = append(blocks, map[string]interface{}{ + "type": "section", + "text": map[string]interface{}{ + "type": "mrkdwn", + "text": fmt.Sprintf("*Evidence:*\n```%s```", f.Evidence), + }, + }) + } + + if len(f.Fix) > 0 { + var fixText string + for _, fix := range f.Fix { + fixText += fmt.Sprintf("• %s\n", fix) + } + blocks = append(blocks, map[string]interface{}{ + "type": "section", + "text": map[string]interface{}{ + "type": "mrkdwn", + "text": fmt.Sprintf("*Fix:*\n%s", fixText), + }, + }) + } + + attachments = append(attachments, map[string]interface{}{ + "color": color, + "blocks": blocks, + }) + } + + return map[string]interface{}{ + "text": "Kerno Diagnostic Findings", + "attachments": attachments, + } +}