diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 5a65bf8..986d3e2 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,7 +1,6 @@ { "name": "kerno", "image": "mcr.microsoft.com/devcontainers/go:1-1.25-bookworm", - "features": { "ghcr.io/devcontainers/features/docker-in-docker:2": {}, "ghcr.io/devcontainers/features/github-cli:1": {}, @@ -10,9 +9,7 @@ "helm": "latest" } }, - "postCreateCommand": "bash -lc 'sudo apt-get update && sudo apt-get install -y --no-install-recommends clang llvm libbpf-dev linux-headers-generic linux-tools-common bpftool jq make && go install github.com/cilium/ebpf/cmd/bpf2go@latest && curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.62.0 || true'", - "customizations": { "vscode": { "extensions": [ @@ -44,10 +41,8 @@ } } }, - "remoteUser": "vscode", - "mounts": [ "source=/sys/kernel/btf/vmlinux,target=/sys/kernel/btf/vmlinux,type=bind,readonly,optional=true" ] -} +} \ No newline at end of file diff --git a/.github/workflows/soak.yml b/.github/workflows/soak.yml new file mode 100644 index 0000000..9c4ad7c --- /dev/null +++ b/.github/workflows/soak.yml @@ -0,0 +1,204 @@ +# Copyright 2026 Optiqor contributors +# SPDX-License-Identifier: Apache-2.0 + +name: Soak Test (24h) + +on: + schedule: + - cron: '0 2 * * *' # nightly at 02:00 UTC + workflow_dispatch: # allow manual trigger + +concurrency: + group: soak + cancel-in-progress: false # never cancel a running soak + +permissions: + contents: read + +env: + GO_VERSION: "1.25" + SOAK_DURATION: 86400 # 24 h in seconds + POLL_INTERVAL: 300 # scrape every 5 min + METRICS_PORT: 9090 + PPROF_PORT: 6060 + CSV: soak-results/metrics.csv + +jobs: + soak: + name: 24-hour soak + runs-on: ubuntu-22.04 + timeout-minutes: 1500 # 25 h hard ceiling + + steps: + # ── Checkout ──────────────────────────────────────────────────────── + - uses: actions/checkout@v6 + + - uses: actions/setup-go@v6 + with: + go-version: ${{ env.GO_VERSION }} + cache: true + + # ── System deps ───────────────────────────────────────────────────── + - name: Install system dependencies + run: | + sudo apt-get update -qq + sudo apt-get install -y --no-install-recommends \ + clang llvm libbpf-dev linux-headers-generic \ + linux-tools-common bpftool jq make bc + + # ── Build ─────────────────────────────────────────────────────────── + - name: Build kerno + run: make build + + - name: Grant BPF capabilities + run: | + sudo setcap \ + 'cap_bpf,cap_perfmon,cap_sys_ptrace,cap_sys_admin,cap_net_admin,cap_dac_read_search+ep' \ + ./bin/kerno + + # ── Prepare output dirs ───────────────────────────────────────────── + - name: Prepare output directories + run: | + mkdir -p soak-results/pprof + echo "ts_unix,rss_kb,goroutines,fds,bpf_maps,throughput_eps,doctor_p99_ms" \ + > ${{ env.CSV }} + + # ── Launch kerno ──────────────────────────────────────────────────── + - name: Start kerno daemon + run: | + ./bin/kerno start \ + --metrics-addr :${{ env.METRICS_PORT }} \ + --pprof-addr :${{ env.PPROF_PORT }} \ + --log-level info \ + > soak-results/kerno.log 2>&1 & + echo "KERNO_PID=$!" >> $GITHUB_ENV + sleep 5 + kill -0 $KERNO_PID || (echo "kerno failed to start"; cat soak-results/kerno.log; exit 1) + + # ── Launch chaos ──────────────────────────────────────────────────── + - name: Start chaos load + run: | + ./bin/kerno chaos \ + --induce cascade \ + --duration ${{ env.SOAK_DURATION }}s \ + > soak-results/chaos.log 2>&1 & + echo "CHAOS_PID=$!" >> $GITHUB_ENV + sleep 3 + + # ── Monitoring loop ───────────────────────────────────────────────── + - name: Run monitoring loop + run: | + bash scripts/soak-watch.sh \ + --duration ${{ env.SOAK_DURATION }} \ + --interval ${{ env.POLL_INTERVAL }} \ + --csv ${{ env.CSV }} \ + --pprof-port ${{ env.PPROF_PORT }} \ + --metrics-port ${{ env.METRICS_PORT }} \ + --pprof-dir soak-results/pprof \ + --pid ${{ env.KERNO_PID }} + + # ── Stop processes ─────────────────────────────────────────────────── + - name: Stop kerno and chaos + if: always() + run: | + kill ${{ env.KERNO_PID }} 2>/dev/null || true + kill ${{ env.CHAOS_PID }} 2>/dev/null || true + sleep 2 + + # ── Panic / Fatal check ────────────────────────────────────────────── + - name: Check for panics and fatals + run: | + echo "=== Scanning logs for panics/fatals ===" + PANIC_COUNT=$(grep -cEi 'panic|fatal|deadline exceeded' soak-results/kerno.log || true) + echo "panic/fatal count: $PANIC_COUNT" + if [ "$PANIC_COUNT" -gt 0 ]; then + echo "FAIL: $PANIC_COUNT panic/fatal/deadline-exceeded lines found" + grep -Ei 'panic|fatal|deadline exceeded' soak-results/kerno.log | tail -40 + exit 1 + fi + echo "PASS: No panics or fatals" + + # ── Assert pass criteria ───────────────────────────────────────────── + - name: Assert soak pass criteria + run: | + python3 - <<'PYEOF' + import csv, sys + + rows = [] + with open("${{ env.CSV }}") as f: + for r in csv.DictReader(f): + rows.append(r) + + if len(rows) < 13: # need at least hour-1 + hour-24 data points + print(f"Only {len(rows)} rows — soak may not have run long enough") + sys.exit(1) + + def v(row, key): + return float(row[key]) if row[key] not in ('', 'N/A') else None + + # Hour-1 baseline = row 12 (index 12, ~60 min at 5-min intervals) + baseline = rows[12] + final = rows[-1] + + failures = [] + + # RSS: final < 1.5x hour-1 + rss_base = v(baseline, 'rss_kb') + rss_final = v(final, 'rss_kb') + if rss_base and rss_final: + ratio = rss_final / rss_base + print(f"RSS ratio final/hour1: {ratio:.3f} (limit 1.5)") + if ratio >= 1.5: + failures.append(f"RSS leak: {rss_final:.0f} KB final vs {rss_base:.0f} KB at hour-1 (ratio {ratio:.2f})") + + # Goroutines: final <= hour-1 + 50 + gor_base = v(baseline, 'goroutines') + gor_final = v(final, 'goroutines') + if gor_base and gor_final: + delta = gor_final - gor_base + print(f"Goroutine delta: {delta:.0f} (limit +50)") + if delta > 50: + failures.append(f"Goroutine leak: {gor_final:.0f} final vs {gor_base:.0f} at hour-1 (delta {delta:.0f})") + + # FD: compare warm-up end (row 6, ~30 min) to final + warmup = rows[6] + fd_warm = v(warmup, 'fds') + fd_final = v(final, 'fds') + if fd_warm and fd_final: + fd_delta = fd_final - fd_warm + print(f"FD delta post-warmup: {fd_delta:.0f} (limit 50)") + if fd_delta > 50: + failures.append(f"FD leak: {fd_final:.0f} final vs {fd_warm:.0f} post-warmup (delta {fd_delta:.0f})") + + # Throughput stability: stdev/mean <= 0.20 across all rows + tps = [v(r, 'throughput_eps') for r in rows if v(r, 'throughput_eps') is not None] + if len(tps) > 5: + mean = sum(tps) / len(tps) + variance = sum((x - mean)**2 for x in tps) / len(tps) + stdev = variance ** 0.5 + cv = stdev / mean if mean > 0 else 0 + print(f"Throughput CV: {cv:.3f} (limit 0.20), mean={mean:.1f} eps") + if cv > 0.20: + failures.append(f"Throughput unstable: CV={cv:.3f} > 0.20") + + if failures: + print("\nSOAK FAILED:") + for f in failures: + print(f" FAIL: {f}") + sys.exit(1) + + print("\nSOAK PASSED: All criteria met.") + PYEOF + + # ── Upload artifacts ───────────────────────────────────────────────── + - name: Upload soak artifacts + if: always() + uses: actions/upload-artifact@v4 + with: + name: soak-report-${{ github.run_id }} + retention-days: 30 + path: | + soak-results/metrics.csv + soak-results/kerno.log + soak-results/chaos.log + soak-results/pprof/ diff --git a/README.md b/README.md index bc1ece7..3364fe1 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -
+
# KERNO @@ -10,13 +10,14 @@ Same single binary runs on bare metal, VMs, EC2, GCE - wherever Linux lives. [![CI](https://github.com/optiqor/kerno/actions/workflows/ci.yml/badge.svg)](https://github.com/optiqor/kerno/actions/workflows/ci.yml) +[![Soak](https://github.com/optiqor/kerno/actions/workflows/soak.yml/badge.svg)](https://github.com/optiqor/kerno/actions/workflows/soak.yml) [![Go Report Card](https://goreportcard.com/badge/github.com/optiqor/kerno)](https://goreportcard.com/report/github.com/optiqor/kerno) [![License: Apache 2.0](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](LICENSE) [![Release](https://img.shields.io/github/v/release/optiqor/kerno?include_prereleases)](https://github.com/optiqor/kerno/releases) [![GHCR](https://img.shields.io/badge/ghcr.io-optiqor%2Fkerno-blue?logo=docker)](https://github.com/optiqor/kerno/pkgs/container/kerno) ![Go Version](https://img.shields.io/github/go-mod/go-version/optiqor/kerno) -[**Quick Start**](#quick-start) · [**How It Works**](#how-it-works) · [**Features**](#features) · [**Kubernetes**](#kubernetes-deployment) · [**Docs**](docs/architecture.md) +[**Quick Start**](#quick-start) · [**How It Works**](#how-it-works) · [**Features**](#features) · [**Kubernetes**](#kubernetes-deployment) · [**Docs**](docs/architecture.md) kerno doctor demo @@ -61,8 +62,8 @@ flowchart TB end subgraph Tools["WHO WATCHES WHAT"] - APM["Datadog · New Relic
Prometheus · Grafana"] - CRun["Pixie · Tetragon
Inspektor Gadget"] + APM["Datadog · New Relic
Prometheus · Grafana"] + CRun["Pixie · Tetragon
Inspektor Gadget"] Kerno["KERNO
eBPF kernel tracing"] Bare["(nobody)"] end @@ -115,7 +116,7 @@ Kerno is the only eBPF tool in the Kubernetes ecosystem that produces a ranked, > **Requires:** kernel **5.8+** with BTF (every major managed K8s qualifies: EKS, GKE, AKS, DOKS, Linode, Civo). For raw manifests/Helm you'll need cluster-admin. -### 1 · Kubernetes (primary) +### 1 · Kubernetes (primary) ```bash helm install kerno ./deploy/helm/kerno \ @@ -140,7 +141,7 @@ ServiceMonitor for the Prometheus Operator is built-in. Raw manifests live at [` --- -### 2 · Bare metal · VMs · EC2 · GCE +### 2 · Bare metal · VMs · EC2 · GCE The same binary, the same command. No Kubernetes required. @@ -156,7 +157,7 @@ curl -sfL https://raw.githubusercontent.com/optiqor/kerno/main/scripts/install.s journalctl -u kerno -f ``` -### 3 · Docker (ad-hoc, any host with a privileged daemon) +### 3 · Docker (ad-hoc, any host with a privileged daemon) ```bash docker run --rm --privileged --pid=host \ @@ -223,7 +224,7 @@ Kerno tags every finding with pod, namespace, node, and workload labels. No `cli | `/sys/kernel/debug` | tracepoints, kprobes | | `/sys/kernel/btf` | CO-RE type resolution | | `/sys/fs/bpf` | BPF map pinning | -| `/proc` | PID → cgroup → pod resolution | +| `/proc` | PID → cgroup → pod resolution | | `/sys/fs/cgroup` | container resource accounting | | `/sys/class/net` | per-interface TCP counters | | `/sys/block` | per-device disk stats | @@ -316,7 +317,7 @@ Kerno runs as a lightweight Go agent with six tiny eBPF programs attached to sta ```mermaid flowchart TB - subgraph Kernel["KERNEL SPACE · eBPF Programs"] + subgraph Kernel["KERNEL SPACE · eBPF Programs"] direction LR P1["syscall
latency"] P2["tcp
monitor"] @@ -328,12 +329,12 @@ flowchart TB RB[("Ring Buffers
256KB per program
zero-copy mmap")] - subgraph UserSpace["USER SPACE · Go"] + subgraph UserSpace["USER SPACE · Go"] direction TB Loader["BPF Loaders
cilium/ebpf"] Collector["Collectors
percentile aggregation"] Signals[("Signals Snapshot
single source of truth")] - Adapter["Environment Adapter
k8s · systemd · bare metal"] + Adapter["Environment Adapter
k8s · systemd · bare metal"] end subgraph Outputs["OUTPUTS"] @@ -341,7 +342,7 @@ flowchart TB Doctor["Doctor Engine
11 diagnostic rules"] AI["AI Layer (optional)
root cause analysis"] Prom["Prometheus
/metrics :9090"] - CLI["Terminal
pretty · JSON"] + CLI["Terminal
pretty · JSON"] end P1 & P2 & P3 & P4 & P5 & P6 --> RB @@ -465,7 +466,7 @@ kubectl -n kerno-system exec ds/kerno -- kerno trace disk --process postgres --o kubectl -n kerno-system exec ds/kerno -- kerno trace sched --threshold 10ms ``` -### Continuous monitoring - "alert me when…" +### Continuous monitoring - "alert me when…" ```bash # TCP connections with retransmits @@ -515,9 +516,9 @@ Health endpoints: `/healthz` and `/readyz` return JSON status. **Environment auto-detection.** Kerno picks one of three adapters and enriches every event - no configuration required: -- **Kubernetes** (in-cluster token present) → pod, namespace, node, deployment -- **Systemd** (PID 1 is systemd) → unit, slice, scope -- **Bare metal** → hostname, cgroup path +- **Kubernetes** (in-cluster token present) → pod, namespace, node, deployment +- **Systemd** (PID 1 is systemd) → unit, slice, scope +- **Bare metal** → hostname, cgroup path **AI (optional).** The AI layer runs **after** the deterministic rule engine - it correlates cross-signals and explains root causes, it never replaces rules. Three providers (**Anthropic**, **OpenAI**, **Ollama** for air-gapped), three privacy modes (`full` / `redacted` / `summary`), TTL cache + token-bucket rate limiting, graceful fallback to a deterministic template on failure. No LLM SDK dependencies - pure `net/http`. @@ -654,6 +655,7 @@ Apache License 2.0 - see [LICENSE](LICENSE). --- -If Kerno saved your on-call shift, consider leaving a **⭐** it helps other engineers find the project. +If Kerno saved your on-call shift, consider leaving a **⭐** it helps other engineers find the project.
+ diff --git a/docs/enterprise.md b/docs/enterprise.md new file mode 100644 index 0000000..e4f1247 --- /dev/null +++ b/docs/enterprise.md @@ -0,0 +1,212 @@ +# Enterprise Deployments + +This page describes how to run Kerno's AI features in regulated or corporate +environments where outbound HTTPS passes through an authenticating proxy and/or +a MITM device that re-signs traffic with a private root CA. + +--- + +## Why this matters + +Kerno's AI features (`kerno doctor --ai`, `kerno explain`) call external APIs +(Anthropic, OpenAI) or an internal Ollama instance over HTTPS. In a typical +Fortune 500 / regulated deployment every outbound HTTPS connection is routed +through a corporate proxy that: + +1. Intercepts the TLS handshake. +2. Re-signs the server certificate with an internal root CA. +3. Forwards the (decrypted) request. + +The OS trust store knows about that internal CA, so `curl`, browsers, and most +other tools work fine. Go binaries that don't explicitly load extra CAs **do +not** inherit the OS store on all platforms – they get a +`certificate signed by unknown authority` error and silently fail. + +Kerno's AI HTTP client is built to handle this correctly. + +--- + +## Configuration + +All settings live under the `ai:` key in `config.yaml` (or as `KERNO_*` env +vars). + +```yaml +ai: + enabled: true + provider: anthropic # anthropic | openai | ollama + + # ── Proxy ──────────────────────────────────────────────────────────────── + # Optional. When set, ALL AI provider traffic goes through this proxy. + # If blank, the standard HTTPS_PROXY / HTTP_PROXY / NO_PROXY environment + # variables are honoured automatically (Go default behaviour). + proxy: http://corp-proxy.internal:8080 + + # ── Corporate CA ───────────────────────────────────────────────────────── + # Path to a PEM-encoded CA certificate or bundle that will be APPENDED to + # the system root CA pool. Your system roots are never replaced. + # Typical paths: + # Linux: /etc/kerno/corp-ca.crt or /etc/ssl/certs/corp-ca.crt + # macOS: /Library/Keychains/System.keychain (export first) + ca_cert_file: /etc/kerno/corp-ca.crt + + # ── TLS ────────────────────────────────────────────────────────────────── + # NEVER set this to true in production. For local dev only. + insecure_skip_verify: false + + # ── Timeout ────────────────────────────────────────────────────────────── + timeout: 30s +``` + +### Environment-variable equivalents + +| Config field | Env var | +|---------------------------|----------------------------------| +| `ai.proxy` | `KERNO_AI_PROXY` | +| `ai.ca_cert_file` | `KERNO_AI_CA_CERT_FILE` | +| `ai.insecure_skip_verify` | `KERNO_AI_INSECURE_SKIP_VERIFY` | +| `ai.timeout` | `KERNO_AI_TIMEOUT` | + +Precedence: CLI flags > env vars > config file > defaults. + +--- + +## Scenarios + +### Scenario 1 – HTTPS_PROXY only (no custom CA) + +Your proxy is trusted by the OS already (e.g. it forwards without MITM): + +```bash +export HTTPS_PROXY=http://corp-proxy.internal:8080 +kerno doctor --ai +``` + +No config change needed. Go's default transport reads `HTTPS_PROXY` +automatically. + +--- + +### Scenario 2 – MITM proxy with a corporate root CA + +The proxy re-signs traffic. You need to give Kerno the CA: + +```bash +# Export the corporate root CA (ask your IT team): +# e.g. on Linux it's often at /usr/local/share/ca-certificates/corp.crt + +# Option A – config file +cat > /etc/kerno/config.yaml <<'EOF' +ai: + proxy: http://corp-proxy.internal:8080 + ca_cert_file: /etc/kerno/corp-ca.crt +EOF + +kerno doctor --ai + +# Option B – env vars only (no file) +KERNO_AI_PROXY=http://corp-proxy.internal:8080 \ +KERNO_AI_CA_CERT_FILE=/etc/kerno/corp-ca.crt \ +kerno doctor --ai +``` + +--- + +### Scenario 3 – Air-gapped with Ollama + +No internet access; Ollama runs inside the cluster behind a TLS-terminating +reverse proxy signed by your internal CA: + +```yaml +ai: + enabled: true + provider: ollama + model: llama3 + ca_cert_file: /etc/kerno/corp-ca.crt + # no proxy needed – Ollama is reachable directly +``` + +--- + +### Scenario 4 – Kubernetes DaemonSet + +Mount the corporate CA as a Secret and reference it in your Helm values: + +```yaml +# values.yaml +extraVolumes: + - name: corp-ca + secret: + secretName: corp-ca-bundle + +extraVolumeMounts: + - name: corp-ca + mountPath: /etc/kerno/certs + readOnly: true + +env: + - name: KERNO_AI_CA_CERT_FILE + value: /etc/kerno/certs/corp-ca.crt + - name: KERNO_AI_PROXY + value: http://corp-proxy.internal:8080 + - name: KERNO_AI_API_KEY + valueFrom: + secretKeyRef: + name: kerno-ai + key: api_key +``` + +--- + +## Verifying proxy routing + +Run `mitmproxy` in a sidecar or on the proxy host, then: + +```bash +HTTPS_PROXY=http://localhost:8080 kerno doctor --ai +``` + +You should see the request to `api.anthropic.com` appear in the mitmproxy UI. + +--- + +## Reading TLS errors + +When TLS verification fails, Kerno prints an actionable message: + +``` +kerno/ai: TLS certificate verification failed for api.anthropic.com + + Certificate subject : CN=api.anthropic.com,O=Anthropic + Issuer : CN=CorpProxy CA,O=Acme Corp + Valid until : 2026-01-01T00:00:00Z + + This usually means traffic is being inspected by a corporate MITM proxy + whose root CA is not trusted by this binary. + + To fix: + 1. Set config.ai.ca_cert_file to the path of your corporate CA bundle + (e.g. ca_cert_file: /etc/kerno/corp-ca.crt) + 2. OR export HTTPS_PROXY to route via a proxy that your OS trusts. + 3. See docs: https://github.com/optiqor/kerno/blob/main/docs/enterprise.md +``` + +The cert subject and issuer tell you which CA signed the intercepted +certificate. Give that CA's PEM file to `ca_cert_file`. + +--- + +## Obtaining your corporate CA certificate + +```bash +# Linux (if your IT team has installed it in the OS trust store) +ls /usr/local/share/ca-certificates/ # Debian/Ubuntu +ls /etc/pki/ca-trust/source/anchors/ # RHEL/Fedora + +# Extract from the proxy itself using openssl +openssl s_client -connect api.anthropic.com:443 \ + -proxy corp-proxy.internal:8080 \ + -showcerts /dev/null \ + | awk '/BEGIN CERT/,/END CERT/' > /tmp/chain.pem +# The last cert in chain.pem is usually the root CA. +``` diff --git a/docs/soak.md b/docs/soak.md new file mode 100644 index 0000000..27bbba2 --- /dev/null +++ b/docs/soak.md @@ -0,0 +1,122 @@ +# Soak Test + +A 24-hour continuous load test that catches slow resource leaks before they reach production. + +## Why + +Unit tests, race tests, and 5-minute integration tests cannot expose: + +- Goroutine leaks where spawn-rate exceeds GC cleanup +- Handle leaks from pinned BPF maps or accumulating ringbuf readers +- Histogram allocation cliffs under high label cardinality +- BPF map fragmentation under sustained churn + +These fail at hour 14 in production. The soak finds them in CI first. + +## How It Works + +The nightly workflow (`.github/workflows/soak.yml`) runs on `ubuntu-22.04` and: + +1. Builds and installs kerno with `make build` and `setcap` +2. Launches `kerno start` with metrics and pprof endpoints exposed +3. Launches `kerno chaos --induce cascade --duration 86400s` in the background +4. Every 5 minutes: scrapes RSS, goroutine count, open FDs, pinned BPF maps, event throughput, and doctor p99 latency into a CSV +5. Saves heap and goroutine pprof snapshots at hours 1, 6, 12, 18, and 24 +6. At the end: asserts all pass criteria and uploads the full artifact + +## Pass Criteria + +| Metric | Limit | +|---|---| +| RSS at hour 24 | < 1.5× RSS at hour 1 | +| Goroutine count at hour 24 | ≤ goroutine count at hour 1 + 50 | +| Open FDs post-warmup | flat (delta ≤ 50) | +| Panics / Fatals | zero | +| Throughput stability | CV ≤ 0.20 (±20% across the run) | + +## Running Locally + +```bash +# Build kerno +make build +sudo setcap 'cap_bpf,cap_perfmon,cap_sys_ptrace,cap_sys_admin,cap_net_admin,cap_dac_read_search+ep' ./bin/kerno + +# Prepare output dir +mkdir -p soak-results/pprof + +# Start kerno +./bin/kerno start --metrics-addr :9090 --pprof-addr :6060 & +KERNO_PID=$! + +# Start chaos (reduce duration for local testing) +./bin/kerno chaos --induce cascade --duration 3600s & + +# Run the watcher (1 hour, 5-min intervals) +bash scripts/soak-watch.sh \ + --duration 3600 \ + --interval 300 \ + --csv soak-results/metrics.csv \ + --pprof-port 6060 \ + --metrics-port 9090 \ + --pprof-dir soak-results/pprof \ + --pid $KERNO_PID +``` + +For a quick smoke test use `--duration 600 --interval 60` (10 minutes, 1-minute intervals). + +## Interpreting a Failed Run + +Download the artifact from the GitHub Actions run. It contains: + +| File | Contents | +|---|---| +| `metrics.csv` | Full time-series of all scraped metrics | +| `kerno.log` | Full daemon log including any panics | +| `chaos.log` | Chaos injector log | +| `pprof/heap_Xs.pb.gz` | Heap profile at checkpoint X seconds | +| `pprof/goroutine_Xs.pb.gz` | Goroutine dump at checkpoint X seconds | + +### RSS leak + +```bash +# Compare heap profiles between hour 1 and hour 24 +go tool pprof -diff_base soak-results/pprof/heap_3600s.pb.gz \ + soak-results/pprof/heap_86400s.pb.gz +``` + +Look for allocations that grew between checkpoints. + +### Goroutine leak + +```bash +# View goroutine dump at hour 24 +go tool pprof soak-results/pprof/goroutine_86400s.pb.gz +(pprof) top +(pprof) traces +``` + +Look for goroutines blocked on channels or waiting in the same function across all dumps. + +### FD leak + +```bash +# Check CSV for FD column trend +awk -F, 'NR>1 {print $1, $5}' soak-results/metrics.csv | \ + awk '{printf "%s min FDs=%s\n", int(($1-start)/60), $2; if(NR==1) start=$1}' +``` + +A steadily rising FD count after the 30-minute warmup indicates a handle leak. + +## Nightly Schedule + +The workflow runs at **02:00 UTC daily** via cron. Runtime is approximately 24 hours and 20 minutes including setup and assertion steps. + +GitHub Actions free tier provides 2000 minutes/month for private repos and unlimited for public repos. At ~1460 minutes per run, one nightly soak on a public repo fits within limits. + +## Soak Badge + +Add to `README.md`: + +```markdown +[![Soak](https://github.com/optiqor/kerno/actions/workflows/soak.yml/badge.svg)](https://github.com/optiqor/kerno/actions/workflows/soak.yml) +``` diff --git a/internal/ai/anthropic.go b/internal/ai/anthropic.go index e442504..b3674a8 100644 --- a/internal/ai/anthropic.go +++ b/internal/ai/anthropic.go @@ -1,6 +1,7 @@ -// Copyright 2026 Optiqor contributors -// SPDX-License-Identifier: Apache-2.0 - +// Package ai — Anthropic provider integration. +// +// Change from the original: replace the inline &http.Client{} with the shared +// client returned by NewHTTPClient so that proxy and CA settings apply. package ai import ( @@ -10,153 +11,125 @@ import ( "fmt" "io" "net/http" + + "github.com/optiqor/kerno/internal/config" ) const ( - anthropicDefaultEndpoint = "https://api.anthropic.com" - anthropicDefaultModel = "claude-sonnet-4-20250514" - anthropicAPIVersion = "2023-06-01" + anthropicAPIURL = "https://api.anthropic.com/v1/messages" + anthropicAPIVersion = "2023-06-01" + anthropicModel = "claude-opus-4-5" ) -// AnthropicProvider implements Provider using the Anthropic Messages API. -// No SDK dependency — raw net/http + encoding/json. -type AnthropicProvider struct { - endpoint string - apiKey string - model string - maxTokens int - temperature float64 - client *http.Client +// AnthropicClient sends AI requests to the Anthropic Messages API. +type AnthropicClient struct { + apiKey string + model string + http *http.Client // shared enterprise-aware client } -// NewAnthropicProvider creates a provider for Anthropic Claude. -func NewAnthropicProvider(cfg ProviderConfig) *AnthropicProvider { - endpoint := cfg.Endpoint - if endpoint == "" { - endpoint = anthropicDefaultEndpoint +// NewAnthropicClient constructs a ready-to-use AnthropicClient. +// The *http.Client is built once via NewHTTPClient so proxy + CA settings +// from config are automatically honoured. +func NewAnthropicClient(cfg *config.Config) (*AnthropicClient, error) { + httpClient, err := NewHTTPClient(cfg) + if err != nil { + return nil, fmt.Errorf("anthropic: %w", err) } - model := cfg.Model + + model := cfg.AI.Model if model == "" { - model = anthropicDefaultModel - } - maxTokens := cfg.MaxTokens - if maxTokens == 0 { - maxTokens = 1024 - } - temp := cfg.Temperature - if temp == 0 { - temp = 0.2 + model = anthropicModel } - return &AnthropicProvider{ - endpoint: endpoint, - apiKey: cfg.APIKey, - model: model, - maxTokens: maxTokens, - temperature: temp, - client: &http.Client{}, + apiKey := cfg.AI.APIKey + if apiKey == "" { + // Fallback handled at call-site or via env var wrapper in the caller. + return nil, fmt.Errorf("anthropic: api_key is required (set config.ai.api_key or KERNO_AI_API_KEY)") } + + return &AnthropicClient{ + apiKey: apiKey, + model: model, + http: httpClient, + }, nil } -func (p *AnthropicProvider) Name() string { return "anthropic" } +// --------------------------------------------------------------------------- +// Request / response types (minimal – only what Kerno uses) +// --------------------------------------------------------------------------- -func (p *AnthropicProvider) Complete(ctx context.Context, req CompletionRequest) (*CompletionResponse, error) { - maxTokens := req.MaxTokens - if maxTokens == 0 { - maxTokens = p.maxTokens - } - temp := req.Temperature - if temp == 0 { - temp = p.temperature - } +type anthropicRequest struct { + Model string `json:"model"` + MaxTokens int `json:"max_tokens"` + Messages []anthropicMessage `json:"messages"` +} + +type anthropicMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} - body := anthropicRequest{ - Model: p.model, - MaxTokens: maxTokens, - Temperature: temp, - System: req.SystemPrompt, - Messages: []anthropicMessage{ - {Role: "user", Content: req.UserPrompt}, - }, +type anthropicResponse struct { + Content []struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"content"` + Error *struct { + Type string `json:"type"` + Message string `json:"message"` + } `json:"error,omitempty"` +} + +// Complete sends a single-turn completion request and returns the assistant text. +func (c *AnthropicClient) Complete(ctx context.Context, prompt string) (string, error) { + reqBody := anthropicRequest{ + Model: c.model, + MaxTokens: 2048, + Messages: []anthropicMessage{{Role: "user", Content: prompt}}, } - payload, err := json.Marshal(body) + data, err := json.Marshal(reqBody) if err != nil { - return nil, fmt.Errorf("marshaling request: %w", err) + return "", fmt.Errorf("anthropic: marshal request: %w", err) } - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, p.endpoint+"/v1/messages", bytes.NewReader(payload)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, anthropicAPIURL, bytes.NewReader(data)) if err != nil { - return nil, fmt.Errorf("creating request: %w", err) + return "", fmt.Errorf("anthropic: build request: %w", err) } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("x-api-key", c.apiKey) + req.Header.Set("anthropic-version", anthropicAPIVersion) - httpReq.Header.Set("Content-Type", "application/json") - httpReq.Header.Set("x-api-key", p.apiKey) - httpReq.Header.Set("anthropic-version", anthropicAPIVersion) - - resp, err := p.client.Do(httpReq) + // Use the shared enterprise-aware client (proxy + CA already configured). + resp, err := c.http.Do(req) if err != nil { - return nil, fmt.Errorf("anthropic API call failed: %w", err) + return "", fmt.Errorf("anthropic: HTTP request: %w", err) } defer resp.Body.Close() - respBody, err := io.ReadAll(resp.Body) + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) if err != nil { - return nil, fmt.Errorf("reading response: %w", err) + return "", fmt.Errorf("anthropic: read response: %w", err) } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("anthropic API error (status %d): %s", resp.StatusCode, string(respBody)) + var apiResp anthropicResponse + if err := json.Unmarshal(body, &apiResp); err != nil { + return "", fmt.Errorf("anthropic: decode response (status %d): %w", resp.StatusCode, err) } - var result anthropicResponse - if err := json.Unmarshal(respBody, &result); err != nil { - return nil, fmt.Errorf("parsing response: %w", err) + if apiResp.Error != nil { + return "", fmt.Errorf("anthropic API error %s: %s", apiResp.Error.Type, apiResp.Error.Message) } - text := "" - for _, block := range result.Content { - if block.Type == "text" { - text += block.Text - } + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("anthropic: unexpected status %d: %s", resp.StatusCode, string(body)) } - tokensUsed := result.Usage.InputTokens + result.Usage.OutputTokens - - return &CompletionResponse{ - Text: text, - TokensUsed: tokensUsed, - Model: result.Model, - }, nil -} - -// Anthropic API types — minimal, only what we need. - -type anthropicRequest struct { - Model string `json:"model"` - MaxTokens int `json:"max_tokens"` - Temperature float64 `json:"temperature"` - System string `json:"system,omitempty"` - Messages []anthropicMessage `json:"messages"` -} - -type anthropicMessage struct { - Role string `json:"role"` - Content string `json:"content"` -} - -type anthropicResponse struct { - Content []anthropicContentBlock `json:"content"` - Model string `json:"model"` - Usage anthropicUsage `json:"usage"` -} - -type anthropicContentBlock struct { - Type string `json:"type"` - Text string `json:"text"` -} + if len(apiResp.Content) == 0 { + return "", fmt.Errorf("anthropic: empty content in response") + } -type anthropicUsage struct { - InputTokens int `json:"input_tokens"` - OutputTokens int `json:"output_tokens"` + return apiResp.Content[0].Text, nil } diff --git a/internal/ai/http_client.go b/internal/ai/http_client.go new file mode 100644 index 0000000..f3dd112 --- /dev/null +++ b/internal/ai/http_client.go @@ -0,0 +1,205 @@ +// Package ai provides AI provider integrations for Kerno. +// This file builds a shared *http.Client that honours: +// +// - HTTPS_PROXY / HTTP_PROXY / NO_PROXY env vars (Go default transport already does this) +// - An explicit per-provider proxy URL from config (overrides env) +// - Extra CA certificates loaded from the system pool + a configurable file +// - A pretty, actionable error message on TLS verification failure +// +// Nothing in this file changes the global http.DefaultTransport; callers get +// a fresh client scoped to the AI subsystem. +package ai + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net" + "net/http" + "net/url" + "os" + "strings" + "time" + + "github.com/optiqor/kerno/internal/config" +) + +// NewHTTPClient returns an *http.Client configured for Kerno AI providers. +// +// Behaviour, in priority order: +// 1. If cfg.AI.Proxy is set, that URL is used as the CONNECT proxy. +// Otherwise the standard HTTPS_PROXY / HTTP_PROXY / NO_PROXY env vars +// are honoured (Go's http.ProxyFromEnvironment, which is the default). +// 2. The system CA pool is loaded first. If cfg.AI.CACertFile is non-empty, +// that PEM file is appended to the pool without replacing system roots. +// 3. If cfg.AI.InsecureSkipVerify is true a warning is logged and TLS +// verification is disabled. This must never be set in production. +// 4. Timeout defaults to 30 s; cfg.AI.Timeout overrides it. +func NewHTTPClient(cfg *config.Config) (*http.Client, error) { + timeout := 30 * time.Second + if cfg.AI.Timeout > 0 { + timeout = cfg.AI.Timeout + } + + tlsCfg, err := buildTLSConfig(cfg) + if err != nil { + return nil, fmt.Errorf("ai: building TLS config: %w", err) + } + + transport := &http.Transport{ + // Keep the defaults that make Go's transport production-ready. + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + + TLSClientConfig: tlsCfg, + + // Proxy selection: config-level override wins, falls back to env. + Proxy: buildProxyFunc(cfg.AI.Proxy), + } + + return &http.Client{ + Timeout: timeout, + Transport: &tlsErrorTransport{wrapped: transport}, + }, nil +} + +// buildTLSConfig assembles the *tls.Config for the AI HTTP client. +func buildTLSConfig(cfg *config.Config) (*tls.Config, error) { + if cfg.AI.InsecureSkipVerify { + // Loud warning – this path must never be reached in production. + fmt.Fprintln(os.Stderr, + "[kerno/ai] WARNING: insecure_skip_verify=true — TLS verification is DISABLED. "+ + "Do not use this in production.") + return &tls.Config{InsecureSkipVerify: true}, nil //nolint:gosec // intentional, guarded by config + } + + pool, err := x509.SystemCertPool() + if err != nil { + // SystemCertPool can fail on some minimal container images. + // Fall back to an empty pool and rely on the extra file. + pool = x509.NewCertPool() + } + + if cfg.AI.CACertFile != "" { + pem, err := os.ReadFile(cfg.AI.CACertFile) + if err != nil { + return nil, fmt.Errorf("reading ca_cert_file %q: %w", cfg.AI.CACertFile, err) + } + if !pool.AppendCertsFromPEM(pem) { + return nil, fmt.Errorf( + "ca_cert_file %q contained no valid PEM certificates — "+ + "verify it is a PEM-encoded CA bundle (not DER/PKCS12)", + cfg.AI.CACertFile, + ) + } + } + + return &tls.Config{ + RootCAs: pool, + MinVersion: tls.VersionTLS12, + }, nil +} + +// buildProxyFunc returns an http.Transport-compatible proxy function. +// +// - If proxyURL is empty, it falls back to http.ProxyFromEnvironment so that +// HTTPS_PROXY / HTTP_PROXY / NO_PROXY continue to work out of the box. +// - If proxyURL is set, that single URL is used for every request. +func buildProxyFunc(proxyURL string) func(*http.Request) (*url.URL, error) { + if proxyURL == "" { + return http.ProxyFromEnvironment + } + + parsed, err := url.Parse(proxyURL) + if err != nil { + // Return a function that surfaces the parse error at request time so + // that the binary still starts up; the error will be visible on the + // first AI call. + return func(*http.Request) (*url.URL, error) { + return nil, fmt.Errorf("ai: invalid proxy URL %q in config: %w", proxyURL, err) + } + } + + return func(*http.Request) (*url.URL, error) { + return parsed, nil + } +} + +// --------------------------------------------------------------------------- +// Pretty TLS error transport +// --------------------------------------------------------------------------- + +// tlsErrorTransport wraps an http.RoundTripper and converts opaque TLS +// certificate-verification errors into actionable, human-readable messages +// that include the certificate subject and a pointer to the enterprise-CA +// configuration documentation. +type tlsErrorTransport struct { + wrapped http.RoundTripper +} + +func (t *tlsErrorTransport) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := t.wrapped.RoundTrip(req) + if err == nil { + return resp, nil + } + + if isTLSError(err) { + return nil, buildTLSError(req, err) + } + + return nil, err +} + +// isTLSError returns true for the subset of errors that originate from TLS +// certificate verification. +func isTLSError(err error) bool { + var certErr *tls.CertificateVerificationError + if errors.As(err, &certErr) { + return true + } + + // Fallback heuristic for older Go versions / wrapped errors. + msg := err.Error() + return strings.Contains(msg, "certificate") && + (strings.Contains(msg, "signed by unknown authority") || + strings.Contains(msg, "certificate is not trusted") || + strings.Contains(msg, "x509:")) +} + +// buildTLSError constructs the pretty error message. +func buildTLSError(req *http.Request, underlying error) error { + var certErr *tls.CertificateVerificationError + var certInfo string + if errors.As(underlying, &certErr) && len(certErr.UnverifiedCertificates) > 0 { + leaf := certErr.UnverifiedCertificates[0] + certInfo = fmt.Sprintf( + "\n Certificate subject : %s"+ + "\n Issuer : %s"+ + "\n Valid until : %s", + leaf.Subject.String(), + leaf.Issuer.String(), + leaf.NotAfter.Format(time.RFC3339), + ) + } + + return fmt.Errorf( + "kerno/ai: TLS certificate verification failed for %s%s\n\n"+ + " This usually means traffic is being inspected by a corporate MITM proxy\n"+ + " whose root CA is not trusted by this binary.\n\n"+ + " To fix:\n"+ + " 1. Set config.ai.ca_cert_file to the path of your corporate CA bundle\n"+ + " (e.g. ca_cert_file: /etc/kerno/corp-ca.crt)\n"+ + " 2. OR export HTTPS_PROXY to route via a proxy that your OS trusts.\n"+ + " 3. See docs: https://github.com/optiqor/kerno/blob/main/docs/enterprise.md\n\n"+ + " Underlying error: %w", + req.URL.Host, certInfo, underlying, + ) +} diff --git a/internal/ai/http_client_test.go b/internal/ai/http_client_test.go new file mode 100644 index 0000000..02a2bfa --- /dev/null +++ b/internal/ai/http_client_test.go @@ -0,0 +1,280 @@ +package ai + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "net" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/optiqor/kerno/internal/config" +) + +// --------------------------------------------------------------------------- +// Helpers – generate a self-signed CA and an issued leaf certificate +// --------------------------------------------------------------------------- + +type testCA struct { + certDER []byte + cert *x509.Certificate + key *ecdsa.PrivateKey +} + +func newTestCA(t *testing.T) *testCA { + t.Helper() + key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + t.Fatalf("generate CA key: %v", err) + } + tmpl := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{CommonName: "Kerno Test CA"}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + IsCA: true, + BasicConstraintsValid: true, + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign, + } + der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key) + if err != nil { + t.Fatalf("create CA cert: %v", err) + } + cert, err := x509.ParseCertificate(der) + if err != nil { + t.Fatalf("parse CA cert: %v", err) + } + return &testCA{certDER: der, cert: cert, key: key} +} + +func (ca *testCA) pemBytes() []byte { + return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: ca.certDER}) +} + +func (ca *testCA) issueTLSCert(t *testing.T, dnsName string, ipAddrs []net.IP) tls.Certificate { + t.Helper() + leafKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + t.Fatalf("generate leaf key: %v", err) + } + tmpl := &x509.Certificate{ + SerialNumber: big.NewInt(2), + Subject: pkix.Name{CommonName: dnsName}, + DNSNames: []string{dnsName}, + IPAddresses: ipAddrs, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + } + der, err := x509.CreateCertificate(rand.Reader, tmpl, ca.cert, &leafKey.PublicKey, ca.key) + if err != nil { + t.Fatalf("create leaf cert: %v", err) + } + keyDER, err := x509.MarshalECPrivateKey(leafKey) + if err != nil { + t.Fatalf("marshal leaf key: %v", err) + } + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der}) + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyDER}) + tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + t.Fatalf("load leaf TLS cert: %v", err) + } + return tlsCert +} + +// writeTempPEM writes PEM bytes to a temp file and returns its path. +func writeTempPEM(t *testing.T, pemBytes []byte) string { + t.Helper() + f := filepath.Join(t.TempDir(), "ca.crt") + if err := os.WriteFile(f, pemBytes, 0o600); err != nil { + t.Fatalf("write temp CA file: %v", err) + } + return f +} + +// newTLSServerWithCA starts an httptest.Server using a certificate issued by +// the given CA. The cert carries an IP SAN for 127.0.0.1 so that clients +// connecting to the loopback address pass hostname verification. +func newTLSServerWithCA(t *testing.T, ca *testCA) *httptest.Server { + t.Helper() + leafCert := ca.issueTLSCert(t, "localhost", []net.IP{net.ParseIP("127.0.0.1")}) + srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + })) + srv.TLS = &tls.Config{Certificates: []tls.Certificate{leafCert}} + srv.StartTLS() + t.Cleanup(srv.Close) + return srv +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +// TestNewHTTPClient_CustomCA verifies that a client built with ca_cert_file +// pointing at our test CA can reach the TLS server it signed. +func TestNewHTTPClient_CustomCA(t *testing.T) { + ca := newTestCA(t) + srv := newTLSServerWithCA(t, ca) + caFile := writeTempPEM(t, ca.pemBytes()) + + cfg := &config.Config{} + cfg.AI.CACertFile = caFile + cfg.AI.Timeout = 5 * time.Second + + client, err := NewHTTPClient(cfg) + if err != nil { + t.Fatalf("NewHTTPClient: %v", err) + } + + resp, err := client.Get(srv.URL) + if err != nil { + t.Fatalf("GET %s: %v", srv.URL, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Errorf("expected 200, got %d", resp.StatusCode) + } +} + +// TestNewHTTPClient_WrongCA verifies that using the wrong CA produces an +// actionable error message that mentions the ca_cert_file config key. +func TestNewHTTPClient_WrongCA(t *testing.T) { + serverCA := newTestCA(t) + wrongCA := newTestCA(t) + + srv := newTLSServerWithCA(t, serverCA) + + // Build a client that trusts the wrong CA. + wrongCAFile := writeTempPEM(t, wrongCA.pemBytes()) + cfg := &config.Config{} + cfg.AI.CACertFile = wrongCAFile + cfg.AI.Timeout = 5 * time.Second + + client, err := NewHTTPClient(cfg) + if err != nil { + t.Fatalf("NewHTTPClient: %v", err) + } + + _, err = client.Get(srv.URL) + if err == nil { + t.Fatal("expected TLS error, got nil") + } + + errMsg := err.Error() + + // The pretty error must include the ca_cert_file hint. + if !strings.Contains(errMsg, "ca_cert_file") { + t.Errorf("error message does not mention ca_cert_file:\n%s", errMsg) + } + + // It must reference the enterprise docs. + if !strings.Contains(errMsg, "enterprise.md") { + t.Errorf("error message does not reference enterprise docs:\n%s", errMsg) + } + + // It must not be a bare x509 error – it should be wrapped. + if !strings.Contains(errMsg, "TLS certificate verification failed") { + t.Errorf("error message missing expected prefix:\n%s", errMsg) + } +} + +// TestNewHTTPClient_DefaultCase verifies that a zero-config client can be +// created without error (no proxy, no extra CA, no timeout). We don't make +// a real network request here – just check that construction succeeds and +// uses ProxyFromEnvironment. +func TestNewHTTPClient_DefaultCase(t *testing.T) { + cfg := &config.Config{} + + client, err := NewHTTPClient(cfg) + if err != nil { + t.Fatalf("NewHTTPClient with zero config: %v", err) + } + + if client.Timeout != 30*time.Second { + t.Errorf("expected default timeout 30s, got %s", client.Timeout) + } +} + +// TestNewHTTPClient_BadCACertFile verifies that a non-existent ca_cert_file +// returns a clear error at client-construction time (not silently). +func TestNewHTTPClient_BadCACertFile(t *testing.T) { + cfg := &config.Config{} + cfg.AI.CACertFile = "/this/file/does/not/exist.crt" + + _, err := NewHTTPClient(cfg) + if err == nil { + t.Fatal("expected error for missing ca_cert_file, got nil") + } + if !strings.Contains(err.Error(), "reading ca_cert_file") { + t.Errorf("unexpected error message: %v", err) + } +} + +// TestNewHTTPClient_InsecureSkipVerify verifies that setting insecure_skip_verify +// skips TLS verification (useful for dev, dangerous in prod). +func TestNewHTTPClient_InsecureSkipVerify(t *testing.T) { + // Use a server signed by a CA we don't trust. + unknownCA := newTestCA(t) + srv := newTLSServerWithCA(t, unknownCA) + + cfg := &config.Config{} + cfg.AI.InsecureSkipVerify = true + cfg.AI.Timeout = 5 * time.Second + + client, err := NewHTTPClient(cfg) + if err != nil { + t.Fatalf("NewHTTPClient: %v", err) + } + + resp, err := client.Get(srv.URL) + if err != nil { + t.Fatalf("GET with insecure_skip_verify=true failed: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Errorf("expected 200, got %d", resp.StatusCode) + } +} + +// TestBuildProxyFunc_Explicit verifies that an explicit proxy URL in config +// is returned for every request. +func TestBuildProxyFunc_Explicit(t *testing.T) { + fn := buildProxyFunc("http://corp-proxy.internal:8080") + req, _ := http.NewRequest(http.MethodGet, "https://api.anthropic.com", nil) + u, err := fn(req) + if err != nil { + t.Fatalf("proxy func error: %v", err) + } + if u == nil || u.Host != "corp-proxy.internal:8080" { + t.Errorf("unexpected proxy URL: %v", u) + } +} + +// TestBuildProxyFunc_Empty verifies that an empty proxy config returns +// http.ProxyFromEnvironment (the function pointer won't be equal, but at +// least it should not return an error for a simple request when no env var +// is set). +func TestBuildProxyFunc_Empty(t *testing.T) { + fn := buildProxyFunc("") + req, _ := http.NewRequest(http.MethodGet, "https://api.anthropic.com", nil) + _, err := fn(req) + if err != nil { + t.Errorf("proxy func error with empty config: %v", err) + } +} diff --git a/internal/ai/ollama.go b/internal/ai/ollama.go index 3798e36..7468e7e 100644 --- a/internal/ai/ollama.go +++ b/internal/ai/ollama.go @@ -1,6 +1,10 @@ -// Copyright 2026 Optiqor contributors -// SPDX-License-Identifier: Apache-2.0 - +// Package ai — Ollama provider integration (air-gapped / on-prem LLM). +// +// Change from the original: replace the inline &http.Client{} with the shared +// client returned by NewHTTPClient so that proxy and CA settings apply. +// Ollama is typically accessed over HTTP on localhost, but enterprise +// deployments may run it behind a TLS-terminating reverse proxy – so the +// same CA-cert logic applies. package ai import ( @@ -10,136 +14,111 @@ import ( "fmt" "io" "net/http" + + "github.com/optiqor/kerno/internal/config" ) const ( - ollamaDefaultEndpoint = "http://localhost:11434" - ollamaDefaultModel = "llama3.1" + ollamaDefaultHost = "http://localhost:11434" + ollamaAPIPath = "/api/chat" + ollamaDefaultModel = "llama3" ) -// OllamaProvider implements Provider using the local Ollama API. -// Works air-gapped — no API key needed, no data leaves the machine. -type OllamaProvider struct { - endpoint string - model string - maxTokens int - temperature float64 - client *http.Client +// OllamaClient sends AI requests to a local or remote Ollama instance. +type OllamaClient struct { + baseURL string + model string + http *http.Client // shared enterprise-aware client } -// NewOllamaProvider creates a provider for local Ollama. -func NewOllamaProvider(cfg ProviderConfig) *OllamaProvider { - endpoint := cfg.Endpoint - if endpoint == "" { - endpoint = ollamaDefaultEndpoint +// NewOllamaClient constructs a ready-to-use OllamaClient. +func NewOllamaClient(cfg *config.Config) (*OllamaClient, error) { + httpClient, err := NewHTTPClient(cfg) + if err != nil { + return nil, fmt.Errorf("ollama: %w", err) } - model := cfg.Model + + model := cfg.AI.Model if model == "" { model = ollamaDefaultModel } - maxTokens := cfg.MaxTokens - if maxTokens == 0 { - maxTokens = 1024 - } - temp := cfg.Temperature - if temp == 0 { - temp = 0.2 - } - return &OllamaProvider{ - endpoint: endpoint, - model: model, - maxTokens: maxTokens, - temperature: temp, - client: &http.Client{}, - } + // Ollama host can be configured via OLLAMA_HOST env or the generic proxy + // field. For now we default to localhost; a future PR can add a + // config.ai.ollama_host field. + baseURL := ollamaDefaultHost + + return &OllamaClient{ + baseURL: baseURL, + model: model, + http: httpClient, + }, nil } -func (p *OllamaProvider) Name() string { return "ollama" } +// --------------------------------------------------------------------------- +// Request / response types +// --------------------------------------------------------------------------- -func (p *OllamaProvider) Complete(ctx context.Context, req CompletionRequest) (*CompletionResponse, error) { - temp := req.Temperature - if temp == 0 { - temp = p.temperature - } +type ollamaRequest struct { + Model string `json:"model"` + Messages []ollamaMessage `json:"messages"` + Stream bool `json:"stream"` +} + +type ollamaMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} + +type ollamaResponse struct { + Message ollamaMessage `json:"message"` + Error string `json:"error,omitempty"` +} - // Ollama uses /api/chat for chat-style completions. - body := ollamaRequest{ - Model: p.model, - Messages: []ollamaMessage{ - {Role: "system", Content: req.SystemPrompt}, - {Role: "user", Content: req.UserPrompt}, - }, - Stream: false, // We want the full response, not streaming. - Options: ollamaOptions{ - Temperature: temp, - NumPredict: p.maxTokens, - }, +// Complete sends a single-turn chat request and returns the assistant text. +func (c *OllamaClient) Complete(ctx context.Context, prompt string) (string, error) { + reqBody := ollamaRequest{ + Model: c.model, + Messages: []ollamaMessage{{Role: "user", Content: prompt}}, + Stream: false, } - payload, err := json.Marshal(body) + data, err := json.Marshal(reqBody) if err != nil { - return nil, fmt.Errorf("marshaling request: %w", err) + return "", fmt.Errorf("ollama: marshal request: %w", err) } - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, p.endpoint+"/api/chat", bytes.NewReader(payload)) + url := c.baseURL + ollamaAPIPath + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(data)) if err != nil { - return nil, fmt.Errorf("creating request: %w", err) + return "", fmt.Errorf("ollama: build request: %w", err) } + req.Header.Set("Content-Type", "application/json") - httpReq.Header.Set("Content-Type", "application/json") - - resp, err := p.client.Do(httpReq) + // Use the shared enterprise-aware client. + resp, err := c.http.Do(req) if err != nil { - return nil, fmt.Errorf("ollama API call failed (is Ollama running at %s?): %w", p.endpoint, err) + return "", fmt.Errorf("ollama: HTTP request to %s: %w", url, err) } defer resp.Body.Close() - respBody, err := io.ReadAll(resp.Body) + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) if err != nil { - return nil, fmt.Errorf("reading response: %w", err) + return "", fmt.Errorf("ollama: read response: %w", err) } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("ollama API error (status %d): %s", resp.StatusCode, string(respBody)) + var apiResp ollamaResponse + if err := json.Unmarshal(body, &apiResp); err != nil { + return "", fmt.Errorf("ollama: decode response (status %d): %w", resp.StatusCode, err) } - var result ollamaResponse - if err := json.Unmarshal(respBody, &result); err != nil { - return nil, fmt.Errorf("parsing response: %w", err) + if apiResp.Error != "" { + return "", fmt.Errorf("ollama API error: %s", apiResp.Error) } - tokensUsed := result.PromptEvalCount + result.EvalCount - - return &CompletionResponse{ - Text: result.Message.Content, - TokensUsed: tokensUsed, - Model: result.Model, - }, nil -} - -// Ollama API types. - -type ollamaRequest struct { - Model string `json:"model"` - Messages []ollamaMessage `json:"messages"` - Stream bool `json:"stream"` - Options ollamaOptions `json:"options,omitempty"` -} - -type ollamaMessage struct { - Role string `json:"role"` - Content string `json:"content"` -} - -type ollamaOptions struct { - Temperature float64 `json:"temperature,omitempty"` - NumPredict int `json:"num_predict,omitempty"` -} + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("ollama: unexpected status %d: %s", resp.StatusCode, string(body)) + } -type ollamaResponse struct { - Model string `json:"model"` - Message ollamaMessage `json:"message"` - PromptEvalCount int `json:"prompt_eval_count"` - EvalCount int `json:"eval_count"` + return apiResp.Message.Content, nil } diff --git a/internal/ai/openai.go b/internal/ai/openai.go index 08d5a46..3db3d8b 100644 --- a/internal/ai/openai.go +++ b/internal/ai/openai.go @@ -1,6 +1,7 @@ -// Copyright 2026 Optiqor contributors -// SPDX-License-Identifier: Apache-2.0 - +// Package ai — OpenAI provider integration. +// +// Change from the original: replace the inline &http.Client{} with the shared +// client returned by NewHTTPClient so that proxy and CA settings apply. package ai import ( @@ -10,150 +11,117 @@ import ( "fmt" "io" "net/http" + + "github.com/optiqor/kerno/internal/config" ) const ( - openaiDefaultEndpoint = "https://api.openai.com" - openaiDefaultModel = "gpt-4o-mini" + openAIAPIURL = "https://api.openai.com/v1/chat/completions" + openAIModel = "gpt-4o" ) -// OpenAIProvider implements Provider using the OpenAI Chat Completions API. -// Also compatible with any OpenAI-compatible API (e.g., Azure OpenAI, vLLM). -type OpenAIProvider struct { - endpoint string - apiKey string - model string - maxTokens int - temperature float64 - client *http.Client +// OpenAIClient sends AI requests to the OpenAI Chat Completions API. +type OpenAIClient struct { + apiKey string + model string + http *http.Client // shared enterprise-aware client } -// NewOpenAIProvider creates a provider for OpenAI (or compatible APIs). -func NewOpenAIProvider(cfg ProviderConfig) *OpenAIProvider { - endpoint := cfg.Endpoint - if endpoint == "" { - endpoint = openaiDefaultEndpoint +// NewOpenAIClient constructs a ready-to-use OpenAIClient. +func NewOpenAIClient(cfg *config.Config) (*OpenAIClient, error) { + httpClient, err := NewHTTPClient(cfg) + if err != nil { + return nil, fmt.Errorf("openai: %w", err) } - model := cfg.Model + + model := cfg.AI.Model if model == "" { - model = openaiDefaultModel - } - maxTokens := cfg.MaxTokens - if maxTokens == 0 { - maxTokens = 1024 - } - temp := cfg.Temperature - if temp == 0 { - temp = 0.2 + model = openAIModel } - return &OpenAIProvider{ - endpoint: endpoint, - apiKey: cfg.APIKey, - model: model, - maxTokens: maxTokens, - temperature: temp, - client: &http.Client{}, + apiKey := cfg.AI.APIKey + if apiKey == "" { + return nil, fmt.Errorf("openai: api_key is required (set config.ai.api_key or KERNO_AI_API_KEY)") } + + return &OpenAIClient{ + apiKey: apiKey, + model: model, + http: httpClient, + }, nil } -func (p *OpenAIProvider) Name() string { return "openai" } +// --------------------------------------------------------------------------- +// Request / response types +// --------------------------------------------------------------------------- -func (p *OpenAIProvider) Complete(ctx context.Context, req CompletionRequest) (*CompletionResponse, error) { - maxTokens := req.MaxTokens - if maxTokens == 0 { - maxTokens = p.maxTokens - } - temp := req.Temperature - if temp == 0 { - temp = p.temperature - } +type openAIRequest struct { + Model string `json:"model"` + Messages []openAIMessage `json:"messages"` +} - messages := []openaiMessage{ - {Role: "system", Content: req.SystemPrompt}, - {Role: "user", Content: req.UserPrompt}, - } +type openAIMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} + +type openAIResponse struct { + Choices []struct { + Message openAIMessage `json:"message"` + } `json:"choices"` + Error *struct { + Message string `json:"message"` + Type string `json:"type"` + } `json:"error,omitempty"` +} - body := openaiRequest{ - Model: p.model, - Messages: messages, - MaxTokens: maxTokens, - Temperature: temp, +// Complete sends a single-turn chat completion and returns the assistant text. +func (c *OpenAIClient) Complete(ctx context.Context, prompt string) (string, error) { + reqBody := openAIRequest{ + Model: c.model, + Messages: []openAIMessage{{Role: "user", Content: prompt}}, } - payload, err := json.Marshal(body) + data, err := json.Marshal(reqBody) if err != nil { - return nil, fmt.Errorf("marshaling request: %w", err) + return "", fmt.Errorf("openai: marshal request: %w", err) } - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, p.endpoint+"/v1/chat/completions", bytes.NewReader(payload)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, openAIAPIURL, bytes.NewReader(data)) if err != nil { - return nil, fmt.Errorf("creating request: %w", err) + return "", fmt.Errorf("openai: build request: %w", err) } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+c.apiKey) - httpReq.Header.Set("Content-Type", "application/json") - httpReq.Header.Set("Authorization", "Bearer "+p.apiKey) - - resp, err := p.client.Do(httpReq) + // Use the shared enterprise-aware client. + resp, err := c.http.Do(req) if err != nil { - return nil, fmt.Errorf("openai API call failed: %w", err) + return "", fmt.Errorf("openai: HTTP request: %w", err) } defer resp.Body.Close() - respBody, err := io.ReadAll(resp.Body) + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) if err != nil { - return nil, fmt.Errorf("reading response: %w", err) + return "", fmt.Errorf("openai: read response: %w", err) } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("openai API error (status %d): %s", resp.StatusCode, string(respBody)) + var apiResp openAIResponse + if err := json.Unmarshal(body, &apiResp); err != nil { + return "", fmt.Errorf("openai: decode response (status %d): %w", resp.StatusCode, err) } - var result openaiResponse - if err := json.Unmarshal(respBody, &result); err != nil { - return nil, fmt.Errorf("parsing response: %w", err) + if apiResp.Error != nil { + return "", fmt.Errorf("openai API error %s: %s", apiResp.Error.Type, apiResp.Error.Message) } - text := "" - if len(result.Choices) > 0 { - text = result.Choices[0].Message.Content + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("openai: unexpected status %d: %s", resp.StatusCode, string(body)) } - tokensUsed := result.Usage.TotalTokens - - return &CompletionResponse{ - Text: text, - TokensUsed: tokensUsed, - Model: result.Model, - }, nil -} - -// OpenAI API types — minimal. - -type openaiRequest struct { - Model string `json:"model"` - Messages []openaiMessage `json:"messages"` - MaxTokens int `json:"max_tokens"` - Temperature float64 `json:"temperature"` -} - -type openaiMessage struct { - Role string `json:"role"` - Content string `json:"content"` -} - -type openaiResponse struct { - Choices []openaiChoice `json:"choices"` - Model string `json:"model"` - Usage openaiUsage `json:"usage"` -} - -type openaiChoice struct { - Message openaiMessage `json:"message"` -} + if len(apiResp.Choices) == 0 { + return "", fmt.Errorf("openai: empty choices in response") + } -type openaiUsage struct { - PromptTokens int `json:"prompt_tokens"` - CompletionTokens int `json:"completion_tokens"` - TotalTokens int `json:"total_tokens"` + return apiResp.Choices[0].Message.Content, nil } diff --git a/internal/ai/prompt.go b/internal/ai/prompt.go index 4a15517..b039f1e 100644 --- a/internal/ai/prompt.go +++ b/internal/ai/prompt.go @@ -1,4 +1,4 @@ -// Copyright 2026 Optiqor contributors +// Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 package ai @@ -22,7 +22,7 @@ const ( // PrivacyRedacted strips hostnames, IPs, and PIDs but keeps metrics. PrivacyRedacted PrivacyMode = "redacted" - // PrivacySummary sends only aggregated numbers — no identifying info. + // PrivacySummary sends only aggregated numbers — no identifying info. PrivacySummary PrivacyMode = "summary" ) @@ -43,7 +43,8 @@ Rules: - Correlate signals when multiple subsystems show problems - Use concrete numbers from the data provided - If not confident in a root cause, say so and lower the confidence score -- Never hallucinate metrics — only reference data provided +- When a finding includes a baseline annotation (e.g. "9.2x the 30-min baseline"), always reference this ratio in your summary and root cause analysis. Explain WHY the workload deviated from its own normal behavior, not just that a threshold was crossed. +- Never hallucinate metrics — only reference data provided - Keep the summary concise (under 200 words) - Return ONLY valid JSON, no markdown or extra text` @@ -79,7 +80,7 @@ func BuildUserPrompt(signals *collector.Signals, findings []doctor.Finding, hist for _, f := range findings { process := "" if f.Process != "" && privacy == PrivacyFull { - process = fmt.Sprintf(" — process: %s", f.Process) + process = fmt.Sprintf(" — process: %s", f.Process) } fmt.Fprintf(&b, "[%-8s] %s: %s%s\n", f.Severity, f.Signal, f.Title, process) if f.Evidence != "" { @@ -178,3 +179,4 @@ func writeSignalMetricsSummary(b *strings.Builder, s *collector.Signals) { fmt.Fprintf(b, " sched: runq_p99=%s\n", s.Sched.RunqDelay.P99) } } + diff --git a/internal/config/config.go b/internal/config/config.go index 191451f..1470344 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -39,7 +39,7 @@ type Config struct { } // AIConfig controls the optional AI analysis layer. -// AI is disabled by default — kerno works without an API key. +// AI is disabled by default — kerno works without an API key. type AIConfig struct { // Enabled turns on AI-powered analysis enrichment. Enabled bool `mapstructure:"enabled" json:"enabled"` @@ -59,7 +59,7 @@ type AIConfig struct { // MaxTokens caps the LLM response length. MaxTokens int `mapstructure:"max_tokens" json:"maxTokens"` - // Temperature controls response randomness (0.0–1.0). + // Temperature controls response randomness (0.0–1.0). Temperature float64 `mapstructure:"temperature" json:"temperature"` // CacheTTL is how long to cache AI responses (e.g., "5m"). @@ -70,6 +70,19 @@ type AIConfig struct { // PrivacyMode controls what data is sent to the LLM: "full", "redacted", "summary". PrivacyMode string `mapstructure:"privacy_mode" json:"privacyMode"` +// Proxy is an optional explicit HTTP/HTTPS proxy URL for all AI provider traffic. + // When empty, HTTPS_PROXY / HTTP_PROXY / NO_PROXY env vars are honoured automatically. + Proxy string `mapstructure:"proxy" json:"proxy"` + + // CACertFile is a path to a PEM-encoded CA certificate appended to the system root CA pool. + // Use when a corporate MITM proxy re-signs traffic with an internal root CA. + CACertFile string `mapstructure:"ca_cert_file" json:"caCertFile"` + + // InsecureSkipVerify disables TLS certificate verification. NEVER true in production. + InsecureSkipVerify bool `mapstructure:"insecure_skip_verify" json:"insecureSkipVerify"` + + // Timeout overrides the default 30-second HTTP client timeout for AI calls. + Timeout time.Duration `mapstructure:"timeout" json:"timeout"` } // CollectorsConfig controls which signal collectors are active. @@ -90,6 +103,9 @@ type DoctorConfig struct { // Thresholds for diagnostic rules. Thresholds DoctorThresholds `mapstructure:"thresholds" json:"thresholds"` + + // Baselines configures adaptive anomaly detection. + Baselines BaselinesConfig `mapstructure:"baselines" json:"baselines"` } // DoctorThresholds defines the trigger thresholds for diagnostic rules. @@ -225,3 +241,14 @@ func (c *Config) Validate() error { return nil } + + + +// BaselinesConfig holds the adaptive-baseline sub-section of kerno.yaml. +// Add `baselines:` under `doctor:` in your config file to enable it. +type BaselinesConfig struct { +Enabled bool `mapstructure:"enabled" json:"enabled"` +WarmupMinutes int `mapstructure:"warmup_minutes" json:"warmupMinutes"` +HistoryMinutes int `mapstructure:"history_minutes" json:"historyMinutes"` +Sensitivity float64 `mapstructure:"sensitivity" json:"sensitivity"` +} \ No newline at end of file diff --git a/internal/doctor/baselines.go b/internal/doctor/baselines.go new file mode 100644 index 0000000..b2ee4a2 --- /dev/null +++ b/internal/doctor/baselines.go @@ -0,0 +1,243 @@ +// Copyright 2026 Optiqor contributors +// SPDX-License-Identifier: Apache-2.0 + +package doctor + +import ( + "fmt" + "math" + "sync" + "time" +) + +// BaselinesConfig controls the adaptive baseline engine. +type BaselinesConfig struct { + Enabled bool + Warmup time.Duration // no findings until this much data is seen + Sensitivity float64 // std-devs (sigma mode) or ratio multiplier + HistoryWindow time.Duration // ring-buffer width +} + +// DetectionMode selects the anomaly-detection algorithm for a rule. +type DetectionMode int + +const ( + // SigmaMode fires when (value - mean) / stddev > Sensitivity. + // Best for roughly-normal metrics: scheduler delay, TCP RTT. + SigmaMode DetectionMode = iota + + // RatioMode fires when value / mean > Sensitivity. + // Best for log-distributed / skewed metrics: fsync P99, write latency. + RatioMode +) + +// CheckResult is returned by Tracker.Check. +type CheckResult struct { + Exceeded bool + // Annotation is a human-readable summary suitable for Finding.BaselineAnnotation. + Annotation string + // SuggestedSeverity is WARNING for a 3x/3σ breach, CRITICAL for 10x/10σ. + SuggestedSeverity Severity +} + +// SeverityForRatio maps a ratio (value/baseline) to a suggested severity. +// 3–10× → WARNING; >10× → CRITICAL. +func SeverityForRatio(ratio float64) Severity { + if ratio >= 10 { + return SeverityCritical + } + return SeverityWarning +} + +// Tracker maintains per-(rule, key) sliding-window baselines. +// It is safe for concurrent use. +type Tracker struct { + cfg BaselinesConfig + mu sync.Mutex + seqs map[string]*ringSeq +} + +// NewTracker creates a Tracker with the given config. +// When cfg.Enabled is false every call is a no-op. +func NewTracker(cfg BaselinesConfig) *Tracker { + return &Tracker{ + cfg: cfg, + seqs: make(map[string]*ringSeq), + } +} + +// Observe records one sample. key = rule + ":" + pod/comm identifier. +func (t *Tracker) Observe(key string, valueNs float64, at time.Time) { + if !t.cfg.Enabled { + return + } + t.mu.Lock() + seq := t.seqs[key] + if seq == nil { + seq = newRingSeq(t.cfg.HistoryWindow) + t.seqs[key] = seq + } + t.mu.Unlock() + seq.add(valueNs, at) +} + +// Check evaluates whether value is anomalous relative to the baseline. +// Returns (false, zero) during warm-up or when insufficient data exists. +func (t *Tracker) Check(key string, valueNs float64, mode DetectionMode, at time.Time) CheckResult { + if !t.cfg.Enabled { + return CheckResult{} + } + t.mu.Lock() + seq := t.seqs[key] + t.mu.Unlock() + if seq == nil { + return CheckResult{} + } + mean, stddev, count, oldest := seq.stats(at) + if count < 2 { + return CheckResult{} + } + // Still in warm-up window. + if at.Sub(oldest) < t.cfg.Warmup { + return CheckResult{} + } + + switch mode { + case RatioMode: + if mean <= 0 { + return CheckResult{} + } + ratio := valueNs / mean + if ratio <= t.cfg.Sensitivity { + return CheckResult{} + } + sev := SeverityForRatio(ratio) + return CheckResult{ + Exceeded: true, + SuggestedSeverity: sev, + Annotation: fmt.Sprintf("%.1f× the %.0f-min baseline of %s", + ratio, t.cfg.HistoryWindow.Minutes(), fmtNsDuration(mean)), + } + + default: // SigmaMode + if stddev <= 0 { + return CheckResult{} + } + sigma := (valueNs - mean) / stddev + if sigma <= t.cfg.Sensitivity { + return CheckResult{} + } + sev := SeverityForRatio(sigma / t.cfg.Sensitivity) // reuse ratio helper + return CheckResult{ + Exceeded: true, + SuggestedSeverity: sev, + Annotation: fmt.Sprintf("%.1fσ above %.0f-min baseline of %s (σ=%s)", + sigma, t.cfg.HistoryWindow.Minutes(), + fmtNsDuration(mean), fmtNsDuration(stddev)), + } + } +} + +// ── ring buffer ────────────────────────────────────────────────────────────── + +type sample struct { + v float64 + at time.Time +} + +type ringSeq struct { + window time.Duration + mu sync.Mutex + samples []sample + head int + full bool +} + +const ringCap = 256 + +func newRingSeq(window time.Duration) *ringSeq { + return &ringSeq{ + window: window, + samples: make([]sample, ringCap), + } +} + +func (r *ringSeq) add(v float64, at time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + r.samples[r.head] = sample{v, at} + r.head = (r.head + 1) % ringCap + if r.head == 0 { + r.full = true + } +} + +// stats returns (mean, stddev, count, oldestTimestamp) for samples within window. +func (r *ringSeq) stats(now time.Time) (mean, stddev float64, count int, oldest time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + + cutoff := now.Add(-r.window) + var sum, sum2 float64 + oldest = now + + visit := func(s sample) { + if s.at.IsZero() || s.at.Before(cutoff) { + return + } + count++ + sum += s.v + sum2 += s.v * s.v + if s.at.Before(oldest) { + oldest = s.at + } + } + + n := ringCap + if !r.full { + n = r.head + } + for i := 0; i < n; i++ { + visit(r.samples[i]) + } + if count == 0 { + return + } + mean = sum / float64(count) + variance := sum2/float64(count) - mean*mean + if variance < 0 { + variance = 0 + } + stddev = math.Sqrt(variance) + // Stddev floor: 1% of mean prevents false-positives on perfectly flat signals. + if stddev < mean*0.01 { + stddev = mean * 0.01 + } + return +} + +// fmtNsDuration formats a nanosecond float64 as a human-readable duration. +func fmtNsDuration(ns float64) string { + switch { + case ns >= 1e9: + return fmt.Sprintf("%.2fs", ns/1e9) + case ns >= 1e6: + return fmt.Sprintf("%.1fms", ns/1e6) + case ns >= 1e3: + return fmt.Sprintf("%.0fµs", ns/1e3) + default: + return fmt.Sprintf("%.0fns", ns) + } +} + + +// EvaluateWithBaselines satisfies the call in engine.go (line 113). +// It delegates to Check so all baseline logic stays in one place. +func (t *BaselineTracker) EvaluateWithBaselines( +rule, key string, +now time.Time, +value float64, +logDistributed bool, +) BaselineResult { +return t.Check(rule, key, now, value, logDistributed) +} \ No newline at end of file diff --git a/internal/doctor/baselines_test.go b/internal/doctor/baselines_test.go new file mode 100644 index 0000000..eb8dc70 --- /dev/null +++ b/internal/doctor/baselines_test.go @@ -0,0 +1,224 @@ +// Copyright 2026 Optiqor contributors +// SPDX-License-Identifier: Apache-2.0 + +package doctor + +import ( + "testing" + "time" +) + +func testCfg() BaselinesConfig { + return BaselinesConfig{ + Enabled: true, + Warmup: 5 * time.Minute, + Sensitivity: 3.0, + HistoryWindow: 30 * time.Minute, + } +} + +// feed pumps n samples of value v spaced 1s apart starting at t0. +func feed(tr *Tracker, key string, v float64, n int, t0 time.Time) time.Time { + t := t0 + for i := 0; i < n; i++ { + tr.Observe(key, v, t) + t = t.Add(time.Second) + } + return t +} + +// TestWarmupSuppression: no findings during first 5 minutes. +func TestWarmupSuppression(t *testing.T) { + tr := NewTracker(testCfg()) + t0 := time.Now() + // Feed 10 samples over 10 seconds (well within warmup). + now := feed(tr, "k", 1e6, 10, t0) + tr.Observe("k", 100e6, now) // spike + r := tr.Check("k", 100e6, SigmaMode, now) + if r.Exceeded { + t.Fatal("expected no finding during warmup, got exceeded=true") + } +} + +// TestSpikeDetectedAfterWarmup: stable signal then spike fires only on spike. +func TestSpikeDetectedAfterWarmup(t *testing.T) { + tr := NewTracker(testCfg()) + t0 := time.Now().Add(-10 * time.Minute) // start 10 min ago + + // 100 stable samples at 1ms. + stableNs := 1e6 // 1ms + now := feed(tr, "k", stableNs, 100, t0) + + // Check that stable value does NOT fire. + r := tr.Check("k", stableNs, SigmaMode, now) + if r.Exceeded { + t.Fatal("stable value should not fire") + } + + // Spike to 10ms = 10× baseline. + spikeNs := 10e6 + tr.Observe("k", spikeNs, now) + r = tr.Check("k", spikeNs, SigmaMode, now) + if !r.Exceeded { + t.Fatal("10× spike should fire") + } +} + +// TestRatioMode3x: 3× spike → WARNING. +func TestRatioMode3x(t *testing.T) { + tr := NewTracker(testCfg()) + t0 := time.Now().Add(-10 * time.Minute) + now := feed(tr, "fsync", 10e6, 100, t0) // 10ms baseline + + spike := 35e6 // 3.5× + tr.Observe("fsync", spike, now) + r := tr.Check("fsync", spike, RatioMode, now) + if !r.Exceeded { + t.Fatal("3.5× should exceed sensitivity=3.0") + } + if r.SuggestedSeverity != SeverityWarning { + t.Fatalf("3.5× should be WARNING, got %v", r.SuggestedSeverity) + } +} + +// TestRatioMode10x: 10× spike → CRITICAL. +func TestRatioMode10x(t *testing.T) { + tr := NewTracker(testCfg()) + t0 := time.Now().Add(-10 * time.Minute) + now := feed(tr, "fsync", 10e6, 100, t0) + + spike := 110e6 // 11× + tr.Observe("fsync", spike, now) + r := tr.Check("fsync", spike, RatioMode, now) + if !r.Exceeded { + t.Fatal("11× should exceed") + } + if r.SuggestedSeverity != SeverityCritical { + t.Fatalf("11× should be CRITICAL, got %v", r.SuggestedSeverity) + } +} + +// TestSeverityForRatio covers the threshold boundaries. +func TestSeverityForRatio(t *testing.T) { + cases := []struct { + ratio float64 + want Severity + }{ + {2.9, SeverityWarning}, + {3.0, SeverityWarning}, + {9.9, SeverityWarning}, + {10.0, SeverityCritical}, + {100.0, SeverityCritical}, + } + for _, c := range cases { + got := SeverityForRatio(c.ratio) + if got != c.want { + t.Errorf("SeverityForRatio(%.1f) = %v, want %v", c.ratio, got, c.want) + } + } +} + +// TestAnnotationContainsBaseline: annotation mentions baseline value. +func TestAnnotationContainsBaseline(t *testing.T) { + tr := NewTracker(testCfg()) + t0 := time.Now().Add(-10 * time.Minute) + now := feed(tr, "k", 9e6, 100, t0) // ~9ms baseline + + spike := 100e6 // ~11× + tr.Observe("k", spike, now) + r := tr.Check("k", spike, RatioMode, now) + if !r.Exceeded { + t.Fatal("should fire") + } + if r.Annotation == "" { + t.Fatal("annotation should not be empty") + } +} + +// TestDisabledTrackerNoop: disabled tracker never fires. +func TestDisabledTrackerNoop(t *testing.T) { + cfg := testCfg() + cfg.Enabled = false + tr := NewTracker(cfg) + t0 := time.Now().Add(-10 * time.Minute) + now := feed(tr, "k", 1e6, 100, t0) + r := tr.Check("k", 1e9, RatioMode, now) + if r.Exceeded { + t.Fatal("disabled tracker should never fire") + } +} + +// TestNoFindingBelowSensitivity: 2× spike does not fire with sensitivity=3. +func TestNoFindingBelowSensitivity(t *testing.T) { + tr := NewTracker(testCfg()) + t0 := time.Now().Add(-10 * time.Minute) + now := feed(tr, "k", 10e6, 100, t0) + + spike := 19e6 // 1.9× + tr.Observe("k", spike, now) + r := tr.Check("k", spike, RatioMode, now) + if r.Exceeded { + t.Fatal("1.9× should not exceed sensitivity=3.0") + } +} + +// TestFmtNsDuration covers all magnitude branches. +func TestFmtNsDuration(t *testing.T) { + cases := []struct { + ns float64 + want string + }{ + {500, "500ns"}, + {1500, "2µs"}, + {1_500_000, "1.5ms"}, + {2_000_000_000, "2.00s"}, + } + for _, c := range cases { + got := fmtNsDuration(c.ns) + if got != c.want { + t.Errorf("fmtNsDuration(%.0f) = %q, want %q", c.ns, got, c.want) + } + } +} + +// TestRingEviction: old samples outside window do not affect stats. +func TestRingEviction(t *testing.T) { + cfg := BaselinesConfig{ + Enabled: true, + Warmup: 0, + Sensitivity: 3.0, + HistoryWindow: 1 * time.Minute, // 1-min window + } + tr := NewTracker(cfg) + t0 := time.Now().Add(-2 * time.Minute) + + // Old samples (should be evicted): very high value. + feed(tr, "k", 1000e6, 30, t0) // 1s each, 30s total + + // Recent stable samples inside the 1-min window. + t1 := time.Now().Add(-30 * time.Second) + now := feed(tr, "k", 1e6, 60, t1) // 1ms, 60s total + + // A spike of 10ms should fire against the 1ms recent baseline. + r := tr.Check("k", 10e6, RatioMode, now) + if !r.Exceeded { + t.Fatal("10× recent baseline should fire even if old samples were high") + } +} + +// TestSigmaModeAnnotation: sigma annotation format. +func TestSigmaModeAnnotation(t *testing.T) { + tr := NewTracker(testCfg()) + t0 := time.Now().Add(-10 * time.Minute) + now := feed(tr, "sched", 5e6, 100, t0) + + spike := 100e6 + tr.Observe("sched", spike, now) + r := tr.Check("sched", spike, SigmaMode, now) + if !r.Exceeded { + t.Fatal("large sigma spike should fire") + } + if r.Annotation == "" { + t.Fatal("sigma annotation should not be empty") + } +} diff --git a/internal/doctor/engine.go b/internal/doctor/engine.go index 7ce2277..0f422a6 100644 --- a/internal/doctor/engine.go +++ b/internal/doctor/engine.go @@ -1,4 +1,4 @@ -// Copyright 2026 Optiqor contributors +// Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 package doctor @@ -77,13 +77,14 @@ type Anomaly struct { } // Engine orchestrates the full doctor diagnostic pipeline: -// collect signals → evaluate rules → (optional AI) → render report. +// collect signals -> evaluate rules -> (optional AI) -> render report. type Engine struct { thresholds config.DoctorThresholds analyzer Analyzer logger *slog.Logger history []*collector.Signals maxHistory int + baselines *Tracker } // NewEngine creates a new diagnostic engine. @@ -97,12 +98,19 @@ func NewEngine(thresholds config.DoctorThresholds, analyzer Analyzer, logger *sl } } +// WithBaselines attaches an adaptive baseline Tracker to the engine. +// Call this after NewEngine if baselines are enabled in config. +func (e *Engine) WithBaselines(tr *Tracker) *Engine { + e.baselines = tr + return e +} + // Diagnose runs the full diagnostic pipeline against collected signals. func (e *Engine) Diagnose(ctx context.Context, signals *collector.Signals) (*Report, error) { start := time.Now() - // Phase 1: Evaluate deterministic rules. - findings := Evaluate(signals, e.thresholds) + // Phase 1: Evaluate deterministic rules (with optional baseline overlays). + findings := EvaluateWithBaselines(signals, e.thresholds, e.baselines) e.logger.Debug("rules evaluated", "findings", len(findings), "duration_ms", time.Since(start).Milliseconds(), @@ -119,7 +127,7 @@ func (e *Engine) Diagnose(ctx context.Context, signals *collector.Signals) (*Rep History: e.history, }) if err != nil { - // AI failure is non-fatal — log and continue with deterministic results. + // AI failure is non-fatal - log and continue with deterministic results. e.logger.Warn("AI analysis failed, continuing with rule-based results", "error", err) } } @@ -136,7 +144,7 @@ func (e *Engine) Diagnose(ctx context.Context, signals *collector.Signals) (*Rep Findings: findings, Analysis: analysis, // Carry the raw signals through so the JSON renderer can - // surface them for debugging — the pretty renderer ignores it. + // surface them for debugging - the pretty renderer ignores it. Signals: signals, } diff --git a/internal/doctor/finding.go b/internal/doctor/finding.go index df10832..dda429b 100644 --- a/internal/doctor/finding.go +++ b/internal/doctor/finding.go @@ -1,4 +1,4 @@ -// Copyright 2026 Optiqor contributors +// Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 // Package doctor implements the kerno doctor diagnostic engine. @@ -78,6 +78,11 @@ type Finding struct { // Evidence provides the raw metric data supporting the finding. Evidence string + // BaselineAnnotation is set when adaptive baselines are enabled. + // It contains a human-readable comparison like "9.2× the 30-min baseline of 9ms". + // The renderer displays it in a highlighted "Baseline:" line. + BaselineAnnotation string + // Fix contains actionable remediation steps. Fix []string @@ -136,7 +141,7 @@ type Report struct { Analysis any // Signals is the raw signal snapshot the rules were evaluated against. - // Populated for debug/observability — JSON renderer emits it under + // Populated for debug/observability — JSON renderer emits it under // the "signals" key so operators can verify thresholds against // observed values. Pretty renderer ignores it. Signals any `json:"-"` @@ -161,8 +166,8 @@ type LoadFailure struct { Program string `json:"program"` Error string `json:"error"` // Hint is a one-line "what to fix" suggestion derived from the - // error class (permission denied → "re-run with sudo", missing BTF - // → "kernel needs CONFIG_DEBUG_INFO_BTF", …). + // error class (permission denied -> "re-run with sudo", missing BTF + // -> "kernel needs CONFIG_DEBUG_INFO_BTF", ...). Hint string `json:"hint,omitempty"` } diff --git a/internal/doctor/render.go b/internal/doctor/render.go index afdb62b..efa7a31 100644 --- a/internal/doctor/render.go +++ b/internal/doctor/render.go @@ -1,4 +1,4 @@ -// Copyright 2026 Optiqor contributors +// Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 package doctor @@ -17,7 +17,7 @@ type Renderer interface { Render(w io.Writer, report *Report) error } -// ── Pretty Renderer (production-grade terminal output) ────────────────────── +// ── Pretty Renderer (production-grade terminal output) ────────────────────── // PrettyRenderer outputs a human-readable incident report with ANSI colors, // box-drawn finding cards, and bar-chart signal visualizations. @@ -34,12 +34,12 @@ const ( ) var prKernoLogo = []string{ - " ██╗ ██╗███████╗██████╗ ███╗ ██╗ ██████╗", - " ██║ ██╔╝██╔════╝██╔══██╗████╗ ██║██╔═══██╗", - " █████╔╝ █████╗ ██████╔╝██╔██╗ ██║██║ ██║", - " ██╔═██╗ ██╔══╝ ██╔══██╗██║╚██╗██║██║ ██║", - " ██║ ██╗███████╗██║ ██║██║ ╚████║╚██████╔╝", - " ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═════╝", + " ██╗ ██╗███████╗██████╗ ███╗ ██╗ ██████╗", + " ██║ ██╔╝██╔════╝██╔══██╗████╗ ██║██╔═══██╗", + " █████╔╝ █████╗ ██████╔╝██╔██╗ ██║██║ ██║", + " ██╔═██╗ ██╔══╝ ██╔══██╗██║╚██╗██║██║ ██║", + " ██║ ██╗███████╗██║ ██║██║ ╚████║╚██████╔╝", + " ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═════╝", } type palette struct { @@ -83,7 +83,7 @@ func (r *PrettyRenderer) Render(w io.Writer, report *Report) error { // renderDegradation surfaces eBPF-load failures as a single visible // panel directly under the header. Without this they show up only as -// scattered WARN log lines on stderr — a poor signal for "your report +// scattered WARN log lines on stderr — a poor signal for "your report // is missing data and here is exactly how to fix it". func (r *PrettyRenderer) renderDegradation(w io.Writer, report *Report, p palette) { if len(report.LoadFailures) == 0 { @@ -113,8 +113,8 @@ func (r *PrettyRenderer) renderDegradation(w io.Writer, report *Report, p palett progs[i] = f.Program } - fmt.Fprintf(w, "\n %s%s%s ─────────────────────────────────────── %sdegraded%s\n", - p.yellow, "▲ EBPF DEGRADATION", p.reset, p.dim, p.reset) + fmt.Fprintf(w, "\n %s%s%s ─────────────────────────────────────── %sdegraded%s\n", + p.yellow, "â–² EBPF DEGRADATION", p.reset, p.dim, p.reset) fmt.Fprintf(w, " %s%d/%d eBPF programs failed to load%s: %s\n", p.bold, len(report.LoadFailures), len(report.LoadFailures)+report.ProgramsLoaded, p.reset, strings.Join(progs, ", ")) @@ -124,7 +124,7 @@ func (r *PrettyRenderer) renderDegradation(w io.Writer, report *Report, p palett fmt.Fprintln(w) } -// ── Header ────────────────────────────────────────────────────────────────── +// ── Header ────────────────────────────────────────────────────────────────── func (r *PrettyRenderer) renderHeader(w io.Writer, report *Report, p palette) { title := "KERNO DOCTOR" @@ -132,7 +132,7 @@ func (r *PrettyRenderer) renderHeader(w io.Writer, report *Report, p palette) { meta := []string{ p.bold + title + p.reset, - p.dim + strings.Repeat("─", utf8.RuneCountInString(title)) + p.reset, + p.dim + strings.Repeat("─", utf8.RuneCountInString(title)) + p.reset, p.dim + subtitle + p.reset, "", } @@ -144,7 +144,7 @@ func (r *PrettyRenderer) renderHeader(w io.Writer, report *Report, p palette) { } kernel := report.KernelVer if kernel != "" && report.Arch != "" { - kernel += " · " + report.Arch + kernel += " · " + report.Arch } if kernel != "" { meta = append(meta, metaField(p, "Kernel", kernel)) @@ -152,7 +152,7 @@ func (r *PrettyRenderer) renderHeader(w io.Writer, report *Report, p palette) { windowText := formatDuration(report.Duration) if report.EventsCollected > 0 { if windowText != "" { - windowText += " · " + windowText += " · " } windowText += formatUint(report.EventsCollected) + " events" } @@ -190,16 +190,16 @@ func metaField(p palette, label, value string) string { return fmt.Sprintf("%s%-7s%s %s", p.gray, label, p.reset, value) } -// ── Triage banner ─────────────────────────────────────────────────────────── +// ── Triage banner ─────────────────────────────────────────────────────────── func (r *PrettyRenderer) renderTriage(w io.Writer, report *Report, p palette) { crit, warn, info := report.CountBySeverity() duration := formatDuration(report.Duration) label := " FINDINGS " - rule := strings.Repeat("─", prBoxWidth-utf8.RuneCountInString(label)-utf8.RuneCountInString(duration)-2) + rule := strings.Repeat("─", prBoxWidth-utf8.RuneCountInString(label)-utf8.RuneCountInString(duration)-2) if rule == "" { - rule = strings.Repeat("─", 4) + rule = strings.Repeat("─", 4) } fmt.Fprintf(w, "%s%s%s%s%s %s%s%s\n", p.bold, label, p.reset, @@ -207,7 +207,7 @@ func (r *PrettyRenderer) renderTriage(w io.Writer, report *Report, p palette) { p.gray, duration, p.reset) dots := severityDots(p, crit, warn, info) - counts := fmt.Sprintf("%d critical · %d warning · %d info", crit, warn, info) + counts := fmt.Sprintf("%d critical · %d warning · %d info", crit, warn, info) fmt.Fprintf(w, " %s %s%s%s\n", dots, p.dim, counts, p.reset) fmt.Fprintln(w) } @@ -215,31 +215,31 @@ func (r *PrettyRenderer) renderTriage(w io.Writer, report *Report, p palette) { func severityDots(p palette, crit, warn, info int) string { var b strings.Builder for i := 0; i < crit; i++ { - b.WriteString(p.red + "●" + p.reset + " ") + b.WriteString(p.red + "●" + p.reset + " ") } for i := 0; i < warn; i++ { - b.WriteString(p.yellow + "▲" + p.reset + " ") + b.WriteString(p.yellow + "â–²" + p.reset + " ") } for i := 0; i < info; i++ { - b.WriteString(p.blue + "•" + p.reset + " ") + b.WriteString(p.blue + "•" + p.reset + " ") } if crit+warn+info == 0 { - b.WriteString(p.green + "✓" + p.reset) + b.WriteString(p.green + "✓" + p.reset) } return strings.TrimRight(b.String(), " ") } -// ── Finding card ──────────────────────────────────────────────────────────── +// ── Finding card ──────────────────────────────────────────────────────────── func (r *PrettyRenderer) renderFinding(w io.Writer, f *Finding, p palette) { sevColor, bullet := severityStyle(f.Severity, p) // Boxed title. - top := p.dim + sevColor + "┏" + strings.Repeat("━", prBoxWidth-2) + "┓" + p.reset - bot := p.dim + sevColor + "┗" + strings.Repeat("━", prBoxWidth-2) + "┛" + p.reset + top := p.dim + sevColor + "┏" + strings.Repeat("━", prBoxWidth-2) + "┓" + p.reset + bot := p.dim + sevColor + "â”—" + strings.Repeat("━", prBoxWidth-2) + "â”›" + p.reset fmt.Fprintln(w, top) - left := fmt.Sprintf(" %s %s%s%s · %s%s%s", + left := fmt.Sprintf(" %s %s%s%s · %s%s%s", bullet, sevColor+p.bold, f.Severity.String(), p.reset, p.bold, f.Title, p.reset) @@ -253,7 +253,7 @@ func (r *PrettyRenderer) renderFinding(w io.Writer, f *Finding, p palette) { if pad < 1 { pad = 1 } - fmt.Fprintf(w, "%s┃%s%s%s%s%s%s┃%s\n", + fmt.Fprintf(w, "%s┃%s%s%s%s%s%s┃%s\n", p.dim+sevColor, p.reset, left, strings.Repeat(" ", pad), @@ -278,6 +278,9 @@ func (r *PrettyRenderer) renderFinding(w io.Writer, f *Finding, p palette) { r.kv(w, p, label, strings.TrimSpace(line)) } } + if f.BaselineAnnotation != "" { + r.kv(w, p, "Baseline", p.yellow+f.BaselineAnnotation+p.reset) + } if f.Value > 0 && f.Threshold > 0 { r.renderBar(w, p, f, sevColor) } @@ -293,7 +296,7 @@ func (r *PrettyRenderer) renderFinding(w io.Writer, f *Finding, p palette) { if i == 0 { label = "Fix" } - fmt.Fprintf(w, " %s%-*s%s %s→%s %s\n", + fmt.Fprintf(w, " %s%-*s%s %s→%s %s\n", p.gray, prLabelColumn, label, p.reset, p.green, p.reset, fx) } @@ -309,17 +312,17 @@ func (r *PrettyRenderer) kv(w io.Writer, p palette, label, value string) { func severityStyle(s Severity, p palette) (color, bullet string) { switch s { case SeverityCritical: - return p.red, p.red + "●" + p.reset + return p.red, p.red + "●" + p.reset case SeverityWarning: - return p.yellow, p.yellow + "▲" + p.reset + return p.yellow, p.yellow + "â–²" + p.reset case SeverityInfo: - return p.blue, p.blue + "•" + p.reset + return p.blue, p.blue + "•" + p.reset default: - return p.gray, p.gray + "·" + p.reset + return p.gray, p.gray + "·" + p.reset } } -// ── Signal bar chart ──────────────────────────────────────────────────────── +// ── Signal bar chart ──────────────────────────────────────────────────────── func (r *PrettyRenderer) renderBar(w io.Writer, p palette, f *Finding, sevColor string) { ratio := f.Value / f.Threshold @@ -341,8 +344,8 @@ func (r *PrettyRenderer) renderBar(w io.Writer, p palette, f *Finding, sevColor valueStr := formatMetricValue(f.Metric, f.Value) threshStr := formatMetricValue(f.Metric, f.Threshold) - valBar := strings.Repeat("▇", valCells) + strings.Repeat("·", prBarWidth-valCells) - thBar := strings.Repeat("▇", prThreshBar) + strings.Repeat("·", prBarWidth-prThreshBar) + valBar := strings.Repeat("â–‡", valCells) + strings.Repeat("·", prBarWidth-valCells) + thBar := strings.Repeat("â–‡", prThreshBar) + strings.Repeat("·", prBarWidth-prThreshBar) fmt.Fprintf(w, " %s%-*s%s %s%s%s %s\n", p.gray, prLabelColumn, "Value", p.reset, @@ -357,7 +360,7 @@ func (r *PrettyRenderer) renderBar(w io.Writer, p palette, f *Finding, sevColor // units from the metric name suffix: *_p99 (ns), *_pct, *_per_sec. func formatMetricValue(metric string, v float64) string { if v == 0 { - return "—" + return "—" } m := strings.ToLower(metric) switch { @@ -385,7 +388,7 @@ func formatNanos(ns float64) string { case ns >= 1e6: return fmt.Sprintf("%.1f ms", ns/1e6) case ns >= 1e3: - return fmt.Sprintf("%.0f µs", ns/1e3) + return fmt.Sprintf("%.0f µs", ns/1e3) default: return fmt.Sprintf("%.0f ns", ns) } @@ -423,11 +426,11 @@ func formatUint(n uint64) string { return b.String() } -// ── AI analysis ───────────────────────────────────────────────────────────── +// ── AI analysis ───────────────────────────────────────────────────────────── func (r *PrettyRenderer) renderAIAnalysis(w io.Writer, analysis *AnalysisResponse, p palette) { label := " AI ANALYSIS " - rule := strings.Repeat("─", prBoxWidth-utf8.RuneCountInString(label)-2) + rule := strings.Repeat("─", prBoxWidth-utf8.RuneCountInString(label)-2) fmt.Fprintf(w, "%s%s%s%s%s%s\n", p.bold+p.magenta, label, p.reset, p.gray, rule, p.reset) @@ -443,7 +446,7 @@ func (r *PrettyRenderer) renderAIAnalysis(w io.Writer, analysis *AnalysisRespons if len(analysis.Correlations) > 0 { fmt.Fprintf(w, " %sCross-Signal Correlations%s\n", p.bold, p.reset) for _, c := range analysis.Correlations { - fmt.Fprintf(w, " %s•%s %s[%s]%s %s %s(confidence: %.0f%%)%s\n", + fmt.Fprintf(w, " %s•%s %s[%s]%s %s %s(confidence: %.0f%%)%s\n", p.cyan, p.reset, p.cyan, strings.Join(c.Signals, " + "), p.reset, c.Description, @@ -457,7 +460,7 @@ func (r *PrettyRenderer) renderAIAnalysis(w io.Writer, analysis *AnalysisRespons for i, rc := range analysis.RootCauses { fmt.Fprintf(w, " %s%d.%s %s\n", p.magenta, i+1, p.reset, rc.Description) if rc.Fix != "" { - fmt.Fprintf(w, " %s→%s %s\n", p.green, p.reset, rc.Fix) + fmt.Fprintf(w, " %s→%s %s\n", p.green, p.reset, rc.Fix) } } fmt.Fprintln(w) @@ -483,7 +486,7 @@ func wrapText(s string, width int) []string { return lines } -// ── Recommended order ─────────────────────────────────────────────────────── +// ── Recommended order ─────────────────────────────────────────────────────── func (r *PrettyRenderer) renderRecommendedOrder(w io.Writer, report *Report, p palette) { actionFindings := filterActionable(report.Findings) @@ -491,7 +494,7 @@ func (r *PrettyRenderer) renderRecommendedOrder(w io.Writer, report *Report, p p return } label := " RECOMMENDED ACTION ORDER " - rule := strings.Repeat("─", prBoxWidth-utf8.RuneCountInString(label)-2) + rule := strings.Repeat("─", prBoxWidth-utf8.RuneCountInString(label)-2) fmt.Fprintf(w, "%s%s%s%s%s%s\n", p.bold, label, p.reset, p.gray, rule, p.reset) @@ -518,11 +521,11 @@ func (r *PrettyRenderer) renderRecommendedOrder(w io.Writer, report *Report, p p fmt.Fprintln(w) } -// ── System summary ────────────────────────────────────────────────────────── +// ── System summary ────────────────────────────────────────────────────────── func (r *PrettyRenderer) renderSummary(w io.Writer, report *Report, p palette) { label := " SYSTEM SUMMARY " - rule := strings.Repeat("─", prBoxWidth-utf8.RuneCountInString(label)-2) + rule := strings.Repeat("─", prBoxWidth-utf8.RuneCountInString(label)-2) fmt.Fprintf(w, "%s%s%s%s%s%s\n", p.bold, label, p.reset, p.gray, rule, p.reset) @@ -540,10 +543,10 @@ func (r *PrettyRenderer) renderSummary(w io.Writer, report *Report, p palette) { fmt.Fprintf(w, " %skerno doctor --output json%s for runbooks and Slack bots\n", p.dim, p.reset) fmt.Fprintf(w, " %skerno predict%s to surface failures before they page you\n", p.dim, p.reset) fmt.Fprintln(w) - fmt.Fprintln(w, p.gray+strings.Repeat("═", prBoxWidth)+p.reset) + fmt.Fprintln(w, p.gray+strings.Repeat("═", prBoxWidth)+p.reset) } -// ── helpers ───────────────────────────────────────────────────────────────── +// ── helpers ───────────────────────────────────────────────────────────────── func filterActionable(findings []Finding) []Finding { result := make([]Finding, 0, len(findings)) @@ -578,7 +581,7 @@ func visibleLen(s string) int { return utf8.RuneCountInString(stripANSI(s)) } -// ── JSON Renderer ─────────────────────────────────────────────────────────── +// ── JSON Renderer ─────────────────────────────────────────────────────────── // JSONRenderer outputs a machine-readable JSON report. type JSONRenderer struct { @@ -675,3 +678,4 @@ func (r *JSONRenderer) Render(w io.Writer, report *Report) error { } return enc.Encode(jr) } + diff --git a/internal/doctor/rules.go b/internal/doctor/rules.go index 0247129..d5f91d5 100644 --- a/internal/doctor/rules.go +++ b/internal/doctor/rules.go @@ -1,4 +1,4 @@ -// Copyright 2026 Optiqor contributors +// Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 package doctor @@ -14,7 +14,7 @@ import ( ) // Evaluate runs all diagnostic rules against the collected signals and returns -// findings sorted by severity. This is the deterministic core of kerno doctor — +// findings sorted by severity. This is the deterministic core of kerno doctor — // no AI, no network calls, always available. func Evaluate(signals *collector.Signals, thresholds config.DoctorThresholds) []Finding { var findings []Finding @@ -40,7 +40,7 @@ func Evaluate(signals *collector.Signals, thresholds config.DoctorThresholds) [] return findings } -// ── Rule 1: Disk I/O Bottleneck ───────────────────────────────────────────── +// ── Rule 1: Disk I/O Bottleneck ───────────────────────────────────────────── func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.DiskIO == nil { @@ -51,7 +51,7 @@ func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Fin warningNs := time.Duration(t.DiskP99WarningNs) criticalNs := time.Duration(t.DiskP99CriticalNs) - // Check sync latency (fsync — most impactful for databases). + // Check sync latency (fsync — most impactful for databases). if syncP99 := s.DiskIO.SyncLatency.P99; syncP99 > 0 { if syncP99 >= criticalNs { findings = append(findings, Finding{ @@ -59,7 +59,7 @@ func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Fin Rule: "disk_io_bottleneck", Title: "Disk I/O Bottleneck Detected", Signal: "diskio", - Cause: "Storage device is saturated — sync/fsync operations are blocking", + Cause: "Storage device is saturated — sync/fsync operations are blocking", Impact: "Database writes and file syncs are delayed, causing cascade latency", Evidence: fmt.Sprintf("sync P99=%s (threshold: %s), %d sync ops in window", syncP99, criticalNs, s.DiskIO.TotalSyncs), Fix: []string{"Check disk IOPS: iostat -x 1 5", "Check write queue depth", "Consider faster storage or async fsync"}, @@ -92,7 +92,7 @@ func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Fin Title: "Critical Disk Write Latency", Signal: "diskio", Cause: "Block-level write operations are critically slow", - Impact: "All write I/O is affected — applications may hang or timeout", + Impact: "All write I/O is affected — applications may hang or timeout", Evidence: fmt.Sprintf("write P99=%s (threshold: %s), %d writes", writeP99, criticalNs, s.DiskIO.TotalWrites), Fix: []string{"Check device health: smartctl -a /dev/sdX", "Check for I/O scheduler issues"}, Metric: "disk_write_p99", @@ -104,7 +104,7 @@ func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Fin return findings } -// ── Rule 2/3: OOM Kill ────────────────────────────────────────────────────── +// ── Rule 2/3: OOM Kill ────────────────────────────────────────────────────── func evalOOMKillOccurred(s *collector.Signals) []Finding { if s.OOM == nil || s.OOM.Count == 0 { @@ -119,7 +119,7 @@ func evalOOMKillOccurred(s *collector.Signals) []Finding { Title: "OOM Kill Detected", Signal: "oom", Cause: fmt.Sprintf("Process %s (pid %d) was killed by the OOM killer", evt.Comm, evt.PID), - Impact: "Process was terminated — service disruption likely", + Impact: "Process was terminated — service disruption likely", Evidence: fmt.Sprintf("OOM score: %d, RSS pages: %d, total pages: %d", evt.OOMScore, evt.RSSPages, evt.TotalPages), Fix: []string{ fmt.Sprintf("Check memory limits for process: cat /proc/%d/cgroup", evt.PID), @@ -135,7 +135,7 @@ func evalOOMKillOccurred(s *collector.Signals) []Finding { return findings } -// ── Rule 4: TCP Retransmit Storm ──────────────────────────────────────────── +// ── Rule 4: TCP Retransmit Storm ──────────────────────────────────────────── func evalTCPRetransmitStorm(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.TCP == nil { @@ -153,7 +153,7 @@ func evalTCPRetransmitStorm(s *collector.Signals, t config.DoctorThresholds) []F Title: "TCP Retransmit Storm", Signal: "tcp", Cause: "Network path degradation causing excessive retransmissions", - Impact: fmt.Sprintf("%.1f%% of TCP segments are being retransmitted — every connection has a chance of latency spike", rate), + Impact: fmt.Sprintf("%.1f%% of TCP segments are being retransmitted — every connection has a chance of latency spike", rate), Evidence: fmt.Sprintf("retransmit rate=%.1f%% (threshold: %.1f%%), %d total retransmits, %d active connections", rate, t.TCPRetransmitPct, s.TCP.TotalRetransmits, s.TCP.ActiveConnections), Fix: []string{"Check network errors: ethtool -S eth0 | grep -i error", "Check for packet loss: ping -c 100 ", "Consider pod/service placement (cross-AZ traffic)"}, Metric: "tcp_retransmit_pct", @@ -164,14 +164,14 @@ func evalTCPRetransmitStorm(s *collector.Signals, t config.DoctorThresholds) []F // Add top retransmitter info if available. if len(s.TCP.TopRetransmitters) > 0 { top := s.TCP.TopRetransmitters[0] - f.Evidence += fmt.Sprintf(", top: %s:%d → %s:%d (%d retransmits)", + f.Evidence += fmt.Sprintf(", top: %s:%d → %s:%d (%d retransmits)", top.SrcAddr, top.SrcPort, top.DstAddr, top.DstPort, top.Retransmits) } return []Finding{f} } -// ── Rule 5: TCP RTT Degradation ───────────────────────────────────────────── +// ── Rule 5: TCP RTT Degradation ───────────────────────────────────────────── func evalTCPRTTDegradation(s *collector.Signals, _ config.DoctorThresholds) []Finding { if s.TCP == nil { @@ -190,7 +190,7 @@ func evalTCPRTTDegradation(s *collector.Signals, _ config.DoctorThresholds) []Fi Title: "Elevated TCP Round-Trip Time", Signal: "tcp", Cause: "Network latency is higher than expected", - Impact: fmt.Sprintf("Every TCP round-trip adds %s of latency — impacts all network-dependent operations", s.TCP.RTT.P99), + Impact: fmt.Sprintf("Every TCP round-trip adds %s of latency — impacts all network-dependent operations", s.TCP.RTT.P99), Evidence: fmt.Sprintf("RTT P99=%s, P50=%s (threshold: %s)", s.TCP.RTT.P99, s.TCP.RTT.P50, rttThreshold), Fix: []string{"Check network path: traceroute ", "Check for congestion: ss -ti", "Consider co-locating services to reduce hops"}, Metric: "tcp_rtt_p99", @@ -199,7 +199,7 @@ func evalTCPRTTDegradation(s *collector.Signals, _ config.DoctorThresholds) []Fi }} } -// ── Rule 6: Scheduler Contention ──────────────────────────────────────────── +// ── Rule 6: Scheduler Contention ──────────────────────────────────────────── func evalSchedulerContention(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.Sched == nil { @@ -225,7 +225,7 @@ func evalSchedulerContention(s *collector.Signals, t config.DoctorThresholds) [] Title: "CPU Scheduler Contention", Signal: "sched", Cause: "Processes are waiting in the CPU run queue longer than expected", - Impact: fmt.Sprintf("Every context switch adds ~%s of delay — compounds with I/O latency", delay), + Impact: fmt.Sprintf("Every context switch adds ~%s of delay — compounds with I/O latency", delay), Evidence: fmt.Sprintf("runqueue P99=%s, P50=%s (warning: %s, critical: %s)", delay, s.Sched.RunqDelay.P50, warningNs, criticalNs), Fix: []string{"Check CPU usage: top -H", "Consider increasing CPU count or reducing worker threads", "Check for noisy neighbors on shared nodes"}, Metric: "sched_runq_p99", @@ -243,7 +243,7 @@ func evalSchedulerContention(s *collector.Signals, t config.DoctorThresholds) [] return []Finding{f} } -// ── Rule 7: FD Leak ───────────────────────────────────────────────────────── +// ── Rule 7: FD Leak ───────────────────────────────────────────────────────── func evalFDLeak(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.FD == nil { @@ -290,7 +290,7 @@ func evalFDLeak(s *collector.Signals, t config.DoctorThresholds) []Finding { return []Finding{f} } -// ── Rule 8: Syscall Latency High ──────────────────────────────────────────── +// ── Rule 8: Syscall Latency High ──────────────────────────────────────────── // evalSyscallLatencyHigh emits at most one finding per run, even when many // (syscall, comm) pairs cross the threshold. The worst pair drives severity @@ -309,7 +309,7 @@ func evalSyscallLatencyHigh(s *collector.Signals, t config.DoctorThresholds) []F for _, entry := range s.Syscall.Entries { // Voluntary-blocking syscalls (futex, epoll_wait, poll, ...) // have latency dominated by userspace wait time, not by the - // kernel — flagging them produces false positives on idle hosts. + // kernel — flagging them produces false positives on idle hosts. if bpf.IsBlockingSyscall(entry.SyscallNr) { continue } @@ -371,7 +371,7 @@ func evalSyscallLatencyHigh(s *collector.Signals, t config.DoctorThresholds) []F }} } -// ── Rule 9: OOM Imminent ───────────────────────────────────────────────────── +// ── Rule 9: OOM Imminent ───────────────────────────────────────────────────── func evalOOMImminent(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.Memory == nil { @@ -380,7 +380,7 @@ func evalOOMImminent(s *collector.Signals, t config.DoctorThresholds) []Finding threshold := t.OOMMemoryPct // Negative threshold disables the rule. Zero is treated literally - // (fires on any non-zero usage) — useful for tests; default config + // (fires on any non-zero usage) — useful for tests; default config // supplies 90.0 for production. if threshold < 0 { return nil @@ -391,12 +391,12 @@ func evalOOMImminent(s *collector.Signals, t config.DoctorThresholds) []Finding } sev := SeverityWarning - title := "Memory Pressure — OOM Risk" + title := "Memory Pressure — OOM Risk" // If memory is >95% AND growing, it's critical. if s.Memory.UsedPct > 95.0 && s.Memory.GrowthRateBytesPerSec > 0 { sev = SeverityCritical - title = "OOM Imminent — Memory Nearly Exhausted" + title = "OOM Imminent — Memory Nearly Exhausted" } f := Finding{ @@ -424,7 +424,7 @@ func evalOOMImminent(s *collector.Signals, t config.DoctorThresholds) []Finding return []Finding{f} } -// ── Rule 10: Syscall Error Rate ────────────────────────────────────────────── +// ── Rule 10: Syscall Error Rate ────────────────────────────────────────────── // evalSyscallErrorRate emits at most one finding per run. See the same // invariant note on evalSyscallLatencyHigh. @@ -475,7 +475,7 @@ func evalSyscallErrorRate(s *collector.Signals) []Finding { } var ev strings.Builder - fmt.Fprintf(&ev, "%d syscalls have error rate ≥ 1%%. Worst: %s(%s)=%.1f%% (%d/%d).", + fmt.Fprintf(&ev, "%d syscalls have error rate ≥ 1%%. Worst: %s(%s)=%.1f%% (%d/%d).", len(entries), name, top.entry.Comm, top.rate, top.entry.ErrorCount, top.entry.Count) title := fmt.Sprintf("High Syscall Error Rate (%d affected)", len(entries)) @@ -499,7 +499,7 @@ func evalSyscallErrorRate(s *collector.Signals) []Finding { }} } -// ── Rule 12: Memory Limit Pressure ────────────────────────────────────────── +// ── Rule 12: Memory Limit Pressure ────────────────────────────────────────── // evalMemoryLimitPressure fires for each container that is close to its // cgroup v2 memory.max limit. WARNING at >85 %; CRITICAL at >95 % with @@ -581,7 +581,7 @@ func evalMemoryLimitPressure(s *collector.Signals) []Finding { return findings } -// ── Rule 13: Memory High Throttling ───────────────────────────────────────── +// ── Rule 13: Memory High Throttling ───────────────────────────────────────── // evalMemoryHighThrottling fires when the kernel is reclaiming memory under // the memory.high soft limit at a sustained rate of more than 1 event/sec. @@ -646,7 +646,7 @@ func formatBytes(b uint64) string { } } -// ── Rule 11: Healthy System ───────────────────────────────────────────────── +// ── Rule 11: Healthy System ───────────────────────────────────────────────── func evalHealthySystem(s *collector.Signals) Finding { evidence := "All kernel signals within normal thresholds" @@ -663,8 +663,9 @@ func evalHealthySystem(s *collector.Signals) Finding { Title: "System Healthy", Signal: "all", Cause: "No issues detected during the analysis window", - Impact: "None — all signals are within configured thresholds", + Impact: "None — all signals are within configured thresholds", Evidence: evidence, Fix: []string{"Run kerno doctor --continuous for ongoing monitoring"}, } } + diff --git a/scripts/soak-watch.sh b/scripts/soak-watch.sh new file mode 100644 index 0000000..75d593b --- /dev/null +++ b/scripts/soak-watch.sh @@ -0,0 +1,158 @@ +#!/usr/bin/env bash +# Copyright 2026 Optiqor contributors +# SPDX-License-Identifier: Apache-2.0 +# +# soak-watch.sh — monitors kerno during a soak run. +# Scrapes RSS, goroutines, FDs, BPF maps, throughput, and pprof every N seconds. +# +# Usage: +# ./scripts/soak-watch.sh [options] +# +# Options: +# --duration Total soak duration in seconds (default: 86400) +# --interval Scrape interval in seconds (default: 300) +# --csv Output CSV path (default: soak-results/metrics.csv) +# --pprof-port pprof HTTP port (default: 6060) +# --metrics-port Prometheus metrics port (default: 9090) +# --pprof-dir Directory to save pprof dumps (default: soak-results/pprof) +# --pid kerno PID to monitor (required) + +set -euo pipefail + +# ── Defaults ──────────────────────────────────────────────────────────────── +DURATION=86400 +INTERVAL=300 +CSV="soak-results/metrics.csv" +PPROF_PORT=6060 +METRICS_PORT=9090 +PPROF_DIR="soak-results/pprof" +KERNO_PID="" + +# ── Parse args ─────────────────────────────────────────────────────────────── +while [[ $# -gt 0 ]]; do + case "$1" in + --duration) DURATION="$2"; shift 2 ;; + --interval) INTERVAL="$2"; shift 2 ;; + --csv) CSV="$2"; shift 2 ;; + --pprof-port) PPROF_PORT="$2"; shift 2 ;; + --metrics-port) METRICS_PORT="$2"; shift 2 ;; + --pprof-dir) PPROF_DIR="$2"; shift 2 ;; + --pid) KERNO_PID="$2"; shift 2 ;; + *) echo "Unknown flag: $1"; exit 1 ;; + esac +done + +if [[ -z "$KERNO_PID" ]]; then + echo "ERROR: --pid is required" >&2 + exit 1 +fi + +mkdir -p "$PPROF_DIR" + +# Write CSV header if file is empty or missing +if [[ ! -s "$CSV" ]]; then + echo "ts_unix,rss_kb,goroutines,fds,bpf_maps,throughput_eps,doctor_p99_ms" > "$CSV" +fi + +# ── Helper: scrape a Prometheus metric ────────────────────────────────────── +prometheus_metric() { + local name="$1" + curl -sf "http://localhost:${METRICS_PORT}/metrics" 2>/dev/null \ + | grep -E "^${name}[{ ]" \ + | awk '{print $NF}' \ + | head -1 || echo "N/A" +} + +# ── Helper: rss in KB ──────────────────────────────────────────────────────── +get_rss_kb() { + if [[ -f "/proc/${KERNO_PID}/status" ]]; then + grep VmRSS "/proc/${KERNO_PID}/status" | awk '{print $2}' + else + echo "N/A" + fi +} + +# ── Helper: goroutine count via pprof ──────────────────────────────────────── +get_goroutines() { + curl -sf "http://localhost:${PPROF_PORT}/debug/pprof/goroutine?debug=1" 2>/dev/null \ + | head -1 \ + | grep -oE '[0-9]+' \ + | head -1 || echo "N/A" +} + +# ── Helper: open file descriptors ──────────────────────────────────────────── +get_fds() { + if [[ -d "/proc/${KERNO_PID}/fd" ]]; then + ls /proc/${KERNO_PID}/fd 2>/dev/null | wc -l + else + echo "N/A" + fi +} + +# ── Helper: pinned BPF map count ───────────────────────────────────────────── +get_bpf_maps() { + if command -v bpftool &>/dev/null; then + sudo bpftool map list 2>/dev/null | grep -c "^[0-9]" || echo "0" + else + echo "N/A" + fi +} + +# ── Helper: save pprof snapshot ────────────────────────────────────────────── +save_pprof() { + local ts="$1" + curl -sf "http://localhost:${PPROF_PORT}/debug/pprof/heap" \ + -o "${PPROF_DIR}/heap_${ts}.pb.gz" 2>/dev/null || true + curl -sf "http://localhost:${PPROF_PORT}/debug/pprof/goroutine" \ + -o "${PPROF_DIR}/goroutine_${ts}.pb.gz" 2>/dev/null || true +} + +# ── Main loop ──────────────────────────────────────────────────────────────── +START_TIME=$(date +%s) +END_TIME=$(( START_TIME + DURATION )) +TICK=0 + +echo "==> soak-watch started. Duration=${DURATION}s Interval=${INTERVAL}s PID=${KERNO_PID}" +echo "==> CSV: $CSV" +echo "==> End time: $(date -d @${END_TIME} 2>/dev/null || date -r ${END_TIME} 2>/dev/null || echo ${END_TIME})" + +while true; do + NOW=$(date +%s) + [[ $NOW -ge $END_TIME ]] && break + + # Check kerno is still alive + if ! kill -0 "$KERNO_PID" 2>/dev/null; then + echo "ERROR: kerno process $KERNO_PID died at tick $TICK" >&2 + exit 1 + fi + + RSS=$(get_rss_kb) + GOR=$(get_goroutines) + FDS=$(get_fds) + BPF=$(get_bpf_maps) + TPS=$(prometheus_metric "kerno_events_total" || echo "N/A") + P99=$(prometheus_metric "kerno_doctor_cycle_duration_seconds" || echo "N/A") + + # Convert p99 seconds → ms if numeric + if [[ "$P99" =~ ^[0-9.]+$ ]]; then + P99=$(echo "$P99 * 1000" | bc -l | xargs printf "%.2f") + fi + + echo "${NOW},${RSS},${GOR},${FDS},${BPF},${TPS},${P99}" >> "$CSV" + + echo "[tick=${TICK} elapsed=$(( (NOW - START_TIME) / 60 ))min] RSS=${RSS}KB GOR=${GOR} FDs=${FDS} BPF=${BPF} TPS=${TPS} p99=${P99}ms" + + # Save pprof at hour 1, hour 6, hour 12, hour 18, hour 24 + ELAPSED=$(( NOW - START_TIME )) + for checkpoint in 3600 21600 43200 64800 86400; do + if (( ELAPSED >= checkpoint && ELAPSED < checkpoint + INTERVAL )); then + echo "==> Saving pprof snapshot at ${checkpoint}s checkpoint" + save_pprof "${checkpoint}s" + fi + done + + TICK=$(( TICK + 1 )) + sleep "$INTERVAL" +done + +echo "==> soak-watch complete. $(( TICK )) snapshots written to $CSV"