From ce3889408b4ddd94539feeef763701929a1c8c97 Mon Sep 17 00:00:00 2001 From: Joe Searcy Date: Fri, 3 Apr 2026 03:42:44 -0400 Subject: [PATCH 1/2] Add support for custom collectors --- .gitignore | 2 + README.md | 21 +- docs/collectors.md | 227 ++++++++++++ docs/operations.md | 37 ++ docs/policy-authoring.md | 65 ++++ examples/collectors/hugepages/metadata.json | 15 + examples/collectors/hugepages/policy.rego | 200 +++++++++++ .../collectors/hugepages/policy_test.rego | 132 +++++++ examples/collectors/node-info/metadata.json | 15 + examples/collectors/node-info/policy.rego | 162 +++++++++ .../collectors/node-info/policy_test.rego | 109 ++++++ internal/bundle/remote.go | 245 +++++++++++++ internal/bundle/remote_test.go | 244 +++++++++++++ internal/cli/collect.go | 238 +++++++++++++ internal/cli/root.go | 1 + internal/cli/scan.go | 160 ++++++++- internal/collector/collector.go | 37 ++ internal/collector/collector_test.go | 132 +++++++ internal/collector/job_collector.go | 327 ++++++++++++++++++ internal/collector/merge.go | 43 +++ internal/collector/result.go | 78 +++++ internal/collector/types.go | 99 ++++++ internal/eval/rego/engine.go | 21 +- internal/eval/rego/engine_test.go | 164 +++++++++ internal/kube/snapshot.go | 7 + 25 files changed, 2763 insertions(+), 18 deletions(-) create mode 100644 docs/collectors.md create mode 100644 examples/collectors/hugepages/metadata.json create mode 100644 examples/collectors/hugepages/policy.rego create mode 100644 examples/collectors/hugepages/policy_test.rego create mode 100644 examples/collectors/node-info/metadata.json create mode 100644 examples/collectors/node-info/policy.rego create mode 100644 examples/collectors/node-info/policy_test.rego create mode 100644 internal/bundle/remote.go create mode 100644 internal/bundle/remote_test.go create mode 100644 internal/cli/collect.go create mode 100644 internal/collector/collector.go create mode 100644 internal/collector/collector_test.go create mode 100644 
internal/collector/job_collector.go create mode 100644 internal/collector/merge.go create mode 100644 internal/collector/result.go create mode 100644 internal/collector/types.go diff --git a/.gitignore b/.gitignore index 8af57b8..9ca631d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ bin/ .DS_Store dist/ planning/ +collector-data.json +collectors-test/ diff --git a/README.md b/README.md index dbea796..e792684 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,20 @@ make build ./bin/kvirtbp scan --show-runbook --output table ./bin/kvirtbp runbook ./bin/kvirtbp runbook --id RUNBOOK-SEC-RBAC-001 + +# Collector workflow — gather node/cluster data then scan with it +./bin/kvirtbp collect --bundle ./policy/baseline --output collector-data.json +./bin/kvirtbp collect --collector-config ./my-collectors.json --output collector-data.json +./bin/kvirtbp scan --engine rego --policy-bundle ./policy/baseline --collector-data collector-data.json + +# Remote bundle (HTTPS tarball) +./bin/kvirtbp collect --bundle https://github.com/myorg/policies/archive/refs/tags/v1.2.0.tar.gz --output collector-data.json +./bin/kvirtbp scan --engine rego --policy-bundle https://github.com/myorg/policies/archive/refs/tags/v1.2.0.tar.gz --collector-data collector-data.json + +# Remote monorepo (bundle lives under a subdirectory) +./bin/kvirtbp scan --engine rego \ + --policy-bundle https://github.com/myorg/policies/archive/refs/tags/v1.2.0.tar.gz \ + --bundle-subdir policy/kubevirt --collector-data collector-data.json ``` ## Homebrew @@ -92,8 +106,10 @@ Scan command supports: - `--category` and `--severity` to filter findings - `--engine` to select evaluator backend (`go` and `rego`) - `--policy-file` to provide a custom Rego policy file with `data.kvirtbp.findings` output -- `--policy-bundle` to provide a directory of `.rego` files with optional `metadata.json` +- `--policy-bundle` to provide a local directory or HTTPS `.tar.gz` URL of `.rego` files with optional `metadata.json` +- 
`--bundle-subdir` to point at a subdirectory within a remote archive (for monorepo layouts) - `--show-runbook` to append compact runbook hints for failing findings +- `--collector-data` to inject pre-collected node/cluster data into `input.cluster.collectors` for Rego policies Namespace scoping precedence for namespace-based coverage controls: @@ -132,6 +148,7 @@ Additional documentation: - [docs/check-catalog.md](docs/check-catalog.md) - [docs/policy-authoring.md](docs/policy-authoring.md) +- [docs/collectors.md](docs/collectors.md) - [docs/operations.md](docs/operations.md) - [docs/workflows.md](docs/workflows.md) @@ -140,6 +157,7 @@ Policy bundle metadata (optional `metadata.json`): - `schemaVersion`: currently `v1alpha1` - `policyVersion`: informational version for your bundle - `minBinaryVersion`: optional minimum CLI version (for example `1.2.0`) +- `collectors`: optional array of `CollectorConfig` objects that `kvirtbp collect` will run automatically when `--bundle` is provided (see [docs/collectors.md](docs/collectors.md)) Checked-in baseline Rego bundle: @@ -196,4 +214,5 @@ Manual CI execution: ## Roadmap notes - v1 includes hybrid policy execution (Go + Rego/OPA) +- The `collect` subcommand and collector framework are included in v1 - Snapshot bundle export and visualization UI are post-v1 roadmap items diff --git a/docs/collectors.md b/docs/collectors.md new file mode 100644 index 0000000..f90b3c0 --- /dev/null +++ b/docs/collectors.md @@ -0,0 +1,227 @@ +# Collectors Guide + +Collectors are short-lived Kubernetes Jobs deployed by `kvirtbp collect` to gather node or cluster-scope data that Rego policies can reference at scan time via `input.cluster.collectors`. + +## How it works + +``` +kvirtbp collect kvirtbp scan --collector-data ... 
+ │ │ + ▼ ▼ +Deploy Jobs on cluster Load collector-data.json + │ │ + ▼ ▼ +Wait for completion Inject into ClusterSnapshot.Collectors + │ │ + ▼ ▼ +Read pod logs (JSON) Available as input.cluster.collectors + │ + ▼ +Write collector-data.json +``` + +`scan` is read-only. All cluster writes happen exclusively in `collect`. + +## Quick start + +```bash +# Run collectors declared in a local policy bundle +./bin/kvirtbp collect --bundle ./policy/baseline --output collector-data.json + +# Run collectors from a remote bundle (HTTPS .tar.gz) +./bin/kvirtbp collect \ + --bundle https://github.com/myorg/policies/archive/refs/tags/v1.2.0.tar.gz \ + --output collector-data.json + +# Monorepo: bundle lives under a subdirectory of the archive +./bin/kvirtbp collect \ + --bundle https://github.com/myorg/policies/archive/refs/tags/v1.2.0.tar.gz \ + --bundle-subdir policy/kubevirt \ + --output collector-data.json + +# Or from a standalone collector config file +./bin/kvirtbp collect --collector-config ./my-collectors.json --output collector-data.json + +# Then scan with the collected data +./bin/kvirtbp scan --engine rego --policy-bundle ./policy/baseline \ + --collector-data collector-data.json +``` + +## CollectorConfig schema + +Collectors are declared as JSON objects. The schema maps to `collector.CollectorConfig` in the Go package. 
+ +| Field | Type | Required | Description | +|---|---|---|---| +| `name` | string | yes | Unique identifier; used as the key in `input.cluster.collectors` | +| `image` | string | yes | Container image to run | +| `commands` | []string | yes | Shell commands to execute inside the container | +| `scope` | string | no | `"once"` (default) or `"per-node"` | +| `outputPath` | string | no | In-pod path where commands write JSON output (default: `/kvirtbp/output.json`) | +| `timeoutSeconds` | int | no | Per-collector deadline in seconds; `0` means use the global `--collector-timeout` cap | +| `privileged` | bool | no | Run container with `SecurityContext.Privileged = true` | +| `hostPID` | bool | no | Mount host PID namespace | +| `hostNetwork` | bool | no | Attach to host network namespace | +| `env` | object | no | Environment variables injected into the container | +| `tolerations` | []object | no | Pod tolerations applied to the Job pod. Each object supports `key`, `operator` (`Exists`/`Equal`), `value`, and `effect` fields. Use `{"operator": "Exists"}` to tolerate all taints (e.g. to run `per-node` on control-plane nodes). | + +### Scope values + +| Value | Behaviour | +|---|---| +| `once` | A single Job is deployed. Output is stored under the sentinel key `_cluster`. | +| `per-node` | One Job per node (via `nodeName` selector). Output is stored keyed by node name. 
| + +## Output format + +`kvirtbp collect` writes a JSON file with the structure: + +```json +{ + "<collector-name>": { + "<node-name-or-_cluster>": { + "<key>": "<value>" + } + } +} +``` + +Example with a `sysctl` collector using `scope: once`: + +```json +{ + "sysctl": { + "_cluster": { + "net.ipv4.ip_forward": "0", + "net.bridge.bridge-nf-call-iptables": "1" + } + } +} +``` + +Example with a `sysctl` collector using `scope: per-node`: + +```json +{ + "sysctl": { + "worker-1": { "net.ipv4.ip_forward": "0" }, + "worker-2": { "net.ipv4.ip_forward": "1" } + } +} +``` + +If a collector (or individual node) fails, an `_error` key is stored in place of the normal output and collection continues: + +```json +{ + "sysctl": { + "worker-1": { "_error": "job kvirtbp-sysctl-worker-1 failed: BackoffLimitExceeded" } + } +} +``` + +## Writing collector output + +Commands must write valid JSON to `outputPath` (default `/kvirtbp/output.json`). The CLI appends `cat <outputPath>` as the last command in the Job spec — this is what appears as pod logs and is parsed as JSON. + +Intermediate commands may freely write to stdout/stderr without corrupting the output payload since they run before the final `cat`. 
+ +Example pattern for a sysctl collector: + +```bash +# write JSON to the output path, then let the CLI's appended "cat" emit it +commands: + - "sysctl -a --pattern '^(net\\.ipv4\\.ip_forward|net\\.bridge\\.bridge-nf-call-iptables)' | awk 'BEGIN{printf \"{\"} {printf \"%s\\\"%s\\\": \\\"%s\\\"\", (NR>1?\",\":\" \"), $1, $3} END{print \"}\"}' > /kvirtbp/output.json" +``` + +Or with a helper image that already produces JSON: + +```bash +commands: + - "my-tool dump-json > /kvirtbp/output.json" +``` + +## Declaring collectors in a bundle + +Add a `collectors` array to the bundle's `metadata.json`: + +```json +{ + "schemaVersion": "v1alpha1", + "policyVersion": "1.0.0", + "resources": ["v1/nodes"], + "collectors": [ + { + "name": "sysctl", + "image": "alpine:3.21", + "commands": [ + "apk add -q procps", + "sysctl -a --pattern '^net\\.ipv4\\.ip_forward' | awk 'BEGIN{printf \"{\"}{printf \"\\\"%s\\\":\\\"%s\\\"\", $1,$3}END{print \"}\"}' > /kvirtbp/output.json" + ], + "scope": "per-node", + "privileged": true, + "hostNetwork": true + } + ] +} +``` + +Running `kvirtbp collect --bundle ./policy/baseline` will execute these collectors automatically. + +## Merging collector configs + +When both `--bundle` and `--collector-config` are provided, the two sets are merged. `--collector-config` wins on name collision: + +```bash +./bin/kvirtbp collect \ + --bundle ./policy/baseline \ + --collector-config ./overrides.json \ + --output collector-data.json +``` + +`overrides.json` can be a partial list — only the names that need to differ from the bundle's defaults need to be included. 
+ +## RBAC requirements + +The identity running `kvirtbp collect` needs: + +```yaml +rules: + - apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["create", "get", "list", "delete"] + - apiGroups: [""] + resources: ["pods", "pods/log"] + verbs: ["get", "list"] + - apiGroups: [""] + resources: ["namespaces"] + verbs: ["get", "create"] + - apiGroups: [""] + resources: ["nodes"] + verbs: ["list"] # needed for per-node scope +``` + +If `--collector-namespace` already exists, the `namespaces` create verb is not required. + +## Debugging + +Use `--no-collector-cleanup` to prevent Job deletion after completion: + +```bash +./bin/kvirtbp collect --bundle ./policy/baseline \ + --no-collector-cleanup --output collector-data.json +``` + +Then inspect failed Jobs directly: + +```bash +kubectl get jobs -n kvirtbp-collectors +kubectl logs -n kvirtbp-collectors -l collector-name=sysctl +``` + +## Security considerations + +- `privileged`, `hostPID`, and `hostNetwork` must be explicitly opted into per collector; they are never defaulted. +- The collector namespace (`kvirtbp-collectors` by default) should have a restrictive PSA policy. Consider `enforce: privileged` only if your collectors require host access, and scope the namespace RBAC tightly. +- Collector `env` values are stored in plain text in the Job spec. Do not use them for secrets; use a Kubernetes Secret volume mount instead. +- The output file (`collector-data.json`) may contain sensitive node data. Treat it with appropriate access controls. diff --git a/docs/operations.md b/docs/operations.md index 1f4c3a2..e57383a 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -16,6 +16,38 @@ The scanner expects read/list access to: Insufficient access is reported as explicit permission findings (`perm-list-*`) and may degrade security/availability baseline outcomes. 
+The `collect` subcommand creates Kubernetes Jobs and therefore requires additional permissions in the collector namespace: + +- `batch/jobs` — create, get, list, delete +- `pods/log` — get (to read Job output) +- `namespaces` — get, create (only if `--collector-namespace` does not already exist) + +## Collector workflow + +The `collect` subcommand deploys short-lived Kubernetes Jobs to gather data that Rego policies can reference via `input.cluster.collectors`. This is a separate step from `scan`; the scan command itself makes no cluster writes. + +Typical two-step workflow: + +```bash +# Step 1: collect — writes collector-data.json +./bin/kvirtbp collect --bundle ./policy/baseline --output collector-data.json + +# Step 2: scan — injects collected data, makes no cluster writes +./bin/kvirtbp scan --engine rego --policy-bundle ./policy/baseline \ + --collector-data collector-data.json +``` + +Key `collect` flags: + +- `--bundle` — loads collector definitions from a bundle's `metadata.json` automatically +- `--collector-config` — path to a standalone JSON file of `[]CollectorConfig`; merged with bundle collectors (file wins on name collision) +- `--collector-namespace` — Kubernetes namespace for Jobs (default: `kvirtbp-collectors`; created if absent) +- `--collector-timeout` — maximum time to wait for all collectors (default: `5m`) +- `--no-collector-cleanup` — keep completed Jobs after collection (useful for debugging) +- `--output` — path for the collector data JSON file (default: `collector-data.json`) + +Collector Jobs are deleted automatically after completion unless `--no-collector-cleanup` is set. A `TTLSecondsAfterFinished` of 300 seconds is also set on each Job as a safety net. 
+ ## Runtime behavior Key runtime flags: @@ -29,6 +61,7 @@ Key runtime flags: - `--policy-bundle` - `--waiver-file` - `--show-runbook` +- `--collector-data` (inject pre-collected node/cluster data into `input.cluster.collectors`) Output modes: @@ -91,5 +124,9 @@ Waiver rules: - Apply PSA enforce labels and baseline NetworkPolicies. - Namespace guardrail failures: - Add ResourceQuota and LimitRange to targeted namespaces. +- Collector failures: + - Check Job status in the collector namespace: `kubectl get jobs -n kvirtbp-collectors` + - Use `--no-collector-cleanup` to retain failed Jobs for log inspection. + - Ensure the identity running `kvirtbp collect` has `batch/jobs` create/get/delete and `pods/log` get in the collector namespace. For remediation-specific playbooks, see `docs/runbooks.md`. diff --git a/docs/policy-authoring.md b/docs/policy-authoring.md index 1505d76..65cbef7 100644 --- a/docs/policy-authoring.md +++ b/docs/policy-authoring.md @@ -247,6 +247,68 @@ findings := array.concat( | Named partial arrays (`required_check_findings`, etc.) | Each rule produces an independent slice; `array.concat` merges them into `findings` | | `findings` entrypoint | Must be `data.kvirtbp.findings`; the engine reads exactly this path | +## Accessing collector data in Rego + +Collector output is injected into `input.cluster.collectors` when `--collector-data <file>` is passed to `scan`. The structure mirrors the file produced by `kvirtbp collect`: + +``` +input.cluster.collectors["<collector-name>"]["<node-name-or-_cluster>"]["<key>"] = "<value>" +``` + +- **`_cluster`** is the key used for `scope: once` (single cluster-wide Job) +- **node names** are keys for `scope: per-node` (one Job per node) + +Example policy reading a sysctl value: + +```rego +package kvirtbp + +import rego.v1 + +# Always start from input.cluster (guaranteed defined) — not input.cluster.collectors, +# which may be absent when --collector-data is not provided. 
+ip_forward := object.get( + object.get( + object.get( + object.get(input.cluster, "collectors", {}), + "sysctl", {}), + "_cluster", {}), +"net.ipv4.ip_forward", "0") + +findings := [{ + "checkId": "sec-ip-forward", + "title": "IP Forwarding Disabled", + "category": "security", + "severity": "warning", + "pass": ip_forward == "0", + "message": sprintf("net.ipv4.ip_forward = %s (expected 0)", [ip_forward]), +}] +``` + +> **Important:** `object.get` returns undefined — not the default — when its first argument is itself undefined. Always anchor the chain at a value that is guaranteed to be present in `input` (such as `input.cluster`) so the rule produces a result even when collector data is absent. + +For a per-node pattern: + +```rego +node_results := object.get(object.get(input.cluster, "collectors", {}), "sysctl", {}) + +findings := [finding | + node := input.cluster.nodes[_] + node_data := object.get(node_results, node.name, {}) + val := object.get(node_data, "net.ipv4.ip_forward", "0") + val != "0" + finding := { + "checkId": "sec-ip-forward", + "title": "IP Forwarding Disabled", + "category": "security", + "severity": "warning", + "pass": false, + "message": sprintf("node %s has net.ipv4.ip_forward=%s", [node.name, val]), + "evidence": {"node": node.name, "value": val}, + } +] +``` + ## Bundle layout A bundle directory can contain one or more `.rego` files and optional metadata. @@ -261,6 +323,9 @@ Example: - `schemaVersion` (currently `v1alpha1`) - `policyVersion` (informational) - `minBinaryVersion` (optional) +- `collectors` (optional) — array of `CollectorConfig` objects run automatically by `kvirtbp collect --bundle`; their output is injected into `input.cluster.collectors` at scan time + +See [docs/collectors.md](docs/collectors.md) for the full `CollectorConfig` schema and collector authoring guide. 
## Go check authoring diff --git a/examples/collectors/hugepages/metadata.json b/examples/collectors/hugepages/metadata.json new file mode 100644 index 0000000..053a8a9 --- /dev/null +++ b/examples/collectors/hugepages/metadata.json @@ -0,0 +1,15 @@ +{ + "schemaVersion": "v1alpha1", + "policyVersion": "0.1.0", + "collectors": [ + { + "name": "hugepages", + "image": "alpine:3.21", + "commands": [ + "awk '/HugePages_Total/{t=$2} /HugePages_Free/{f=$2} /Hugepagesize/{s=$2} END{printf \"{\\\"total\\\":%d,\\\"free\\\":%d,\\\"size_kb\\\":%d}\\n\",t,f,s}' /proc/meminfo > /kvirtbp/output.json" + ], + "scope": "per-node", + "tolerations": [{"operator": "Exists"}] + } + ] +} diff --git a/examples/collectors/hugepages/policy.rego b/examples/collectors/hugepages/policy.rego new file mode 100644 index 0000000..121c314 --- /dev/null +++ b/examples/collectors/hugepages/policy.rego @@ -0,0 +1,200 @@ +# ============================================================================ +# hugepages reference collector bundle +# ============================================================================ +# +# Collector: "hugepages" (scope: per-node) +# Runs one Job per node using alpine:3.21. Each Job reads /proc/meminfo +# (which is not namespaced in Linux — it reflects the host's values) and +# emits a small JSON object with the node's hugepages configuration. +# +# No special privileges or host namespace mounts are required because +# /proc/meminfo is visible from any container without elevated access. 
+# +# Collected data shape (input.cluster.collectors["hugepages"]): +# { +# "worker-1": { "total": 512, "free": 480, "size_kb": 2048 }, +# "worker-2": { "total": 0, "free": 0, "size_kb": 2048 } +# } +# +# Fields: +# total — HugePages_Total: system-wide number of hugepages configured +# free — HugePages_Free: hugepages not yet allocated +# size_kb — Hugepagesize (in kB): typically 2048 (2 Mi) or 1048576 (1 Gi) +# +# Policies: +# prod-hugepages-collector-present — gates subsequent checks; warns if data absent +# prod-hugepages-configured — passes only when every reporting node has hugepages enabled (info); +# otherwise fails with reasonCode prod.hugepages.partial or prod.hugepages.none_configured +# +# Usage: +# # Step 1: collect +# kvirtbp collect --bundle ./examples/collectors/hugepages --output collector-data.json +# +# # Step 2: scan +# kvirtbp scan --engine rego \ +# --policy-bundle ./examples/collectors/hugepages \ +# --collector-data collector-data.json +# +# # Combine with node-info in a single collect run — scan remains separate per bundle: +# kvirtbp collect \ +# --bundle ./examples/collectors/node-info \ +# --bundle ./examples/collectors/hugepages \ +# --output collector-data.json +# ============================================================================ + +package kvirtbp + +# --------------------------------------------------------------------------- +# Short-circuit: no cluster snapshot present (unit tests / dry-run). +# --------------------------------------------------------------------------- + +findings := [] { not input.cluster } +findings := cluster_findings { input.cluster } + +# --------------------------------------------------------------------------- +# Safe access helpers. +# --------------------------------------------------------------------------- + +hugepages_data := object.get( + object.get(input.cluster, "collectors", {}), + "hugepages", + {} +) + +# Nodes that reported at least one configured hugepage (total > 0). 
+nodes_with_hugepages := {name | + some name + hugepages_data[name] + object.get(hugepages_data[name], "total", 0) > 0 +} + +# All node names that sent back data (excludes _error sentinels). +nodes_reporting := {name | + some name + hugepages_data[name] + not startswith(name, "_") + not hugepages_data[name]._error +} + +# --------------------------------------------------------------------------- +# Guards. +# --------------------------------------------------------------------------- + +collector_data_present { + count(hugepages_data) > 0 +} + +some_hugepages_configured { + collector_data_present + count(nodes_with_hugepages) > 0 +} + +all_hugepages_configured { + collector_data_present + count(nodes_reporting) > 0 + count(nodes_with_hugepages) == count(nodes_reporting) +} + +no_hugepages_configured { + collector_data_present + count(nodes_with_hugepages) == 0 +} + +partial_hugepages_configured { + some_hugepages_configured + not all_hugepages_configured +} + +# --------------------------------------------------------------------------- +# Check 1: collector data is present +# --------------------------------------------------------------------------- + +collector_findings := [{ + "checkId": "prod-hugepages-collector-present", + "title": "Hugepages Collector Data Present", + "category": "production-readiness", + "severity": "info", + "pass": true, + "reasonCode": "prod.collector.hugepages.present", + "message": sprintf("hugepages collector data present for %d node(s)", [count(nodes_reporting)]), + "evidence": {"nodeCount": sprintf("%d", [count(nodes_reporting)])}, +}] { + collector_data_present +} + +collector_findings := [{ + "checkId": "prod-hugepages-collector-present", + "title": "Hugepages Collector Data Present", + "category": "production-readiness", + "severity": "warning", + "pass": false, + "reasonCode": "prod.collector.hugepages.absent", + "message": "hugepages collector data is absent; run 'kvirtbp collect' before scanning", + "remediation": "kvirtbp 
collect --bundle ./examples/collectors/hugepages --output collector-data.json", +}] { + not collector_data_present +} + +# --------------------------------------------------------------------------- +# Check 2: hugepages configured on all nodes +# +# KubeVirt VMs can use 2Mi or 1Gi hugepages to reduce TLB pressure and +# improve memory throughput for latency-sensitive workloads. Configuring +# hugepages on every node ensures VMs can be scheduled anywhere without +# capacity errors. +# --------------------------------------------------------------------------- + +hugepages_findings := [{ + "checkId": "prod-hugepages-configured", + "title": "Hugepages Configured on All Nodes", + "category": "production-readiness", + "severity": "info", + "pass": true, + "reasonCode": "prod.hugepages.all_configured", + "message": sprintf("hugepages configured on all %d reporting node(s)", [count(nodes_reporting)]), + "evidence": {"configuredNodes": sprintf("%d", [count(nodes_with_hugepages)])}, +}] { + all_hugepages_configured +} + +hugepages_findings := [{ + "checkId": "prod-hugepages-configured", + "title": "Hugepages Configured on All Nodes", + "category": "production-readiness", + "severity": "info", + "pass": false, + "reasonCode": "prod.hugepages.partial", + "message": sprintf("%d of %d node(s) have hugepages configured", [count(nodes_with_hugepages), count(nodes_reporting)]), + "evidence": { + "configuredNodes": sprintf("%d", [count(nodes_with_hugepages)]), + "reportingNodes": sprintf("%d", [count(nodes_reporting)]), + }, + "remediation": "Configure hugepages on all nodes for consistent KubeVirt VM scheduling. 
See: https://kubernetes.io/docs/tasks/manage-hugepages/scheduling-hugepages/", +}] { + partial_hugepages_configured +} + +hugepages_findings := [{ + "checkId": "prod-hugepages-configured", + "title": "Hugepages Configured on All Nodes", + "category": "production-readiness", + "severity": "info", + "pass": false, + "reasonCode": "prod.hugepages.none_configured", + "message": "no nodes have hugepages configured; KubeVirt VMs requesting hugepages will fail to schedule", + "remediation": "Configure 2Mi or 1Gi hugepages on cluster nodes to support KubeVirt VMs with hugepages memory. See: https://kubernetes.io/docs/tasks/manage-hugepages/scheduling-hugepages/", +}] { + no_hugepages_configured +} + +hugepages_findings := [] { + not all_hugepages_configured + not partial_hugepages_configured + not no_hugepages_configured +} + +# --------------------------------------------------------------------------- +# Entrypoint +# --------------------------------------------------------------------------- + +cluster_findings := array.concat(collector_findings, hugepages_findings) diff --git a/examples/collectors/hugepages/policy_test.rego b/examples/collectors/hugepages/policy_test.rego new file mode 100644 index 0000000..c554cc0 --- /dev/null +++ b/examples/collectors/hugepages/policy_test.rego @@ -0,0 +1,132 @@ +package kvirtbp_test + +import rego.v1 + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + +# All nodes have hugepages configured. +input_all_configured := { + "cluster": { + "nodes": [ + {"name": "worker-1"}, + {"name": "worker-2"}, + ], + "collectors": { + "hugepages": { + "worker-1": {"total": 512, "free": 480, "size_kb": 2048}, + "worker-2": {"total": 256, "free": 256, "size_kb": 2048}, + }, + }, + }, +} + +# Only one of two nodes has hugepages configured. 
+input_partial := { + "cluster": { + "nodes": [ + {"name": "worker-1"}, + {"name": "worker-2"}, + ], + "collectors": { + "hugepages": { + "worker-1": {"total": 512, "free": 480, "size_kb": 2048}, + "worker-2": {"total": 0, "free": 0, "size_kb": 2048}, + }, + }, + }, +} + +# No nodes have hugepages configured. +input_none_configured := { + "cluster": { + "nodes": [ + {"name": "worker-1"}, + {"name": "worker-2"}, + ], + "collectors": { + "hugepages": { + "worker-1": {"total": 0, "free": 0, "size_kb": 2048}, + "worker-2": {"total": 0, "free": 0, "size_kb": 2048}, + }, + }, + }, +} + +# No collector data at all. +input_no_collectors := { + "cluster": { + "nodes": [{"name": "worker-1"}], + }, +} + +# No cluster (dry-run / unit-test mode). +input_no_cluster := {} + +# --------------------------------------------------------------------------- +# findings are empty when there is no cluster +# --------------------------------------------------------------------------- + +test_no_cluster_findings_empty if { + findings := data.kvirtbp.findings with input as input_no_cluster + count(findings) == 0 +} + +# --------------------------------------------------------------------------- +# Check 1: collector data present / absent +# --------------------------------------------------------------------------- + +test_collector_present_pass if { + findings := data.kvirtbp.findings with input as input_all_configured + present := [f | f := findings[_]; f.checkId == "prod-hugepages-collector-present"] + count(present) == 1 + present[0].pass == true + present[0].reasonCode == "prod.collector.hugepages.present" +} + +test_collector_absent_fail if { + findings := data.kvirtbp.findings with input as input_no_collectors + present := [f | f := findings[_]; f.checkId == "prod-hugepages-collector-present"] + count(present) == 1 + present[0].pass == false + present[0].reasonCode == "prod.collector.hugepages.absent" +} + +# --------------------------------------------------------------------------- 
+# Check 2: hugepages configuration coverage +# --------------------------------------------------------------------------- + +test_all_configured_pass if { + findings := data.kvirtbp.findings with input as input_all_configured + hp := [f | f := findings[_]; f.checkId == "prod-hugepages-configured"] + count(hp) == 1 + hp[0].pass == true + hp[0].reasonCode == "prod.hugepages.all_configured" +} + +test_partial_configured_fail if { + findings := data.kvirtbp.findings with input as input_partial + hp := [f | f := findings[_]; f.checkId == "prod-hugepages-configured"] + count(hp) == 1 + hp[0].pass == false + hp[0].reasonCode == "prod.hugepages.partial" +} + +test_none_configured_fail if { + findings := data.kvirtbp.findings with input as input_none_configured + hp := [f | f := findings[_]; f.checkId == "prod-hugepages-configured"] + count(hp) == 1 + hp[0].pass == false + hp[0].reasonCode == "prod.hugepages.none_configured" +} + +# --------------------------------------------------------------------------- +# No collector data → hugepages check is skipped +# --------------------------------------------------------------------------- + +test_no_collector_hugepages_check_skipped if { + findings := data.kvirtbp.findings with input as input_no_collectors + hp := [f | f := findings[_]; f.checkId == "prod-hugepages-configured"] + count(hp) == 0 +} diff --git a/examples/collectors/node-info/metadata.json b/examples/collectors/node-info/metadata.json new file mode 100644 index 0000000..e82c116 --- /dev/null +++ b/examples/collectors/node-info/metadata.json @@ -0,0 +1,15 @@ +{ + "schemaVersion": "v1alpha1", + "policyVersion": "0.1.0", + "collectors": [ + { + "name": "node-info", + "image": "alpine:3.21", + "commands": [ + "printf '{\"kernel\":\"%s\",\"arch\":\"%s\"}\\n' \"$(uname -r)\" \"$(uname -m)\" > /kvirtbp/output.json" + ], + "scope": "per-node", + "tolerations": [{"operator": "Exists"}] + } + ] +} diff --git a/examples/collectors/node-info/policy.rego 
b/examples/collectors/node-info/policy.rego new file mode 100644 index 0000000..db55b28 --- /dev/null +++ b/examples/collectors/node-info/policy.rego @@ -0,0 +1,162 @@ +# ============================================================================ +# node-info reference collector bundle +# ============================================================================ +# +# Collector: "node-info" (scope: per-node) +# Runs one Job per node using alpine:3.21. Each Job writes the node's +# kernel version and CPU architecture to /kvirtbp/output.json with a +# single printf command — no extra packages required. +# +# Collected data shape (input.cluster.collectors["node-info"]): +# { +# "worker-1": { "kernel": "5.15.0-122-generic", "arch": "x86_64" }, +# "worker-2": { "kernel": "5.15.0-122-generic", "arch": "aarch64" } +# } +# +# Usage: +# # Step 1: collect +# kvirtbp collect --bundle ./examples/collectors/node-info --output collector-data.json +# +# # Step 2: scan +# kvirtbp scan --engine rego \ +# --policy-bundle ./examples/collectors/node-info \ +# --collector-data collector-data.json +# +# # Or from a tagged GitHub release in a single step: +# BUNDLE=https://github.com/myorg/policies/archive/refs/tags/v1.0.0.tar.gz +# kvirtbp collect --bundle $BUNDLE --bundle-subdir examples/collectors/node-info \ +# --output collector-data.json +# kvirtbp scan --engine rego --policy-bundle $BUNDLE \ +# --bundle-subdir examples/collectors/node-info \ +# --collector-data collector-data.json +# ============================================================================ + +package kvirtbp + +# --------------------------------------------------------------------------- +# Short-circuit: no cluster snapshot present (unit tests / dry-run). 
+# --------------------------------------------------------------------------- + +findings := [] { not input.cluster } +findings := cluster_findings { input.cluster } + +# --------------------------------------------------------------------------- +# Safe access helpers. +# +# Always anchor object.get chains at input.cluster (which is guaranteed to be +# defined when input.cluster is present) rather than at +# input.cluster.collectors, which may be absent when --collector-data is not +# provided. Passing an undefined first argument to object.get returns +# undefined, not the default value. +# --------------------------------------------------------------------------- + +node_info_data := object.get( + object.get(input.cluster, "collectors", {}), + "node-info", + {} +) + +# Set of distinct CPU architectures seen across all nodes that reported back. +node_architectures := {arch | + some node_name + node_info_data[node_name] + arch := object.get(node_info_data[node_name], "arch", "") + arch != "" +} + +# --------------------------------------------------------------------------- +# Guards — used to select which rule body fires below. +# --------------------------------------------------------------------------- + +collector_data_present { + count(node_info_data) > 0 +} + +arch_consistent { + collector_data_present + count(node_architectures) == 1 +} + +arch_mixed { + collector_data_present + count(node_architectures) > 1 +} + +# --------------------------------------------------------------------------- +# Check 1: collector data is present +# +# This check acts as a pre-condition gate: if the user forgot to run +# 'kvirtbp collect' it fails loudly so subsequent checks can be skipped +# rather than silently producing vacuous pass results. 
+# --------------------------------------------------------------------------- + +collector_findings := [{ + "checkId": "prod-node-info-collector-present", + "title": "Node Info Collector Data Present", + "category": "production-readiness", + "severity": "info", + "pass": true, + "reasonCode": "prod.collector.node-info.present", + "message": sprintf("node-info collector data present for %d node(s)", [count(node_info_data)]), + "evidence": {"nodeCount": sprintf("%d", [count(node_info_data)])}, +}] { + collector_data_present +} + +collector_findings := [{ + "checkId": "prod-node-info-collector-present", + "title": "Node Info Collector Data Present", + "category": "production-readiness", + "severity": "warning", + "pass": false, + "reasonCode": "prod.collector.node-info.absent", + "message": "node-info collector data is absent; run 'kvirtbp collect' before scanning", + "remediation": "kvirtbp collect --bundle ./examples/collectors/node-info --output collector-data.json", +}] { + not collector_data_present +} + +# --------------------------------------------------------------------------- +# Check 2: all nodes report the same CPU architecture +# +# Mixed architectures in a KubeVirt cluster can cause VM scheduling failures +# when the guest image is incompatible with the host CPU architecture. 
+# --------------------------------------------------------------------------- + +arch_findings := [{ + "checkId": "prod-node-arch-consistent", + "title": "Node CPU Architecture Consistency", + "category": "production-readiness", + "severity": "info", + "pass": true, + "reasonCode": "prod.node.arch.consistent", + "message": sprintf("all nodes report a consistent CPU architecture: %s", [concat(", ", node_architectures)]), + "evidence": {"architecture": concat(", ", node_architectures)}, +}] { + arch_consistent +} + +arch_findings := [{ + "checkId": "prod-node-arch-consistent", + "title": "Node CPU Architecture Consistency", + "category": "production-readiness", + "severity": "warning", + "pass": false, + "reasonCode": "prod.node.arch.mixed", + "message": sprintf("mixed CPU architectures detected across nodes: %s", [concat(", ", node_architectures)]), + "evidence": {"architectures": concat(", ", node_architectures)}, + "remediation": "KubeVirt VM scheduling may fail if the guest image is incompatible with the host CPU architecture. Ensure nodes share a common architecture for KubeVirt workloads.", +}] { + arch_mixed +} + +arch_findings := [] { + not arch_consistent + not arch_mixed +} + +# --------------------------------------------------------------------------- +# Entrypoint +# --------------------------------------------------------------------------- + +cluster_findings := array.concat(collector_findings, arch_findings) diff --git a/examples/collectors/node-info/policy_test.rego b/examples/collectors/node-info/policy_test.rego new file mode 100644 index 0000000..e2c8afb --- /dev/null +++ b/examples/collectors/node-info/policy_test.rego @@ -0,0 +1,109 @@ +package kvirtbp_test + +import rego.v1 + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + +# Minimal input with collector data present (two nodes, same arch). 
+input_with_data := { + "cluster": { + "nodes": [ + {"name": "worker-1"}, + {"name": "worker-2"}, + ], + "collectors": { + "node-info": { + "worker-1": {"kernel": "5.15.0-122-generic", "arch": "x86_64"}, + "worker-2": {"kernel": "5.15.0-118-generic", "arch": "x86_64"}, + }, + }, + }, +} + +# Input with mixed architectures. +input_mixed_arch := { + "cluster": { + "nodes": [ + {"name": "worker-1"}, + {"name": "worker-2"}, + ], + "collectors": { + "node-info": { + "worker-1": {"kernel": "5.15.0-122-generic", "arch": "x86_64"}, + "worker-2": {"kernel": "6.1.0-28-arm64", "arch": "aarch64"}, + }, + }, + }, +} + +# Input with no collector data injected. +input_no_collectors := { + "cluster": { + "nodes": [{"name": "worker-1"}], + }, +} + +# No cluster at all (dry-run / unit test mode). +input_no_cluster := {} + +# --------------------------------------------------------------------------- +# findings are empty when no cluster +# --------------------------------------------------------------------------- + +test_no_cluster_findings_empty if { + findings := data.kvirtbp.findings with input as input_no_cluster + count(findings) == 0 +} + +# --------------------------------------------------------------------------- +# Collector data present — check 1 +# --------------------------------------------------------------------------- + +test_collector_present_pass if { + findings := data.kvirtbp.findings with input as input_with_data + present_findings := [f | f := findings[_]; f.checkId == "prod-node-info-collector-present"] + count(present_findings) == 1 + present_findings[0].pass == true + present_findings[0].reasonCode == "prod.collector.node-info.present" +} + +test_collector_absent_fail if { + findings := data.kvirtbp.findings with input as input_no_collectors + present_findings := [f | f := findings[_]; f.checkId == "prod-node-info-collector-present"] + count(present_findings) == 1 + present_findings[0].pass == false + present_findings[0].reasonCode == 
"prod.collector.node-info.absent" +} + +# --------------------------------------------------------------------------- +# Architecture consistency — check 2 +# --------------------------------------------------------------------------- + +test_arch_consistent_pass if { + findings := data.kvirtbp.findings with input as input_with_data + arch_findings := [f | f := findings[_]; f.checkId == "prod-node-arch-consistent"] + count(arch_findings) == 1 + arch_findings[0].pass == true + arch_findings[0].reasonCode == "prod.node.arch.consistent" + arch_findings[0].evidence.architecture == "x86_64" +} + +test_arch_mixed_fail if { + findings := data.kvirtbp.findings with input as input_mixed_arch + arch_findings := [f | f := findings[_]; f.checkId == "prod-node-arch-consistent"] + count(arch_findings) == 1 + arch_findings[0].pass == false + arch_findings[0].reasonCode == "prod.node.arch.mixed" +} + +# --------------------------------------------------------------------------- +# No collector data → arch check is skipped (only collector-present emitted) +# --------------------------------------------------------------------------- + +test_no_collector_arch_check_skipped if { + findings := data.kvirtbp.findings with input as input_no_collectors + arch_findings := [f | f := findings[_]; f.checkId == "prod-node-arch-consistent"] + count(arch_findings) == 0 +} diff --git a/internal/bundle/remote.go b/internal/bundle/remote.go new file mode 100644 index 0000000..8b2f0e2 --- /dev/null +++ b/internal/bundle/remote.go @@ -0,0 +1,245 @@ +// Package bundle resolves a policy bundle path to a local directory. +// Paths that start with "http://" or "https://" are treated as remote tarballs +// (gzip-compressed tar archives) and are downloaded to a temporary directory. +// Local paths are returned unchanged. 
+package bundle + +import ( + "archive/tar" + "compress/gzip" + "context" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "sort" + "strings" +) +// Resolve returns the local directory path for the bundle at rawPath. +// +// If rawPath is a local filesystem path it is returned as-is and cleanup is a +// no-op. If rawPath starts with "http://" or "https://" the tarball is +// downloaded, unpacked into a temporary directory, and cleanup deletes that +// directory. The caller must always call cleanup() when done. +// +// subdir is an optional subdirectory within the archive root that contains the +// bundle's metadata.json (useful for monorepos where the bundle lives under +// e.g. "policy/baseline"). When empty the archive root is used. +func Resolve(ctx context.Context, rawPath, subdir string) (dir string, cleanup func(), err error) { + noop := func() {} + + if !isURL(rawPath) { + if subdir != "" { + return filepath.Join(rawPath, subdir), noop, nil + } + return rawPath, noop, nil + } + + tmpDir, err := os.MkdirTemp("", "kvirtbp-bundle-*") + if err != nil { + return "", noop, fmt.Errorf("create temp dir: %w", err) + } + cleanup = func() { _ = os.RemoveAll(tmpDir) } + + if err := fetchAndUnpack(ctx, rawPath, tmpDir); err != nil { + cleanup() + return "", noop, fmt.Errorf("fetch bundle %q: %w", rawPath, err) + } + + // GitHub and most Git hosting platforms wrap the archive contents in a + // single top-level directory (e.g. "repo-v1.2.0/"). Strip it so that + // tmpDir itself — or tmpDir/subdir — contains the bundle files. + stripped, err := stripTopDir(tmpDir) + if err != nil { + cleanup() + return "", noop, err + } + + if subdir != "" { + stripped = filepath.Join(stripped, subdir) + } + + // Verify the resolved directory exists in the unpacked archive. 
+ if _, statErr := os.Stat(stripped); statErr != nil { + cleanup() + return "", noop, fmt.Errorf("bundle subdir %q not found in archive", stripped) + } + + return stripped, cleanup, nil +} + +// isURL returns true when s looks like an http or https URL. +func isURL(s string) bool { + u, err := url.Parse(s) + if err != nil { + return false + } + return u.Scheme == "http" || u.Scheme == "https" +} + +// fetchAndUnpack downloads a .tar.gz archive from rawURL and extracts it into +// destDir. Only regular files and directories are extracted; symlinks, +// hard-links, and device files are skipped for security. +func fetchAndUnpack(ctx context.Context, rawURL, destDir string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil) + if err != nil { + return fmt.Errorf("build request: %w", err) + } + req.Header.Set("User-Agent", "kvirtbp") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("download: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected HTTP status %d", resp.StatusCode) + } + + gr, err := gzip.NewReader(resp.Body) + if err != nil { + return fmt.Errorf("open gzip stream: %w", err) + } + defer gr.Close() + + tr := tar.NewReader(gr) + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("read tar entry: %w", err) + } + + // Security: reject any path that would escape destDir. + cleanName := filepath.Clean(hdr.Name) + if strings.HasPrefix(cleanName, "..") { + return fmt.Errorf("archive contains unsafe path %q", hdr.Name) + } + + target := filepath.Join(destDir, cleanName) + + // Ensure the resolved path is still inside destDir. 
+ if !strings.HasPrefix(target, filepath.Clean(destDir)+string(os.PathSeparator)) { + return fmt.Errorf("archive path %q escapes destination", hdr.Name) + } + + switch hdr.Typeflag { + case tar.TypeDir: + if err := os.MkdirAll(target, 0o750); err != nil { + return fmt.Errorf("create dir %q: %w", target, err) + } + case tar.TypeReg: + if err := os.MkdirAll(filepath.Dir(target), 0o750); err != nil { + return fmt.Errorf("create parent dir for %q: %w", target, err) + } + f, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(hdr.Mode)&0o777) + if err != nil { + return fmt.Errorf("create file %q: %w", target, err) + } + if _, err := io.Copy(f, tr); err != nil { //nolint:gosec // size bounded by server + f.Close() + return fmt.Errorf("write file %q: %w", target, err) + } + f.Close() + // Intentionally skip symlinks, hard-links, and special files. + } + } + + return nil +} + +// stripTopDir checks whether all entries inside dir share a single common +// top-level subdirectory (the GitHub-style wrapper directory). If they do it +// returns that subdirectory path. If the bundle is already flat it returns dir +// unchanged. +func stripTopDir(dir string) (string, error) { + entries, err := os.ReadDir(dir) + if err != nil { + return "", fmt.Errorf("read temp dir: %w", err) + } + + if len(entries) == 1 && entries[0].IsDir() { + return filepath.Join(dir, entries[0].Name()), nil + } + return dir, nil +} + +// SubBundles returns the bundle directory paths within dir. +// +// If dir itself contains a metadata.json it is considered a single bundle and +// returned as a one-element slice. Otherwise every immediate subdirectory of +// dir that contains a metadata.json is returned in sorted (alphabetical) order. +// Returns nil (no error) when no bundles are found. +func SubBundles(dir string) ([]string, error) { + // Check whether dir itself is a bundle. 
+ if _, err := os.Stat(filepath.Join(dir, "metadata.json")); err == nil { + return []string{dir}, nil + } + + // Otherwise scan for bundle subdirectories. + entries, err := os.ReadDir(dir) + if err != nil { + return nil, fmt.Errorf("read bundle directory %q: %w", dir, err) + } + var bundles []string + for _, e := range entries { + if !e.IsDir() { + continue + } + sub := filepath.Join(dir, e.Name()) + if _, statErr := os.Stat(filepath.Join(sub, "metadata.json")); statErr == nil { + bundles = append(bundles, sub) + } + } + sort.Strings(bundles) + return bundles, nil +} + +// SaveDir copies the bundle directory at src to dst, creating dst if needed. +// It performs a recursive copy of all regular files and directories; symlinks +// and special files are skipped. Existing files under dst are overwritten. +func SaveDir(src, dst string) error { + return filepath.Walk(src, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + rel, err := filepath.Rel(src, path) + if err != nil { + return err + } + target := filepath.Join(dst, rel) + + if info.IsDir() { + return os.MkdirAll(target, 0o750) + } + + if !info.Mode().IsRegular() { + return nil // skip symlinks, device files, etc. 
+ } + + if err := os.MkdirAll(filepath.Dir(target), 0o750); err != nil { + return err + } + + in, err := os.Open(path) + if err != nil { + return err + } + defer in.Close() + + out, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, info.Mode()&0o777) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, in) + return err + }) +} diff --git a/internal/bundle/remote_test.go b/internal/bundle/remote_test.go new file mode 100644 index 0000000..2de4db7 --- /dev/null +++ b/internal/bundle/remote_test.go @@ -0,0 +1,244 @@ +package bundle_test + +import ( + "archive/tar" + "compress/gzip" + "context" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + + "github.com/phenixblue/kvirtbp/internal/bundle" +) + +// buildTarGz writes a minimal .tar.gz archive containing the given files +// (path → content) into a temp file and returns the file path. +func buildTarGz(t *testing.T, topDir string, files map[string]string) string { + t.Helper() + f, err := os.CreateTemp(t.TempDir(), "bundle-*.tar.gz") + if err != nil { + t.Fatalf("create temp file: %v", err) + } + defer f.Close() + + gw := gzip.NewWriter(f) + tw := tar.NewWriter(gw) + + // Top-level wrapper directory (GitHub-style). 
+ if topDir != "" { + if err := tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeDir, + Name: topDir + "/", + Mode: 0o750, + }); err != nil { + t.Fatalf("write dir header: %v", err) + } + } + + for name, content := range files { + fullName := name + if topDir != "" { + fullName = topDir + "/" + name + } + if err := tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: fullName, + Size: int64(len(content)), + Mode: 0o640, + }); err != nil { + t.Fatalf("write file header %q: %v", name, err) + } + if _, err := tw.Write([]byte(content)); err != nil { + t.Fatalf("write file %q: %v", name, err) + } + } + + if err := tw.Close(); err != nil { + t.Fatalf("close tar: %v", err) + } + if err := gw.Close(); err != nil { + t.Fatalf("close gzip: %v", err) + } + + return f.Name() +} + +// serveFile returns a test HTTP server that serves the file at path. +func serveFile(t *testing.T, path string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.ServeFile(w, r, path) + })) +} + +// ---- Tests for local paths ---- + +func TestResolve_LocalPath_NoSubdir(t *testing.T) { + dir := t.TempDir() + got, cleanup, err := bundle.Resolve(context.Background(), dir, "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanup() + if got != dir { + t.Errorf("want %q, got %q", dir, got) + } +} + +func TestResolve_LocalPath_WithSubdir(t *testing.T) { + parent := t.TempDir() + sub := filepath.Join(parent, "policy", "baseline") + if err := os.MkdirAll(sub, 0o750); err != nil { + t.Fatalf("mkdir: %v", err) + } + + got, cleanup, err := bundle.Resolve(context.Background(), parent, "policy/baseline") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanup() + want := filepath.Join(parent, "policy", "baseline") + if got != want { + t.Errorf("want %q, got %q", want, got) + } +} + +// ---- Tests for remote tarball ---- + +func TestResolve_RemoteTarball_FlatRoot(t *testing.T) { + 
archivePath := buildTarGz(t, "", map[string]string{ + "metadata.json": `{"schemaVersion":"v1alpha1"}`, + "policy.rego": `package kvirtbp`, + }) + srv := serveFile(t, archivePath) + defer srv.Close() + + dir, cleanup, err := bundle.Resolve(context.Background(), srv.URL, "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanup() + + if _, statErr := os.Stat(filepath.Join(dir, "metadata.json")); statErr != nil { + t.Errorf("expected metadata.json in resolved dir %q: %v", dir, statErr) + } + if _, statErr := os.Stat(filepath.Join(dir, "policy.rego")); statErr != nil { + t.Errorf("expected policy.rego in resolved dir %q: %v", dir, statErr) + } +} + +func TestResolve_RemoteTarball_GitHubStyleTopDir(t *testing.T) { + archivePath := buildTarGz(t, "my-repo-v1.0.0", map[string]string{ + "metadata.json": `{"schemaVersion":"v1alpha1"}`, + "policy.rego": `package kvirtbp`, + }) + srv := serveFile(t, archivePath) + defer srv.Close() + + dir, cleanup, err := bundle.Resolve(context.Background(), srv.URL, "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanup() + + // The wrapper dir should have been stripped. 
+ if _, statErr := os.Stat(filepath.Join(dir, "metadata.json")); statErr != nil { + t.Errorf("expected metadata.json after top-dir strip in %q: %v", dir, statErr) + } +} + +func TestResolve_RemoteTarball_WithSubdir(t *testing.T) { + archivePath := buildTarGz(t, "my-repo-v1.0.0", map[string]string{ + "policy/baseline/metadata.json": `{"schemaVersion":"v1alpha1"}`, + "policy/baseline/policy.rego": `package kvirtbp`, + }) + srv := serveFile(t, archivePath) + defer srv.Close() + + dir, cleanup, err := bundle.Resolve(context.Background(), srv.URL, "policy/baseline") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanup() + + if _, statErr := os.Stat(filepath.Join(dir, "metadata.json")); statErr != nil { + t.Errorf("expected metadata.json in subdir %q: %v", dir, statErr) + } +} + +func TestResolve_RemoteTarball_MissingSubdir(t *testing.T) { + archivePath := buildTarGz(t, "", map[string]string{ + "metadata.json": `{"schemaVersion":"v1alpha1"}`, + }) + srv := serveFile(t, archivePath) + defer srv.Close() + + _, cleanup, err := bundle.Resolve(context.Background(), srv.URL, "does/not/exist") + cleanup() // always call even on error + if err == nil { + t.Fatal("expected error for missing subdir, got nil") + } +} + +func TestResolve_RemoteTarball_HTTP404(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.NotFound(w, r) + })) + defer srv.Close() + + _, cleanup, err := bundle.Resolve(context.Background(), srv.URL, "") + cleanup() + if err == nil { + t.Fatal("expected error for HTTP 404, got nil") + } +} + +func TestResolve_RemoteTarball_ContextCancelled(t *testing.T) { + // Server that blocks until it is shut down. 
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-r.Context().Done() + })) + defer srv.Close() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + _, cleanup, err := bundle.Resolve(ctx, srv.URL, "") + cleanup() + if err == nil { + t.Fatal("expected error for cancelled context, got nil") + } +} + +func TestResolve_CleanupDeletesTempDir(t *testing.T) { + archivePath := buildTarGz(t, "", map[string]string{ + "metadata.json": `{}`, + }) + srv := serveFile(t, archivePath) + defer srv.Close() + + dir, cleanup, err := bundle.Resolve(context.Background(), srv.URL, "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify temp dir exists before cleanup. + if _, statErr := os.Stat(dir); statErr != nil { + t.Fatalf("expected temp dir %q to exist before cleanup: %v", dir, statErr) + } + + cleanup() + + // After cleanup the directory (or its parent temp container) should be gone. + // The actual resolved dir may be a subdirectory; check the parent temp root. + parent := filepath.Dir(dir) + if _, statErr := os.Stat(parent); statErr == nil { + // parent may legitimately still exist if it is the OS temp dir itself; + // check that the specific kvirtbp-bundle-* dir is gone. 
+ if _, statErr2 := os.Stat(dir); statErr2 == nil { + t.Errorf("expected temp dir %q to be deleted after cleanup", dir) + } + } +} diff --git a/internal/cli/collect.go b/internal/cli/collect.go new file mode 100644 index 0000000..9302956 --- /dev/null +++ b/internal/cli/collect.go @@ -0,0 +1,238 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/phenixblue/kvirtbp/internal/bundle" + "github.com/phenixblue/kvirtbp/internal/collector" + regoengine "github.com/phenixblue/kvirtbp/internal/eval/rego" + "github.com/phenixblue/kvirtbp/internal/kube" + "github.com/spf13/cobra" +) + + +func newCollectCmd(kubeconfigPath *string, kubeContext *string) *cobra.Command { + var policyBundles []string + var collectorConfigFiles []string + var collectorNamespace string + var collectorTimeout time.Duration + var noCleanup bool + var outputFile string + var bundleSubdir string + var saveBundle string + + cmd := &cobra.Command{ + Use: "collect", + Short: "Run collector Jobs on the cluster and write collector data to a file", + Long: `collect deploys short-lived Kubernetes Jobs to gather node or cluster-scope +data that Rego policies can reference via input.cluster.collectors. + +The collected data is written to a JSON file (default: collector-data.json) +that can be passed to 'kvirtbp scan --collector-data <file>'.`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), collectorTimeout) + defer cancel() + + // Resolve each --bundle (download + unpack if URL) to a local directory. + // Cleanups are deferred until RunE returns so the resolved directories + // stay available for --save-bundle and collector-config loading; with + // multiple bundles each one is saved separately rather than merged. 
+ resolvedBundles := make([]string, 0, len(policyBundles)) + for _, rawBundle := range policyBundles { + dir, cleanup, resolveErr := bundle.Resolve(ctx, rawBundle, bundleSubdir) + if resolveErr != nil { + return fmt.Errorf("resolve bundle %q: %w", rawBundle, resolveErr) + } + defer cleanup() //nolint:gocritic // intentional: each cleanup deferred + resolvedBundles = append(resolvedBundles, dir) + } + + // Persist bundle(s) before temp-dir cleanup runs when --save-bundle is set. + // Single bundle: save directly to saveBundle. + // Multiple bundles: save each to saveBundle/bundle-0/, bundle-1/, … + var persistedBundlePaths []string + if saveBundle != "" { + for i, resolved := range resolvedBundles { + dest := saveBundle + if len(resolvedBundles) > 1 { + dest = filepath.Join(saveBundle, fmt.Sprintf("bundle-%d", i)) + } + if err := bundle.SaveDir(resolved, dest); err != nil { + return fmt.Errorf("save bundle %d to %q: %w", i, dest, err) + } + persistedBundlePaths = append(persistedBundlePaths, dest) + } + } else { + // No --save-bundle: record local paths so scan can reuse them. + // Remote URLs are skipped — they can't be referenced without saving. 
+ for _, raw := range policyBundles { + if !isRemoteURL(raw) { + persistedBundlePaths = append(persistedBundlePaths, raw) + } + } + } + + configs, err := resolveCollectorConfigs(resolvedBundles, collectorConfigFiles) + if err != nil { + return err + } + if len(configs) == 0 { + return fmt.Errorf("no collector configs found; provide --bundle or --collector-config") + } + + clients, err := kube.NewClients(kube.Options{ + KubeconfigPath: *kubeconfigPath, + Context: *kubeContext, + }) + if err != nil { + return fmt.Errorf("connect to cluster: %w", err) + } + + if err := ensureNamespace(ctx, clients, collectorNamespace); err != nil { + return fmt.Errorf("ensure namespace %q: %w", collectorNamespace, err) + } + + opts := collector.RunOptions{ + Namespace: collectorNamespace, + GlobalTimeout: collectorTimeout, + SkipCleanup: noCleanup, + } + + data, err := runCollectors(ctx, clients, configs, opts) + if err != nil { + return err + } + + meta := collector.CollectorMeta{BundlePaths: persistedBundlePaths} + result := collector.NewCollectorResult(data, meta) + return writeCollectorResult(outputFile, result) + }, + } + + cmd.Flags().StringArrayVar(&policyBundles, "bundle", nil, "Path or HTTPS URL to a policy bundle (repeatable); collector configs from each bundle's metadata.json are merged") + cmd.Flags().StringVar(&bundleSubdir, "bundle-subdir", "", "Subdirectory within the bundle archive that contains metadata.json (for monorepo layouts)") + cmd.Flags().StringVar(&saveBundle, "save-bundle", "", "Persist resolved bundle(s) to this path. Single bundle: saved directly. 
Multiple bundles: saved to <path>/bundle-0/, bundle-1/, …") + cmd.Flags().StringArrayVar(&collectorConfigFiles, "collector-config", nil, "Path to a JSON file containing []CollectorConfig (repeatable); merged after bundle configs, later files win") + cmd.Flags().StringVar(&collectorNamespace, "collector-namespace", "kvirtbp-collectors", "Kubernetes namespace for collector Jobs") + cmd.Flags().DurationVar(&collectorTimeout, "collector-timeout", 5*time.Minute, "Maximum time to wait for all collectors to finish") + cmd.Flags().BoolVar(&noCleanup, "no-collector-cleanup", false, "Keep collector Jobs after completion (useful for debugging)") + cmd.Flags().StringVar(&outputFile, "output", "collector-data.json", "Path to write the collector data JSON file") + + return cmd +} + +// resolveCollectorConfigs loads collectors from one or more bundles and/or +// standalone config files and returns the merged list. +// Bundle configs are merged left-to-right (later bundle wins on collision). +// Config-file configs are then merged on top (config files always win). +func resolveCollectorConfigs(bundlePaths, configFiles []string) ([]collector.CollectorConfig, error) { + bundleSlices := make([][]collector.CollectorConfig, 0, len(bundlePaths)) + for _, bp := range bundlePaths { + configs, err := regoengine.CollectorsFromBundle(bp) + if err != nil { + return nil, fmt.Errorf("load bundle collectors from %q: %w", bp, err) + } + bundleSlices = append(bundleSlices, configs) + } + mergedBundles := collector.MergeAll(bundleSlices...) 
+ + fileSlices := make([][]collector.CollectorConfig, 0, len(configFiles)) + for _, cf := range configFiles { + b, err := os.ReadFile(cf) + if err != nil { + return nil, fmt.Errorf("read collector config %q: %w", cf, err) + } + var cfgList []collector.CollectorConfig + if err := json.Unmarshal(b, &cfgList); err != nil { + return nil, fmt.Errorf("decode collector config %q: %w", cf, err) + } + fileSlices = append(fileSlices, cfgList) + } + mergedFiles := collector.MergeAll(fileSlices...) + + return collector.MergeCollectorConfigs(mergedBundles, mergedFiles), nil +} + +// isRemoteURL returns true when s is an http/https URL. +func isRemoteURL(s string) bool { + return len(s) > 7 && (s[:7] == "http://" || (len(s) > 8 && s[:8] == "https://")) +} + +// ensureNamespace creates ns if it does not already exist. +func ensureNamespace(ctx context.Context, clients *kube.Clients, ns string) error { + _, err := clients.Core.CoreV1().Namespaces().Get(ctx, ns, metav1.GetOptions{}) + if err == nil { + return nil // already exists + } + if !errors.IsNotFound(err) { + return fmt.Errorf("get namespace %q: %w", ns, err) + } + + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: ns, + Labels: map[string]string{ + "app.kubernetes.io/managed-by": "kvirtbp", + }, + }, + } + _, createErr := clients.Core.CoreV1().Namespaces().Create(ctx, namespace, metav1.CreateOptions{}) + if createErr != nil && !errors.IsAlreadyExists(createErr) { + return fmt.Errorf("create namespace %q: %w", ns, createErr) + } + return nil +} + +// runCollectors executes all collectors concurrently and aggregates results. +// The returned map is map[collectorName]map[nodeNameOrCluster]map[string]any. 
+func runCollectors(ctx context.Context, clients *kube.Clients, configs []collector.CollectorConfig, opts collector.RunOptions) (map[string]any, error) { + var ( + mu sync.Mutex + wg sync.WaitGroup + result = make(map[string]any, len(configs)) + runErr error + ) + + for _, cfg := range configs { + c := collector.NewJobCollector(cfg) + wg.Add(1) + go func() { + defer wg.Done() + data, err := c.Collect(ctx, clients, opts) + mu.Lock() + defer mu.Unlock() + if err != nil { + if runErr == nil { + runErr = fmt.Errorf("collector %q: %w", c.Name(), err) + } + return + } + result[c.Name()] = data + }() + } + wg.Wait() + return result, runErr +} + +// writeCollectorResult encodes a CollectorResult as indented JSON and writes +// it to filePath. +func writeCollectorResult(filePath string, result collector.CollectorResult) error { + b, err := json.MarshalIndent(result, "", " ") + if err != nil { + return fmt.Errorf("encode collector data: %w", err) + } + if err := os.WriteFile(filePath, b, 0o600); err != nil { + return fmt.Errorf("write collector data to %q: %w", filePath, err) + } + return nil +} diff --git a/internal/cli/root.go b/internal/cli/root.go index 9adfc81..bdf5421 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -34,6 +34,7 @@ func NewRootCmd() *cobra.Command { root.PersistentFlags().StringVar(&kubeContext, "context", "", "Kubernetes context override") root.AddCommand(newScanCmd(&outputFlag, &kubeconfigPath, &kubeContext)) + root.AddCommand(newCollectCmd(&kubeconfigPath, &kubeContext)) root.AddCommand(newChecksCmd()) root.AddCommand(newRunbookCmd()) root.AddCommand(&cobra.Command{ diff --git a/internal/cli/scan.go b/internal/cli/scan.go index 62b254a..89b36fd 100644 --- a/internal/cli/scan.go +++ b/internal/cli/scan.go @@ -4,11 +4,14 @@ import ( "context" "crypto/sha256" "encoding/hex" + "encoding/json" "fmt" "os" "time" + "github.com/phenixblue/kvirtbp/internal/bundle" "github.com/phenixblue/kvirtbp/internal/checks" + 
"github.com/phenixblue/kvirtbp/internal/collector" "github.com/phenixblue/kvirtbp/internal/eval" "github.com/phenixblue/kvirtbp/internal/eval/goeval" regoengine "github.com/phenixblue/kvirtbp/internal/eval/rego" @@ -45,6 +48,14 @@ func newScanCmd(outputFlag *string, kubeconfigPath *string, kubeContext *string) var showRunbook bool var waiverFile string var resourceTypes []string + var collectorDataFiles []string + var bundleSubdir string + var collectorBundles []string + var collectorConfigFiles []string + var collectorNamespace string + var collectorTimeout time.Duration + var noCollectorCleanup bool + var noAutoBundle bool cmd := &cobra.Command{ Use: "scan", @@ -56,6 +67,47 @@ func newScanCmd(outputFlag *string, kubeconfigPath *string, kubeContext *string) return err } + // Load collector-data files first so we can extract _meta.bundlePath + // for auto-bundle discovery before resolving --policy-bundle. + var collectorData map[string]any + var collectorMeta collector.CollectorMeta + if len(collectorDataFiles) > 0 { + cd, meta, loadErr := loadAndMergeCollectorData(collectorDataFiles) + if loadErr != nil { + return loadErr + } + collectorData = cd + collectorMeta = meta + } + + // resolvedBundles is the list of local bundle directories to evaluate. + // Populated from --policy-bundle (resolved then sub-bundle detected) or + // from _meta.bundlePaths in the collector-data file(s). 
+ var resolvedBundles []string + if policyBundle != "" { + dir, cleanup, resolveErr := bundle.Resolve(cmd.Context(), policyBundle, bundleSubdir) + if resolveErr != nil { + return fmt.Errorf("resolve policy bundle: %w", resolveErr) + } + defer cleanup() + subs, splitErr := bundle.SubBundles(dir) + if splitErr != nil { + return fmt.Errorf("detect sub-bundles in %q: %w", dir, splitErr) + } + resolvedBundles = subs + } else if !noAutoBundle && len(collectorMeta.BundlePaths) > 0 { + for _, p := range collectorMeta.BundlePaths { + subs, splitErr := bundle.SubBundles(p) + if splitErr != nil { + return fmt.Errorf("detect sub-bundles in %q: %w", p, splitErr) + } + resolvedBundles = append(resolvedBundles, subs...) + } + if len(resolvedBundles) > 0 { + fmt.Fprintf(cmd.ErrOrStderr(), "info: using %d bundle(s) from collector data\n", len(resolvedBundles)) + } + } + parsedSeverities, err := checks.ParseSeverities(severities) if err != nil { return err @@ -76,12 +128,12 @@ func newScanCmd(outputFlag *string, kubeconfigPath *string, kubeContext *string) return err } - // Merge resource types from --resource flag and bundle metadata. + // Merge resource types from --resource flag and all bundle metadata. mergedResourceTypes := append([]string(nil), resourceTypes...) 
- if policyBundle != "" { - bundleResources, err := regoengine.ResourceTypesFromBundle(policyBundle) + for _, bp := range resolvedBundles { + bundleResources, err := regoengine.ResourceTypesFromBundle(bp) if err != nil { - return fmt.Errorf("reading bundle resource types: %w", err) + return fmt.Errorf("reading bundle resource types from %q: %w", bp, err) } mergedResourceTypes = mergeResourceTypes(mergedResourceTypes, bundleResources) } @@ -104,15 +156,58 @@ func newScanCmd(outputFlag *string, kubeconfigPath *string, kubeContext *string) snap = kube.BuildClusterSnapshot(ctx, clients, preflightOpts) } - result, err := evaluator.Evaluate(ctx, eval.RunRequest{ - Registry: checks.DefaultChecks(), - Filter: filter, - PolicyFile: policyFile, - PolicyBundle: policyBundle, - ClusterSnapshot: &snap, - }) + // Seed snap.Collectors from pre-collected data files. + if collectorData != nil { + snap.Collectors = collectorData + } + + // Run inline custom collectors (--collector-bundle / --collector-config) + // and merge their results on top, overwriting any stale file data. + inlineConfigs, err := resolveCollectorConfigs(collectorBundles, collectorConfigFiles) if err != nil { - return err + return fmt.Errorf("resolve inline collector configs: %w", err) + } + if len(inlineConfigs) > 0 { + if clients == nil { + return fmt.Errorf("inline collectors require a live cluster connection") + } + if snap.Collectors == nil { + snap.Collectors = make(map[string]any) + } + inlineOpts := collector.RunOptions{ + Namespace: collectorNamespace, + GlobalTimeout: collectorTimeout, + SkipCleanup: noCollectorCleanup, + } + inlineData, inlineErr := runCollectors(ctx, clients, inlineConfigs, inlineOpts) + if inlineErr != nil { + return fmt.Errorf("inline collectors: %w", inlineErr) + } + for k, v := range inlineData { + snap.Collectors[k] = v + } + } + + // Evaluate each bundle independently and merge findings. 
+ // When no bundles are present (go engine or --policy-file) a single + // evaluation runs with an empty PolicyBundle. + evalBundles := resolvedBundles + if len(evalBundles) == 0 { + evalBundles = []string{""} + } + var result checks.RunResult + for _, bp := range evalBundles { + r, evalErr := evaluator.Evaluate(ctx, eval.RunRequest{ + Registry: checks.DefaultChecks(), + Filter: filter, + PolicyFile: policyFile, + PolicyBundle: bp, + ClusterSnapshot: &snap, + }) + if evalErr != nil { + return evalErr + } + result.Findings = append(result.Findings, r.Findings...) } // The Go engine produces cluster findings via BuildPreflightFindingsFromSnapshot. @@ -183,10 +278,18 @@ func newScanCmd(outputFlag *string, kubeconfigPath *string, kubeContext *string) cmd.Flags().StringSliceVar(&excludeNamespaces, "exclude-namespace", nil, "Exclude matching namespaces from namespace-scoped coverage checks (supports glob patterns)") cmd.Flags().StringVar(&engineName, "engine", "go", "Evaluator engine: go|rego") cmd.Flags().StringVar(&policyFile, "policy-file", "", "Path to Rego policy file (used with --engine rego)") - cmd.Flags().StringVar(&policyBundle, "policy-bundle", "", "Path to Rego policy bundle directory (used with --engine rego)") + cmd.Flags().StringVar(&policyBundle, "policy-bundle", "", "Path or HTTPS URL to a Rego policy bundle directory or .tar.gz archive (used with --engine rego)") + cmd.Flags().StringVar(&bundleSubdir, "bundle-subdir", "", "Subdirectory within the bundle archive that contains metadata.json (for monorepo layouts)") cmd.Flags().BoolVar(&showRunbook, "show-runbook", false, "Append runbook hint for failing findings with remediation IDs") cmd.Flags().StringVar(&waiverFile, "waiver-file", "", "Path to waiver YAML file (checks matching a waiver are skipped from failure counting)") cmd.Flags().StringSliceVar(&resourceTypes, "resource", nil, "Additional Kubernetes resource types to fetch and expose to Rego as input.cluster.resources (format: VERSION/RESOURCE or 
GROUP/VERSION/RESOURCE, e.g. v1/configmaps,apps/v1/deployments)") + cmd.Flags().StringArrayVar(&collectorDataFiles, "collector-data", nil, "Path to a collector-data JSON file produced by 'kvirtbp collect' (repeatable; later files win on name collision). The _meta.bundlePath from the first file is used to auto-discover --policy-bundle.") + cmd.Flags().StringArrayVar(&collectorBundles, "collector-bundle", nil, "Path or HTTPS URL to a bundle whose metadata.json declares collectors to run inline during scan (repeatable)") + cmd.Flags().StringArrayVar(&collectorConfigFiles, "collector-config", nil, "Path to a JSON file containing []CollectorConfig to run inline during scan (repeatable)") + cmd.Flags().StringVar(&collectorNamespace, "collector-namespace", "kvirtbp-collectors", "Kubernetes namespace for inline collector Jobs") + cmd.Flags().DurationVar(&collectorTimeout, "collector-timeout", 5*time.Minute, "Timeout for inline collector Jobs") + cmd.Flags().BoolVar(&noCollectorCleanup, "no-collector-cleanup", false, "Keep inline collector Jobs after scan (useful for debugging)") + cmd.Flags().BoolVar(&noAutoBundle, "no-auto-bundle", false, "Disable automatic policy bundle discovery from _meta.bundlePath in collector-data files") return cmd } @@ -278,3 +381,34 @@ func clusterContextHash(kubeContext string, kubeconfigProvided bool) string { digest := sha256.Sum256([]byte(seed)) return hex.EncodeToString(digest[:])[:12] } + +// loadAndMergeCollectorData reads one or more collector-data files, merges +// them left-to-right at the collector-name level (later files win on +// collision), and returns the merged data map plus the CollectorMeta from the +// first file (which holds the authoritative _meta.bundlePath). 
+func loadAndMergeCollectorData(files []string) (map[string]any, collector.CollectorMeta, error) { + var merged map[string]any + var firstMeta collector.CollectorMeta + for i, f := range files { + b, err := os.ReadFile(f) + if err != nil { + return nil, firstMeta, fmt.Errorf("read collector data %q: %w", f, err) + } + var result collector.CollectorResult + if err := json.Unmarshal(b, &result); err != nil { + return nil, firstMeta, fmt.Errorf("decode collector data %q: %w", f, err) + } + if i == 0 { + firstMeta = result.Meta + merged = result.Data + } else { + if merged == nil { + merged = make(map[string]any) + } + for k, v := range result.Data { + merged[k] = v + } + } + } + return merged, firstMeta, nil +} diff --git a/internal/collector/collector.go b/internal/collector/collector.go new file mode 100644 index 0000000..3e4dabf --- /dev/null +++ b/internal/collector/collector.go @@ -0,0 +1,37 @@ +package collector + +import ( + "context" + "time" + + "github.com/phenixblue/kvirtbp/internal/kube" +) + +// RunOptions are passed to each Collector at execution time. +type RunOptions struct { + // Namespace is the Kubernetes namespace where Jobs are created. + Namespace string + + // GlobalTimeout is the maximum total time to wait for a collector to + // finish, regardless of its per-CollectorConfig TimeoutSeconds value. + // A zero value means no global cap. + GlobalTimeout time.Duration + + // SkipCleanup prevents the Collector from deleting the Job (and its Pods) + // after completion. Useful for debugging. + SkipCleanup bool +} + +// Collector runs a single CollectorConfig against the cluster and returns the +// collected data keyed by node name (ScopePerNode) or by CollectorDataScope +// (ScopeOnce). +type Collector interface { + // Name returns the collector name, matching CollectorConfig.Name. + Name() string + + // Collect executes the collector and returns its output. + // The returned map is map[nodeNameOrCluster]map[string]any. + // On partial failures (e.g. 
one node out of many fails) the node entry + // contains {"_error": ""} rather than returning a top-level error. + Collect(ctx context.Context, clients *kube.Clients, opts RunOptions) (map[string]any, error) +} diff --git a/internal/collector/collector_test.go b/internal/collector/collector_test.go new file mode 100644 index 0000000..ac3fa7e --- /dev/null +++ b/internal/collector/collector_test.go @@ -0,0 +1,132 @@ +package collector_test + +import ( + "testing" + + "github.com/phenixblue/kvirtbp/internal/collector" +) + +// ---- MergeCollectorConfigs ---- + +func TestMergeCollectorConfigs_EmptySlices(t *testing.T) { + got := collector.MergeCollectorConfigs(nil, nil) + if len(got) != 0 { + t.Fatalf("expected empty slice, got %v", got) + } +} + +func TestMergeCollectorConfigs_OnlyA(t *testing.T) { + a := []collector.CollectorConfig{ + {Name: "sysctl", Image: "alpine"}, + } + got := collector.MergeCollectorConfigs(a, nil) + if len(got) != 1 || got[0].Name != "sysctl" { + t.Fatalf("unexpected result: %v", got) + } +} + +func TestMergeCollectorConfigs_OnlyB(t *testing.T) { + b := []collector.CollectorConfig{ + {Name: "sysctl", Image: "alpine"}, + } + got := collector.MergeCollectorConfigs(nil, b) + if len(got) != 1 || got[0].Name != "sysctl" { + t.Fatalf("unexpected result: %v", got) + } +} + +func TestMergeCollectorConfigs_NoOverlap(t *testing.T) { + a := []collector.CollectorConfig{{Name: "sysctl", Image: "alpine"}} + b := []collector.CollectorConfig{{Name: "kernel", Image: "ubuntu"}} + got := collector.MergeCollectorConfigs(a, b) + if len(got) != 2 { + t.Fatalf("expected 2 elements, got %d: %v", len(got), got) + } + if got[0].Name != "sysctl" || got[1].Name != "kernel" { + t.Fatalf("unexpected order: %v", got) + } +} + +func TestMergeCollectorConfigs_BOverridesA(t *testing.T) { + a := []collector.CollectorConfig{{Name: "sysctl", Image: "alpine:3.18"}} + b := []collector.CollectorConfig{{Name: "sysctl", Image: "alpine:latest"}} + got := 
collector.MergeCollectorConfigs(a, b) + if len(got) != 1 { + t.Fatalf("expected 1 element after dedup, got %d", len(got)) + } + if got[0].Image != "alpine:latest" { + t.Fatalf("expected b to win on collision, got image %q", got[0].Image) + } +} + +func TestMergeCollectorConfigs_PartialOverlap(t *testing.T) { + a := []collector.CollectorConfig{ + {Name: "sysctl", Image: "alpine:old"}, + {Name: "disk", Image: "busybox"}, + } + b := []collector.CollectorConfig{ + {Name: "sysctl", Image: "alpine:new"}, + {Name: "net", Image: "nettools"}, + } + got := collector.MergeCollectorConfigs(a, b) + if len(got) != 3 { + t.Fatalf("expected 3 elements, got %d: %v", len(got), got) + } + // Order: sysctl (overridden by b), disk, net + if got[0].Name != "sysctl" || got[0].Image != "alpine:new" { + t.Errorf("got[0] wrong: %+v", got[0]) + } + if got[1].Name != "disk" { + t.Errorf("got[1] wrong: %+v", got[1]) + } + if got[2].Name != "net" { + t.Errorf("got[2] wrong: %+v", got[2]) + } +} + +// ---- CollectorConfig.ResolvedOutputPath ---- + +func TestResolvedOutputPath_Default(t *testing.T) { + cfg := collector.CollectorConfig{} + got := cfg.ResolvedOutputPath() + want := "/kvirtbp/output.json" + if got != want { + t.Fatalf("want %q, got %q", want, got) + } +} + +func TestResolvedOutputPath_Custom(t *testing.T) { + cfg := collector.CollectorConfig{OutputPath: "/tmp/out.json"} + got := cfg.ResolvedOutputPath() + if got != "/tmp/out.json" { + t.Fatalf("want /tmp/out.json, got %q", got) + } +} + +// ---- CollectorScope constants ---- + +func TestCollectorScopeConstants(t *testing.T) { + if collector.ScopeOnce != "once" { + t.Errorf("ScopeOnce = %q, want %q", collector.ScopeOnce, "once") + } + if collector.ScopePerNode != "per-node" { + t.Errorf("ScopePerNode = %q, want %q", collector.ScopePerNode, "per-node") + } +} + +// ---- NewJobCollector ---- + +func TestNewJobCollector_ReturnsCollector(t *testing.T) { + cfg := collector.CollectorConfig{ + Name: "test-collector", + Image: "alpine", + 
Scope: collector.ScopeOnce, + } + c := collector.NewJobCollector(cfg) + if c == nil { + t.Fatal("NewJobCollector returned nil") + } + if c.Name() != "test-collector" { + t.Fatalf("Name() = %q, want %q", c.Name(), "test-collector") + } +} diff --git a/internal/collector/job_collector.go b/internal/collector/job_collector.go new file mode 100644 index 0000000..adb883e --- /dev/null +++ b/internal/collector/job_collector.go @@ -0,0 +1,327 @@ +package collector + +import ( + "context" + "encoding/json" + "fmt" + "path" + "strings" + "sync" + "time" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/phenixblue/kvirtbp/internal/kube" +) + +const ( + jobPollInterval = 5 * time.Second + + // jobLabelKey is applied to every Job and its Pod template so they can be + // listed/cleaned up as a group. + jobLabelKey = "app.kubernetes.io/managed-by" + jobLabelVal = "kvirtbp-collector" +) + +// jobCollector is the default Collector implementation. It creates batch/v1 +// Jobs on the target cluster to gather node or cluster-scoped data. +type jobCollector struct { + cfg CollectorConfig +} + +// NewJobCollector returns a Collector that executes cfg via Kubernetes Jobs. +func NewJobCollector(cfg CollectorConfig) Collector { + return &jobCollector{cfg: cfg} +} + +func (c *jobCollector) Name() string { return c.cfg.Name } + +// Collect executes the collector. For ScopeOnce a single Job is deployed. +// For ScopePerNode one Job per node is deployed concurrently. Results are +// merged under the returned map keyed by node name or CollectorDataScope. 
+func (c *jobCollector) Collect(ctx context.Context, clients *kube.Clients, opts RunOptions) (map[string]any, error) { + timeout := opts.GlobalTimeout + if c.cfg.TimeoutSeconds > 0 { + perCollector := time.Duration(c.cfg.TimeoutSeconds) * time.Second + if timeout == 0 || perCollector < timeout { + timeout = perCollector + } + } + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + switch c.cfg.Scope { + case ScopePerNode: + return c.collectPerNode(ctx, clients, opts) + default: // ScopeOnce and empty string both go here + return c.collectOnce(ctx, clients, opts) + } +} + +// collectOnce deploys a single Job and stores the result under "_cluster". +func (c *jobCollector) collectOnce(ctx context.Context, clients *kube.Clients, opts RunOptions) (map[string]any, error) { + jobName := safeJobName(c.cfg.Name, "") + + data, err := c.runJob(ctx, clients, opts, jobName, "") + if err != nil { + return map[string]any{CollectorDataScope: map[string]any{"_error": err.Error()}}, nil + } + return map[string]any{CollectorDataScope: data}, nil +} + +// collectPerNode lists all schedulable nodes and deploys one Job per node +// concurrently. Results are keyed by node name. 
+func (c *jobCollector) collectPerNode(ctx context.Context, clients *kube.Clients, opts RunOptions) (map[string]any, error) { + nodeList, err := clients.Core.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("list nodes: %w", err) + } + + var ( + mu sync.Mutex + wg sync.WaitGroup + result = make(map[string]any, len(nodeList.Items)) + ) + + for _, node := range nodeList.Items { + nodeName := node.Name + wg.Add(1) + go func() { + defer wg.Done() + jobName := safeJobName(c.cfg.Name, nodeName) + data, err := c.runJob(ctx, clients, opts, jobName, nodeName) + mu.Lock() + defer mu.Unlock() + if err != nil { + result[nodeName] = map[string]any{"_error": err.Error()} + } else { + result[nodeName] = data + } + }() + } + wg.Wait() + return result, nil +} + +// runJob creates a Job, waits for it to complete, reads its logs, optionally +// deletes it, and returns the parsed JSON output. +func (c *jobCollector) runJob(ctx context.Context, clients *kube.Clients, opts RunOptions, jobName, nodeName string) (map[string]any, error) { + job := c.buildJob(jobName, nodeName, opts.Namespace) + + if _, err := clients.Core.BatchV1().Jobs(opts.Namespace).Create(ctx, job, metav1.CreateOptions{}); err != nil { + return nil, fmt.Errorf("create job %q: %w", jobName, err) + } + + if !opts.SkipCleanup { + defer func() { + bg := context.Background() + prop := metav1.DeletePropagationForeground + _ = clients.Core.BatchV1().Jobs(opts.Namespace).Delete(bg, jobName, metav1.DeleteOptions{ + PropagationPolicy: &prop, + }) + }() + } + + if err := waitForJobComplete(ctx, clients, opts.Namespace, jobName); err != nil { + return nil, fmt.Errorf("wait for job %q: %w", jobName, err) + } + + output, err := readJobLogs(ctx, clients, opts.Namespace, jobName) + if err != nil { + return nil, fmt.Errorf("read logs for job %q: %w", jobName, err) + } + + var result map[string]any + if err := json.Unmarshal([]byte(output), &result); err != nil { + return nil, fmt.Errorf("parse 
JSON output from job %q: %w", jobName, err) + } + return result, nil +} + +// buildJob constructs the batch/v1 Job spec for the collector config. +func (c *jobCollector) buildJob(jobName, nodeName, namespace string) *batchv1.Job { + outputPath := c.cfg.ResolvedOutputPath() + + // Build the command list: mkdir for the output dir, user commands, then + // "cat ". The mkdir ensures the output directory exists even + // in minimal images (e.g. alpine) that don't ship /kvirtbp/. + // The cat output is the only thing that appears as pod logs; user commands + // may write freely to stdout/stderr without polluting the JSON payload. + outputDir := path.Dir(outputPath) + commands := make([]string, 0, len(c.cfg.Commands)+2) + commands = append(commands, "mkdir -p "+outputDir) + commands = append(commands, c.cfg.Commands...) + commands = append(commands, "cat "+outputPath) + + // Join with && so the cat only runs if all prior commands succeed. + shellCmd := strings.Join(commands, " && ") + + var ttlSeconds int32 = 300 // clean up completed jobs after 5 min even if SkipCleanup=false + var backoffLimit int32 = 0 // never retry; surface failures immediately + + env := make([]corev1.EnvVar, 0, len(c.cfg.Env)) + for k, v := range c.cfg.Env { + env = append(env, corev1.EnvVar{Name: k, Value: v}) + } + + privileged := c.cfg.Privileged + container := corev1.Container{ + Name: "collector", + Image: c.cfg.Image, + Command: []string{"/bin/sh", "-c", shellCmd}, + Env: env, + SecurityContext: &corev1.SecurityContext{ + Privileged: &privileged, + }, + } + + podSpec := corev1.PodSpec{ + Containers: []corev1.Container{container}, + RestartPolicy: corev1.RestartPolicyNever, + HostPID: c.cfg.HostPID, + HostNetwork: c.cfg.HostNetwork, + Tolerations: toK8sTolerations(c.cfg.Tolerations), + } + + if nodeName != "" { + podSpec.NodeName = nodeName + } + + labels := map[string]string{ + jobLabelKey: jobLabelVal, + "collector-name": c.cfg.Name, + } + + job := &batchv1.Job{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: jobName, + Namespace: namespace, + Labels: labels, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: &backoffLimit, + TTLSecondsAfterFinished: &ttlSeconds, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + Spec: podSpec, + }, + }, + } + return job +} + +// waitForJobComplete polls the Job until it has a Complete or Failed condition. +func waitForJobComplete(ctx context.Context, clients *kube.Clients, namespace, jobName string) error { + return wait.PollUntilContextCancel(ctx, jobPollInterval, true, func(ctx context.Context) (bool, error) { + job, err := clients.Core.BatchV1().Jobs(namespace).Get(ctx, jobName, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("get job %q: %w", jobName, err) + } + for _, cond := range job.Status.Conditions { + if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue { + return true, nil + } + if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue { + return false, fmt.Errorf("job %q failed: %s", jobName, cond.Message) + } + } + return false, nil + }) +} + +// readJobLogs fetches the stdout logs from the first (and only) Pod of the Job. +// The last line of stdout is expected to be the JSON output produced by +// "cat ". 
+func readJobLogs(ctx context.Context, clients *kube.Clients, namespace, jobName string) (string, error) { + podList, err := clients.Core.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("job-name=%s", jobName), + }) + if err != nil { + return "", fmt.Errorf("list pods for job %q: %w", jobName, err) + } + if len(podList.Items) == 0 { + return "", fmt.Errorf("no pods found for job %q", jobName) + } + + podName := podList.Items[0].Name + req := clients.Core.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{}) + rc, err := req.Stream(ctx) + if err != nil { + return "", fmt.Errorf("stream logs for pod %q: %w", podName, err) + } + defer rc.Close() + + var buf strings.Builder + tmp := make([]byte, 4096) + for { + n, readErr := rc.Read(tmp) + if n > 0 { + buf.Write(tmp[:n]) + } + if readErr != nil { + break + } + } + + // The last non-empty line is the cat output (our JSON). + lines := strings.Split(strings.TrimRight(buf.String(), "\n"), "\n") + for i := len(lines) - 1; i >= 0; i-- { + line := strings.TrimSpace(lines[i]) + if line != "" { + return line, nil + } + } + return "", fmt.Errorf("pod %q produced no output", podName) +} + +// safeJobName builds a Kubernetes-safe Job name from the collector name and an +// optional node name. Names are truncated to 63 characters if necessary. +func safeJobName(collectorName, nodeName string) string { + base := "kvirtbp-" + sanitizeName(collectorName) + if nodeName != "" { + base = base + "-" + sanitizeName(nodeName) + } + if len(base) > 63 { + base = base[:63] + } + return strings.TrimRight(base, "-") +} + +// toK8sTolerations converts the config-level CollectorToleration slice to +// the corev1.Toleration type expected by the pod spec. 
+func toK8sTolerations(in []CollectorToleration) []corev1.Toleration { + if len(in) == 0 { + return nil + } + out := make([]corev1.Toleration, len(in)) + for i, t := range in { + out[i] = corev1.Toleration{ + Key: t.Key, + Operator: corev1.TolerationOperator(t.Operator), + Value: t.Value, + Effect: corev1.TaintEffect(t.Effect), + } + } + return out +} + +// sanitizeName replaces characters that are not valid in Kubernetes names with +// hyphens and lower-cases the result. +func sanitizeName(s string) string { + s = strings.ToLower(s) + var b strings.Builder + for _, r := range s { + if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' { + b.WriteRune(r) + } else { + b.WriteRune('-') + } + } + return strings.Trim(b.String(), "-") +} diff --git a/internal/collector/merge.go b/internal/collector/merge.go new file mode 100644 index 0000000..b8ff6ba --- /dev/null +++ b/internal/collector/merge.go @@ -0,0 +1,43 @@ +package collector + +// MergeAll merges an arbitrary number of CollectorConfig slices left-to-right. +// Later slices win on name collision. The result is equivalent to folding +// MergeCollectorConfigs over the sources in order. +func MergeAll(sources ...[]CollectorConfig) []CollectorConfig { + result := []CollectorConfig{} + for _, s := range sources { + result = MergeCollectorConfigs(result, s) + } + return result +} + +// MergeCollectorConfigs returns a deduplicated union of a and b. +// When both slices contain a config with the same Name, b's entry wins. +// Order is preserved: all entries from a appear first (unless replaced by b), +// then any entries from b whose names were not in a. 
//
// Names that repeat inside a single slice are collapsed as well: the entry
// keeps the position of its first occurrence and takes the value of its last
// one, so the result never contains two configs with the same Name.
func MergeCollectorConfigs(a, b []CollectorConfig) []CollectorConfig {
	merged := make([]CollectorConfig, 0, len(a)+len(b))
	// position maps a collector name to its index in merged.
	position := make(map[string]int, len(a)+len(b))

	for _, source := range [][]CollectorConfig{a, b} {
		for _, cfg := range source {
			if i, seen := position[cfg.Name]; seen {
				// Later definition wins but stays at the original position.
				merged[i] = cfg
				continue
			}
			position[cfg.Name] = len(merged)
			merged = append(merged, cfg)
		}
	}
	return merged
}

// ---------------------------------------------------------------------------
// internal/collector/result.go (new file in this patch; package collector;
// imports encoding/json, fmt, time)
// ---------------------------------------------------------------------------

// CollectorMeta is embedded in the collector-data output file under the
// reserved key "_meta". It carries provenance information that downstream
// commands (e.g. "kvirtbp scan") can use to avoid re-fetching the bundle.
type CollectorMeta struct {
	// BundlePaths are the local filesystem paths where bundles were saved by
	// "kvirtbp collect --save-bundle". One entry per --bundle flag.
	// Empty when --save-bundle was not used.
	BundlePaths []string `json:"bundlePaths,omitempty"`

	// CollectedAt is the RFC3339 UTC timestamp of the collection run.
	CollectedAt string `json:"collectedAt"`
}

// CollectorResult is the top-level structure written to the collector-data
// output file. It serialises as a flat JSON object: "_meta" holds provenance
// and every other key is a collector name from Data.
//
// The flat layout preserves backwards compatibility — existing files that
// lack "_meta" are still valid CollectorResult values (Meta will be zero).
type CollectorResult struct {
	Meta CollectorMeta
	Data map[string]any
}

// NewCollectorResult wraps data and meta in a CollectorResult. When
// meta.CollectedAt is empty it is stamped with the current UTC time in RFC3339
// format; a caller-supplied timestamp is kept as-is.
func NewCollectorResult(data map[string]any, meta CollectorMeta) CollectorResult {
	if meta.CollectedAt == "" {
		meta.CollectedAt = time.Now().UTC().Format(time.RFC3339)
	}
	return CollectorResult{Meta: meta, Data: data}
}

// MarshalJSON emits "_meta" (if non-zero) together with all Data keys at the
// top level of one JSON object. A Data entry named "_meta" is overwritten by
// Meta whenever Meta is emitted.
func (r CollectorResult) MarshalJSON() ([]byte, error) {
	flat := make(map[string]any, len(r.Data)+1)
	for k, v := range r.Data {
		flat[k] = v
	}
	// Only include _meta when there is something meaningful to surface.
	if len(r.Meta.BundlePaths) > 0 || r.Meta.CollectedAt != "" {
		flat["_meta"] = r.Meta
	}
	return json.Marshal(flat)
}

// UnmarshalJSON parses the flat collector-data format back into a
// CollectorResult, extracting "_meta" into Meta and everything else into Data.
//
// The receiver is replaced only after the whole document decoded successfully:
// on error it is left untouched, and on success Meta is reset to its zero
// value when the document carries no "_meta" key.
func (r *CollectorResult) UnmarshalJSON(b []byte) error {
	var flat map[string]json.RawMessage
	if err := json.Unmarshal(b, &flat); err != nil {
		return err
	}

	var meta CollectorMeta
	data := make(map[string]any, len(flat))
	for k, raw := range flat {
		if k == "_meta" {
			if err := json.Unmarshal(raw, &meta); err != nil {
				return fmt.Errorf("decode _meta: %w", err)
			}
			continue
		}
		var v any
		if err := json.Unmarshal(raw, &v); err != nil {
			return fmt.Errorf("decode collector %q: %w", k, err)
		}
		data[k] = v
	}

	r.Meta, r.Data = meta, data
	return nil
}

// ---------------------------------------------------------------------------
// internal/collector/types.go (new file in this patch; package collector)
// ---------------------------------------------------------------------------

// CollectorScope controls how many Jobs are deployed for a collector.
type CollectorScope string

const (
	// ScopeOnce deploys a single cluster-wide Job. Output is stored under the
	// sentinel key "_cluster" in the collector data map.
	ScopeOnce CollectorScope = "once"

	// ScopePerNode deploys one Job per node, each pinned to its node.
	// Output is stored keyed by node name in the collector data map.
	ScopePerNode CollectorScope = "per-node"
)

// defaultOutputPath is the path inside the collector pod container where
// commands write their output. The CLI appends "cat <output path>" as the
// final container command so the JSON payload ends up in the pod logs.
const defaultOutputPath = "/kvirtbp/output.json"

// CollectorConfig declares a custom pod collector, either in a bundle's
// metadata.json or in a standalone --collector-config file.
type CollectorConfig struct {
	// Name is the unique key used in the collector data file and in
	// input.cluster.collectors["<name>"] in Rego.
	Name string `json:"name"`

	// Image is the container image to run.
	Image string `json:"image"`

	// Commands are shell commands executed in order inside the container.
	// Their canonical output must be written to OutputPath as valid JSON.
	// The CLI appends "cat <OutputPath>" as the last command so the JSON
	// file is the final thing written to the pod logs.
	Commands []string `json:"commands"`

	// Scope controls deployment shape: "once" (single Job) or "per-node"
	// (one Job per node). Defaults to "once".
	Scope CollectorScope `json:"scope"`

	// OutputPath is the in-pod file path where commands write JSON output.
	// Defaults to /kvirtbp/output.json.
	OutputPath string `json:"outputPath,omitempty"`

	// TimeoutSeconds is the per-collector deadline in seconds. 0 means use
	// the global cap set by --collector-timeout.
	TimeoutSeconds int `json:"timeoutSeconds,omitempty"`

	// Privileged runs the container with SecurityContext.Privileged = true.
	// Must be explicitly opted into; never defaulted.
	Privileged bool `json:"privileged,omitempty"`

	// HostPID mounts the host PID namespace into the container.
	HostPID bool `json:"hostPID,omitempty"`

	// HostNetwork attaches the container to the host network namespace.
	HostNetwork bool `json:"hostNetwork,omitempty"`

	// Env is an optional set of environment variables injected into the
	// container. Values should not contain secrets; use Kubernetes Secrets
	// or a mounted volume for sensitive data.
	Env map[string]string `json:"env,omitempty"`

	// Tolerations are applied to the Job pod template so the collector can
	// be scheduled on nodes with matching taints (e.g. control-plane nodes).
	// Use {"operator": "Exists"} to tolerate all taints on a node.
	Tolerations []CollectorToleration `json:"tolerations,omitempty"`
}

// CollectorToleration is a simplified representation of a Kubernetes
// pod toleration, mirroring corev1.Toleration without importing k8s types
// into the config schema.
type CollectorToleration struct {
	// Key is the taint key the toleration applies to. Empty string matches
	// all taint keys (only valid when Operator is "Exists").
	Key string `json:"key,omitempty"`

	// Operator is "Exists" or "Equal" (default: "Equal").
	Operator string `json:"operator,omitempty"`

	// Value is the taint value to match (only used when Operator is "Equal").
	Value string `json:"value,omitempty"`

	// Effect is the taint effect to match: "NoSchedule", "NoExecute",
	// "PreferNoSchedule", or empty string to match all effects.
	Effect string `json:"effect,omitempty"`
}

// ResolvedOutputPath returns OutputPath if set, otherwise the package default.
func (c CollectorConfig) ResolvedOutputPath() string {
	if c.OutputPath != "" {
		return c.OutputPath
	}
	return defaultOutputPath
}

// CollectorDataScope is the sentinel key used for ScopeOnce results in the
// collector data map (input.cluster.collectors["name"]["_cluster"]).
+const CollectorDataScope = "_cluster" diff --git a/internal/eval/rego/engine.go b/internal/eval/rego/engine.go index 23a295d..ffe83af 100644 --- a/internal/eval/rego/engine.go +++ b/internal/eval/rego/engine.go @@ -11,6 +11,7 @@ import ( "github.com/open-policy-agent/opa/rego" "github.com/phenixblue/kvirtbp/internal/checks" + "github.com/phenixblue/kvirtbp/internal/collector" "github.com/phenixblue/kvirtbp/internal/eval" "github.com/phenixblue/kvirtbp/internal/kube" "github.com/phenixblue/kvirtbp/internal/version" @@ -20,10 +21,11 @@ import ( type Engine struct{} type bundleMetadata struct { - SchemaVersion string `json:"schemaVersion"` - PolicyVersion string `json:"policyVersion"` - MinBinaryVersion string `json:"minBinaryVersion"` - Resources []string `json:"resources"` + SchemaVersion string `json:"schemaVersion"` + PolicyVersion string `json:"policyVersion"` + MinBinaryVersion string `json:"minBinaryVersion"` + Resources []string `json:"resources"` + Collectors []collector.CollectorConfig `json:"collectors,omitempty"` } const policySchemaVersion = "v1alpha1" @@ -189,6 +191,17 @@ func ResourceTypesFromBundle(bundlePath string) ([]string, error) { return md.Resources, nil } +// CollectorsFromBundle reads the metadata.json of a policy bundle and returns +// the collector configurations declared by the bundle. Returns nil (no error) +// when the bundle has no metadata or no collectors declared. 
+func CollectorsFromBundle(bundlePath string) ([]collector.CollectorConfig, error) { + md, err := readBundleMetadata(bundlePath) + if err != nil { + return nil, err + } + return md.Collectors, nil +} + func makeInput(registry []checks.Check, snapshot *kube.ClusterSnapshot) map[string]any { out := make([]map[string]string, 0, len(registry)) for _, c := range registry { diff --git a/internal/eval/rego/engine_test.go b/internal/eval/rego/engine_test.go index e73f6f8..c2557f3 100644 --- a/internal/eval/rego/engine_test.go +++ b/internal/eval/rego/engine_test.go @@ -7,7 +7,9 @@ import ( "testing" "github.com/phenixblue/kvirtbp/internal/checks" + "github.com/phenixblue/kvirtbp/internal/collector" "github.com/phenixblue/kvirtbp/internal/eval" + "github.com/phenixblue/kvirtbp/internal/kube" ) type stubCheck struct { @@ -211,3 +213,165 @@ func TestResourceTypesFromBundle_NonExistentBundle(t *testing.T) { t.Errorf("expected empty resources, got %v", got) } } + +// ---- CollectorsFromBundle ---- + +func TestCollectorsFromBundle_WithCollectors(t *testing.T) { + tmp := t.TempDir() + meta := `{ + "schemaVersion": "v1alpha1", + "collectors": [ + {"name": "sysctl", "image": "alpine", "scope": "per-node", "commands": ["sysctl -a > /kvirtbp/output.json"]} + ] + }` + if err := os.WriteFile(filepath.Join(tmp, "metadata.json"), []byte(meta), 0o644); err != nil { + t.Fatalf("write metadata: %v", err) + } + got, err := CollectorsFromBundle(tmp) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 1 { + t.Fatalf("expected 1 collector, got %d", len(got)) + } + if got[0].Name != "sysctl" || got[0].Image != "alpine" || got[0].Scope != collector.ScopePerNode { + t.Errorf("unexpected collector: %+v", got[0]) + } +} + +func TestCollectorsFromBundle_NoCollectorsField(t *testing.T) { + tmp := t.TempDir() + if err := os.WriteFile(filepath.Join(tmp, "metadata.json"), []byte(`{"schemaVersion":"v1alpha1"}`), 0o644); err != nil { + t.Fatalf("write metadata: %v", err) + } + 
got, err := CollectorsFromBundle(tmp) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 0 { + t.Errorf("expected nil/empty collectors, got %v", got) + } +} + +func TestCollectorsFromBundle_MissingMetadata(t *testing.T) { + tmp := t.TempDir() + got, err := CollectorsFromBundle(tmp) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 0 { + t.Errorf("expected empty collectors for missing metadata, got %v", got) + } +} + +// ---- Rego evaluation with collector data ---- + +func TestEvaluateWithCollectorData(t *testing.T) { + tmp := t.TempDir() + + // Policy that inspects input.cluster.collectors and emits a passing finding + // when a specific sysctl value is present. + policy := `package kvirtbp + +import rego.v1 + +ip_forward := object.get( + object.get( + object.get( + object.get(input.cluster, "collectors", {}), + "sysctl", {}), + "_cluster", {}), +"net.ipv4.ip_forward", "0") + +findings := [{ + "checkId": "collector-check", + "title": "IP Forwarding Check", + "category": "security", + "severity": "info", + "pass": ip_forward == "1", + "message": "net.ipv4.ip_forward should be 1" +}]` + if err := os.WriteFile(filepath.Join(tmp, "policy.rego"), []byte(policy), 0o644); err != nil { + t.Fatalf("write policy: %v", err) + } + if err := os.WriteFile(filepath.Join(tmp, "metadata.json"), []byte(`{"schemaVersion":"v1alpha1"}`), 0o644); err != nil { + t.Fatalf("write metadata: %v", err) + } + + snap := &kube.ClusterSnapshot{ + Collectors: map[string]any{ + "sysctl": map[string]any{ + "_cluster": map[string]any{ + "net.ipv4.ip_forward": "1", + }, + }, + }, + } + + engine := New() + result, err := engine.Evaluate(context.Background(), eval.RunRequest{ + PolicyBundle: tmp, + ClusterSnapshot: snap, + }) + if err != nil { + t.Fatalf("Evaluate() returned error: %v", err) + } + if len(result.Findings) != 1 { + t.Fatalf("expected 1 finding, got %d", len(result.Findings)) + } + if !result.Findings[0].Pass { + t.Errorf("expected 
finding to pass when collector value is '1'") + } +} + +func TestEvaluateWithCollectorData_MissingData(t *testing.T) { + tmp := t.TempDir() + + // Policy that gracefully handles absent collector data by defaulting to "0". + // When no collector data is injected the pass condition must be false. + policy := `package kvirtbp + +import rego.v1 + +# Start from input.cluster (always present) to safely handle absent collectors. +ip_forward := object.get( + object.get( + object.get( + object.get(input.cluster, "collectors", {}), + "sysctl", {}), + "_cluster", {}), +"net.ipv4.ip_forward", "0") + +findings := [{ + "checkId": "collector-check", + "title": "IP Forwarding Check", + "category": "security", + "severity": "info", + "pass": ip_forward == "1", + "message": "net.ipv4.ip_forward should be 1" +}]` + if err := os.WriteFile(filepath.Join(tmp, "policy.rego"), []byte(policy), 0o644); err != nil { + t.Fatalf("write policy: %v", err) + } + if err := os.WriteFile(filepath.Join(tmp, "metadata.json"), []byte(`{"schemaVersion":"v1alpha1"}`), 0o644); err != nil { + t.Fatalf("write metadata: %v", err) + } + + // No collector data injected — ClusterSnapshot.Collectors is nil. + snap := &kube.ClusterSnapshot{} + + engine := New() + result, err := engine.Evaluate(context.Background(), eval.RunRequest{ + PolicyBundle: tmp, + ClusterSnapshot: snap, + }) + if err != nil { + t.Fatalf("Evaluate() returned error: %v", err) + } + if len(result.Findings) != 1 { + t.Fatalf("expected 1 finding, got %d", len(result.Findings)) + } + if result.Findings[0].Pass { + t.Errorf("expected finding to fail when collector data is absent") + } +} diff --git a/internal/kube/snapshot.go b/internal/kube/snapshot.go index 20d56f7..3120aa4 100644 --- a/internal/kube/snapshot.go +++ b/internal/kube/snapshot.go @@ -53,6 +53,13 @@ type ClusterSnapshot struct { // Values are lists of ResourceSnapshot objects. Populated only when // ResourceTypes are declared in bundle metadata or via --resource flag. 
Resources map[string][]ResourceSnapshot `json:"resources,omitempty"` + + // Collectors holds data injected from external collectors, keyed by + // collector name. For per-node collectors the value is + // map[nodeName]map[string]any; for once-collectors it is + // map["_cluster"]map[string]any. Populated via --collector-data or the + // collect subcommand. + Collectors map[string]any `json:"collectors,omitempty"` } // ResourceSnapshot is a generic, serializable representation of a single From 4e16892169b9e6c015646940927a1f6e04368e9c Mon Sep 17 00:00:00 2001 From: Joe Searcy Date: Sun, 5 Apr 2026 02:54:17 -0400 Subject: [PATCH 2/2] fix formatting/linting --- internal/bundle/remote.go | 3 ++- internal/cli/collect.go | 1 - internal/collector/job_collector.go | 4 ++-- internal/eval/rego/engine.go | 8 ++++---- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/bundle/remote.go b/internal/bundle/remote.go index 8b2f0e2..3aba576 100644 --- a/internal/bundle/remote.go +++ b/internal/bundle/remote.go @@ -17,6 +17,7 @@ import ( "sort" "strings" ) + // Resolve returns the local directory path for the bundle at rawPath. // // If rawPath is a local filesystem path it is returned as-is and cleanup is a @@ -146,7 +147,7 @@ func fetchAndUnpack(ctx context.Context, rawURL, destDir string) error { return fmt.Errorf("write file %q: %w", target, err) } f.Close() - // Intentionally skip symlinks, hard-links, and special files. + // Intentionally skip symlinks, hard-links, and special files. 
} } diff --git a/internal/cli/collect.go b/internal/cli/collect.go index 9302956..5a4b946 100644 --- a/internal/cli/collect.go +++ b/internal/cli/collect.go @@ -20,7 +20,6 @@ import ( "github.com/spf13/cobra" ) - func newCollectCmd(kubeconfigPath *string, kubeContext *string) *cobra.Command { var policyBundles []string var collectorConfigFiles []string diff --git a/internal/collector/job_collector.go b/internal/collector/job_collector.go index adb883e..2805d21 100644 --- a/internal/collector/job_collector.go +++ b/internal/collector/job_collector.go @@ -163,7 +163,7 @@ func (c *jobCollector) buildJob(jobName, nodeName, namespace string) *batchv1.Jo shellCmd := strings.Join(commands, " && ") var ttlSeconds int32 = 300 // clean up completed jobs after 5 min even if SkipCleanup=false - var backoffLimit int32 = 0 // never retry; surface failures immediately + var backoffLimit int32 = 0 // never retry; surface failures immediately env := make([]corev1.EnvVar, 0, len(c.cfg.Env)) for k, v := range c.cfg.Env { @@ -194,7 +194,7 @@ func (c *jobCollector) buildJob(jobName, nodeName, namespace string) *batchv1.Jo } labels := map[string]string{ - jobLabelKey: jobLabelVal, + jobLabelKey: jobLabelVal, "collector-name": c.cfg.Name, } diff --git a/internal/eval/rego/engine.go b/internal/eval/rego/engine.go index ffe83af..51fef4e 100644 --- a/internal/eval/rego/engine.go +++ b/internal/eval/rego/engine.go @@ -21,10 +21,10 @@ import ( type Engine struct{} type bundleMetadata struct { - SchemaVersion string `json:"schemaVersion"` - PolicyVersion string `json:"policyVersion"` - MinBinaryVersion string `json:"minBinaryVersion"` - Resources []string `json:"resources"` + SchemaVersion string `json:"schemaVersion"` + PolicyVersion string `json:"policyVersion"` + MinBinaryVersion string `json:"minBinaryVersion"` + Resources []string `json:"resources"` Collectors []collector.CollectorConfig `json:"collectors,omitempty"` }