Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 93 additions & 17 deletions internal/cli/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/optiqor/kerno/internal/collector"
"github.com/optiqor/kerno/internal/config"
"github.com/optiqor/kerno/internal/doctor"
"github.com/optiqor/kerno/internal/sinks"
)

func newDoctorCmd() *cobra.Command {
Expand All @@ -34,6 +35,10 @@ func newDoctorCmd() *cobra.Command {
noAI bool
quiet bool
noBanner bool
sinkURLs []string
sevFloor string
sinkDedupe time.Duration
dryRunSink bool
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -75,14 +80,18 @@ Add --ai to enrich findings with AI-powered analysis (requires API key).`,
}

return runDoctor(cmd.Context(), doctorOpts{
duration: duration,
exitCode: exitCode,
continuous: continuous,
interval: interval,
output: output,
aiEnabled: aiEnabled,
quiet: quiet,
noBanner: noBanner,
duration: duration,
exitCode: exitCode,
continuous: continuous,
interval: interval,
output: output,
aiEnabled: aiEnabled,
quiet: quiet,
noBanner: noBanner,
sinkURLs: sinkURLs,
severityFloor: sevFloor,
sinkDedupe: sinkDedupe,
dryRunSinks: dryRunSink,
})
},
}
Expand All @@ -102,19 +111,27 @@ Add --ai to enrich findings with AI-powered analysis (requires API key).`,
_ = cmd.RegisterFlagCompletionFunc("output", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"pretty", "json"}, cobra.ShellCompDirectiveNoFileComp
})
flags.StringSliceVar(&sinkURLs, "sink", nil, "webhook sinks (e.g. slack://..., pagerduty://...)")
flags.StringVar(&sevFloor, "severity-floor", "warning", "minimum severity to send to sinks (info, warning, critical)")
flags.DurationVar(&sinkDedupe, "sink-dedupe", 5*time.Minute, "deduplication window to prevent alert flapping")
flags.BoolVar(&dryRunSink, "dry-run-sinks", false, "print payloads instead of sending (useful for debugging sinks)")

return cmd
}

type doctorOpts struct {
duration time.Duration
exitCode bool
continuous bool
interval time.Duration
output string
aiEnabled bool
quiet bool
noBanner bool
duration time.Duration
exitCode bool
continuous bool
interval time.Duration
output string
aiEnabled bool
quiet bool
noBanner bool
sinkURLs []string
severityFloor string
sinkDedupe time.Duration
dryRunSinks bool
}

func runDoctor(ctx context.Context, opts doctorOpts) error {
Expand Down Expand Up @@ -171,9 +188,20 @@ func runDoctor(ctx context.Context, opts doctorOpts) error {
}
}()

// Build sinks
var activeSinks []sinks.Sink
if len(opts.sinkURLs) > 0 {
var err error
activeSinks, err = sinks.BuildSinks(opts.sinkURLs, logger)
if err != nil {
return fmt.Errorf("building sinks: %w", err)
}
}
deduper := sinks.NewDeduper(opts.sinkDedupe)

// Run the diagnostic loop (once, or continuous).
for {
if err := runDiagnosticCycle(ctx, engine, build, renderer, opts, logger); err != nil {
if err := runDiagnosticCycle(ctx, engine, build, renderer, opts, activeSinks, deduper, logger); err != nil {
return err
}

Expand Down Expand Up @@ -458,6 +486,8 @@ func runDiagnosticCycle(
build collectorBuildResult,
renderer doctor.Renderer,
opts doctorOpts,
activeSinks []sinks.Sink,
deduper *sinks.Deduper,
logger *slog.Logger,
) error {
registry := build.registry
Expand Down Expand Up @@ -567,6 +597,52 @@ func runDiagnosticCycle(
return fmt.Errorf("rendering report: %w", err)
}

// Phase 4.5: Sink export
if len(activeSinks) > 0 {
// Filter by severity floor
var minSev doctor.Severity
switch strings.ToLower(opts.severityFloor) {
case "info":
minSev = doctor.SeverityInfo
case "warning":
minSev = doctor.SeverityWarning
case "critical":
minSev = doctor.SeverityCritical
default:
minSev = doctor.SeverityWarning
}

var filtered []doctor.Finding
for _, f := range report.Findings {
if f.Severity >= minSev {
filtered = append(filtered, f)
}
}

deduped := deduper.Filter(filtered)
for _, sink := range activeSinks {
var err error
if sink.Name() == "pagerduty" {
// PagerDuty maintains its own state and needs full list to resolve.
if opts.dryRunSinks {
logger.Info("dry-run sink (pagerduty)", "findings", len(filtered))
} else {
err = sink.Send(ctx, filtered)
}
} else {
// Slack/Discord use deduped list to prevent flapping.
if opts.dryRunSinks {
logger.Info("dry-run sink", "name", sink.Name(), "findings", len(deduped))
} else {
err = sink.Send(ctx, deduped)
}
}
if err != nil {
logger.Error("failed to send to sink", "sink", sink.Name(), "error", err)
}
}
}

// Phase 5: Exit code handling for CI/CD.
if opts.exitCode && report.HasCritical() {
return &exitError{code: 1}
Expand Down
19 changes: 19 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,22 @@ var InfoMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Kerno build information.",
}, []string{"version"})

// ─── Sink Metrics ─────────────────────────────────────────────────────────

// SinksDedupedTotal counts findings dropped by the deduplicator.
var SinksDedupedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Name: "sinks_deduped_total",
Help: "Total findings dropped by the deduplicator.",
}, []string{"sink"})

// SinksFailedTotal counts findings that failed to send after retries.
var SinksFailedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Name: "sinks_failed_total",
Help: "Total findings that failed to send after retries.",
}, []string{"sink"})

func init() {
Registry.MustRegister(
// Syscall
Expand All @@ -181,5 +197,8 @@ func init() {
CollectorErrorsTotal,
BPFProgramsLoaded,
InfoMetric,
// Sinks
SinksDedupedTotal,
SinksFailedTotal,
)
}
69 changes: 69 additions & 0 deletions internal/sinks/dedupe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package sinks

import (
"crypto/sha256"
"fmt"
"sync"
"time"

"github.com/optiqor/kerno/internal/doctor"
"github.com/optiqor/kerno/internal/metrics"
)

// Deduper filters out repeated findings to prevent alert fatigue.
type Deduper struct {
ttl time.Duration
mu sync.RWMutex
cache map[string]time.Time
}

// NewDeduper creates a new Deduper with the given time-to-live.
func NewDeduper(ttl time.Duration) *Deduper {
return &Deduper{
ttl: ttl,
cache: make(map[string]time.Time),
}
}

// Filter returns only the findings that are novel (not seen within TTL).
// It increments the deduplication metric for any dropped findings.
func (d *Deduper) Filter(findings []doctor.Finding) []doctor.Finding {
if d.ttl <= 0 || len(findings) == 0 {
return findings
}

now := time.Now()
novel := make([]doctor.Finding, 0, len(findings))

d.mu.Lock()
defer d.mu.Unlock()

// Clean up stale entries on every filter to prevent unbounded memory growth.
// In a high-throughput system, this would be a background goroutine, but
// kerno doctor runs relatively infrequently.
for k, v := range d.cache {
if now.Sub(v) > d.ttl {
delete(d.cache, k)
}
}

for _, f := range findings {
key := fingerprint(f)
if lastSeen, exists := d.cache[key]; exists && now.Sub(lastSeen) <= d.ttl {
metrics.SinksDedupedTotal.WithLabelValues("all").Inc()
continue
}
d.cache[key] = now
novel = append(novel, f)
}

return novel
}

// fingerprint creates a unique key for deduplication.
func fingerprint(f doctor.Finding) string {
// Include Rule, Severity, and Process.
h := sha256.New()
fmt.Fprintf(h, "%s:%d:%s", f.Rule, f.Severity, f.Process)
return fmt.Sprintf("%x", h.Sum(nil))
}
33 changes: 33 additions & 0 deletions internal/sinks/discord.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sinks

import (
"context"
"log/slog"
"strings"

"github.com/optiqor/kerno/internal/doctor"
)

// DiscordSink implements Sink for Discord using its built-in Slack compatibility.
type DiscordSink struct {
slack *SlackSink
}

// NewDiscordSink creates a new DiscordSink.
func NewDiscordSink(url string, logger *slog.Logger) *DiscordSink {
// Discord natively supports Slack payloads if you append /slack to the webhook URL.
if !strings.HasSuffix(url, "/slack") {
url = strings.TrimRight(url, "/") + "/slack"
}

return &DiscordSink{
slack: NewSlackSink(url, logger),
}
}

func (s *DiscordSink) Name() string { return "discord" }

func (s *DiscordSink) Send(ctx context.Context, findings []doctor.Finding) error {
// Delegate to the Slack sink implementation since the endpoint accepts Slack payloads.
return s.slack.Send(ctx, findings)
}
Loading