From 16a52f41ead1b3b5f3afe5de3a909b8cbb31bb3f Mon Sep 17 00:00:00 2001 From: entlein Date: Fri, 15 May 2026 23:05:43 +0200 Subject: [PATCH 1/2] profile-compaction: CollapseConfig CRD + projection overlay + user-managed lifecycle Signed-off-by: entlein --- .../containerprofilecache.go | 17 + .../containerprofilecache/projection.go | 69 ++++ .../containerprofilecache/projection_apply.go | 99 +++++- .../containerprofilecache/tamper_alert.go | 190 ++++++++++ .../tamper_alert_test.go | 281 +++++++++++++++ .../test32_projection_test.go | 326 ++++++++++++++++++ pkg/objectcache/projection_types.go | 7 + pkg/objectcache/shared_container_data.go | 16 + pkg/objectcache/v1/mock.go | 144 ++++++-- pkg/rulebindingmanager/cache/cache.go | 124 ++++--- pkg/rulebindingmanager/cache/cache_test.go | 122 ++++++- .../cel/libraries/cache/function_cache.go | 2 +- 12 files changed, 1310 insertions(+), 87 deletions(-) create mode 100644 pkg/objectcache/containerprofilecache/tamper_alert.go create mode 100644 pkg/objectcache/containerprofilecache/tamper_alert_test.go create mode 100644 pkg/objectcache/containerprofilecache/test32_projection_test.go diff --git a/pkg/objectcache/containerprofilecache/containerprofilecache.go b/pkg/objectcache/containerprofilecache/containerprofilecache.go index e85f693c3..c539fad0e 100644 --- a/pkg/objectcache/containerprofilecache/containerprofilecache.go +++ b/pkg/objectcache/containerprofilecache/containerprofilecache.go @@ -15,6 +15,7 @@ import ( "github.com/kubescape/go-logger/helpers" helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers" "github.com/kubescape/node-agent/pkg/config" + "github.com/kubescape/node-agent/pkg/exporters" "github.com/kubescape/node-agent/pkg/metricsmanager" "github.com/kubescape/node-agent/pkg/objectcache" "github.com/kubescape/node-agent/pkg/objectcache/callstackcache" @@ -122,6 +123,12 @@ type ContainerProfileCacheImpl struct { specGeneration atomic.Int64 // bumped on each distinct spec hash change nudge chan struct{} // buffered cap 1; signals reconciler on spec change refreshPending atomic.Bool // set when a nudge arrives while refresh is running + + // Tamper detection state (fork-only). See tamper_alert.go for the full + // description; reintroduced here on top of upstream's reshape so the + // legacy R1016 "Signed profile tampered" wiring keeps working. + tamperAlertExporter exporters.Exporter + tamperEmitted sync.Map // tamperKey -> struct{} } // NewContainerProfileCache creates a new ContainerProfileCacheImpl. @@ -398,6 +405,13 @@ func (c *ContainerProfileCacheImpl) tryPopulateEntry( helpers.Error(userAPErr)) userAP = nil } + // Tamper detection: re-verify the signature on every load. Emits R1016 + // when a signed overlay's signature no longer matches (i.e. content + // has been mutated post-sign). No-op when the overlay is unsigned or + // the tamper-alert exporter has not been wired. + if userAP != nil { + c.verifyUserApplicationProfile(userAP, sharedData.Wlid) + } var userNNErr error _ = c.refreshRPC(ctx, func(rctx context.Context) error { userNN, userNNErr = c.storageClient.GetNetworkNeighborhood(rctx, ns, overlayName) @@ -411,6 +425,9 @@ func (c *ContainerProfileCacheImpl) tryPopulateEntry( helpers.Error(userNNErr)) userNN = nil } + if userNN != nil { + c.verifyUserNetworkNeighborhood(userNN, sharedData.Wlid) + } } // Need SOMETHING to cache. If we have nothing, stay pending and retry. 
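A quick orientation note on the hunk above: R1016 emission stays dormant until SetTamperAlertExporter is called on the cache. A minimal sketch of the intended hookup — the constructor arguments and the exporter-bus helper are assumptions about the cmd/main.go side, not part of this patch; only SetTamperAlertExporter itself is defined later in tamper_alert.go:

	// hypothetical cmd/main.go excerpt — every name except SetTamperAlertExporter is illustrative
	profileCache := containerprofilecache.NewContainerProfileCache(cfg, storageClient /* ... */)
	alertBus := exporters.InitExporters(cfg.Exporters, clusterName, nodeName, cloudMetadata) // assumed helper
	profileCache.SetTamperAlertExporter(alertBus)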
diff --git a/pkg/objectcache/containerprofilecache/projection.go b/pkg/objectcache/containerprofilecache/projection.go
index 1ff1bd103..66f9f3471 100644
--- a/pkg/objectcache/containerprofilecache/projection.go
+++ b/pkg/objectcache/containerprofilecache/projection.go
@@ -1,6 +1,9 @@
 package containerprofilecache
 
 import (
+	"strings"
+
+	helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
 	"github.com/kubescape/node-agent/pkg/utils"
 	"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
 	corev1 "k8s.io/api/core/v1"
@@ -62,9 +65,75 @@ func projectUserProfiles(
 		}
 	}
 
+	// Fold the user-overlay identity into the merged profile's SyncChecksum
+	// annotation. Apply (projection_apply.go) reads this into
+	// ProjectedContainerProfile.SyncChecksum which the rulemanager's
+	// function_cache uses as part of its invalidation key (see
+	// pkg/rulemanager/cel/libraries/cache/function_cache.go:
+	// HashForContainerProfile).
+	//
+	// Without this, an empty-baseline + user-overlay container has a
+	// constant SyncChecksum="" across both "no overlay yet" and "overlay
+	// merged" states. Stale ap.was_executed=false results computed during
+	// the no-overlay window would then persist in the cache and the rule
+	// evaluator would never see the merged user-overlay paths — which is
+	// the root cause behind Test_32_UnexpectedProcessArguments's R0001
+	// precondition failure and the latent R0001-on-nslookup noise in
+	// Test_28_UserDefinedNetworkNeighborhood.
+	if userAP != nil || userNN != nil {
+		stampOverlayIdentity(projected, userAP, userNN)
+	}
+
 	return projected, warnings
 }
 
+// stampOverlayIdentity appends the user-overlay identity (kind/ns/name@RV)
+// to the projected ContainerProfile's SyncChecksumMetadataKey annotation.
+// Modifies projected.Annotations in place.
+//
+// The original baseline checksum (if present) is preserved as the prefix
+// so distinct baselines still produce distinct keys. Format:
+//
+//	<baseline>|ap=<ns>/<name>@<RV>|nn=<ns>/<name>@<RV>
+//
+// The ap= or nn= segment is omitted when the corresponding overlay
+// is nil. RV is the only piece that needs to change for the cache to
+// invalidate, but namespace+name are kept so cross-overlay collisions
+// (e.g. two different overlays happening to share an RV across namespaces)
+// don't alias.
+//
+// IDEMPOTENT: calling stampOverlayIdentity twice with the same overlay
+// produces the same final annotation. The annotation is split on `|`
+// and only the FIRST segment is kept as "baseline" — any existing
+// ap= / nn= suffixes from prior stamps are discarded before being
+// re-appended. (CodeRabbit PR #43 critical on projection.go:115:
+// projectUserProfiles is called twice in succession in both
+// reconciler.go and containerprofilecache.go, feeding the output of
+// the first projection back as input to the second. Without this
+// strip step, overlay suffixes accumulate on every reconcile tick,
+// churning the function_cache.)
+func stampOverlayIdentity(projected *v1beta1.ContainerProfile, userAP *v1beta1.ApplicationProfile, userNN *v1beta1.NetworkNeighborhood) {
+	if projected.Annotations == nil {
+		projected.Annotations = map[string]string{}
+	}
+	// Strip any prior ap= / nn= suffixes by taking only the first
+	// `|`-segment as the canonical baseline checksum. This is what
+	// makes repeat-stamping idempotent.
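+	//
+	// Illustrative evolution (values hypothetical): with baseline
+	// "sha256:abc" and overlay ns/curl@42,
+	//
+	//	stamp #1 → "sha256:abc|ap=ns/curl@42"
+	//	stamp #2 → "sha256:abc|ap=ns/curl@42"  (strip keeps it stable)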
+ existing := projected.Annotations[helpersv1.SyncChecksumMetadataKey] + baseline := existing + if idx := strings.IndexByte(existing, '|'); idx >= 0 { + baseline = existing[:idx] + } + parts := []string{baseline} + if userAP != nil { + parts = append(parts, "ap="+userAP.Namespace+"/"+userAP.Name+"@"+userAP.ResourceVersion) + } + if userNN != nil { + parts = append(parts, "nn="+userNN.Namespace+"/"+userNN.Name+"@"+userNN.ResourceVersion) + } + projected.Annotations[helpersv1.SyncChecksumMetadataKey] = strings.Join(parts, "|") +} + // mergeApplicationProfile finds the container entry in userAP matching // containerName (across Spec.Containers / InitContainers / EphemeralContainers) // and merges its fields into projected.Spec. Returns the list of pod-spec diff --git a/pkg/objectcache/containerprofilecache/projection_apply.go b/pkg/objectcache/containerprofilecache/projection_apply.go index 135464188..22f9dbf14 100644 --- a/pkg/objectcache/containerprofilecache/projection_apply.go +++ b/pkg/objectcache/containerprofilecache/projection_apply.go @@ -44,30 +44,37 @@ func Apply(spec *objectcache.RuleProjectionSpec, cp *v1beta1.ContainerProfile, c } // Project each data surface. + // The third arg classifies an entry as "dynamic" — routes it to Patterns + // rather than Values. Path surfaces use the ⋯ DynamicIdentifier marker; + // network surfaces accept CIDRs, '*' sentinels, and DNS wildcard tokens + // per the v0.0.2 spec (matched at runtime by storage's networkmatch). opensPaths := extractOpensPaths(cp) - pcp.Opens = projectField(s.Opens, opensPaths, true) + pcp.Opens = projectField(s.Opens, opensPaths, containsDynamicSegment) execsPaths := extractExecsPaths(cp) - pcp.Execs = projectField(s.Execs, execsPaths, true) + pcp.Execs = projectField(s.Execs, execsPaths, containsDynamicSegment) + pcp.ExecsByPath = extractExecsByPath(cp) endpointPaths := extractEndpointPaths(cp) - pcp.Endpoints = projectField(s.Endpoints, endpointPaths, true) + pcp.Endpoints = projectField(s.Endpoints, endpointPaths, containsDynamicSegment) - pcp.Capabilities = projectField(s.Capabilities, cp.Spec.Capabilities, false) - pcp.Syscalls = projectField(s.Syscalls, cp.Spec.Syscalls, false) + pcp.Capabilities = projectField(s.Capabilities, cp.Spec.Capabilities, nil) + pcp.Syscalls = projectField(s.Syscalls, cp.Spec.Syscalls, nil) - pcp.EgressDomains = projectField(s.EgressDomains, extractEgressDomains(cp), false) - pcp.EgressAddresses = projectField(s.EgressAddresses, extractEgressAddresses(cp), false) + pcp.EgressDomains = projectField(s.EgressDomains, extractEgressDomains(cp), isNetworkDNSWildcard) + pcp.EgressAddresses = projectField(s.EgressAddresses, extractEgressAddresses(cp), isNetworkIPWildcard) - pcp.IngressDomains = projectField(s.IngressDomains, extractIngressDomains(cp), false) - pcp.IngressAddresses = projectField(s.IngressAddresses, extractIngressAddresses(cp), false) + pcp.IngressDomains = projectField(s.IngressDomains, extractIngressDomains(cp), isNetworkDNSWildcard) + pcp.IngressAddresses = projectField(s.IngressAddresses, extractIngressAddresses(cp), isNetworkIPWildcard) return pcp } // projectField is the per-surface transform. rawEntries are strings from the -// raw profile. isPathSurface enables retention of dynamic-segment entries. -func projectField(spec objectcache.FieldSpec, rawEntries []string, isPathSurface bool) objectcache.ProjectedField { +// raw profile. 
isDynamic, if non-nil, is called per entry: returning true +// routes the entry to Patterns rather than Values (cache-miss path runs the +// matcher rather than a map lookup). +func projectField(spec objectcache.FieldSpec, rawEntries []string, isDynamic func(string) bool) objectcache.ProjectedField { if !spec.InUse { // No rule declared a requirement for this field — pass all raw entries // through so existing rules that omit profileDataRequired keep working. @@ -92,9 +99,9 @@ func projectField(spec objectcache.FieldSpec, rawEntries []string, isPathSurface seen := make(map[string]bool) // for Patterns dedup for _, e := range rawEntries { - isDynamic := isPathSurface && containsDynamicSegment(e) + dynamic := isDynamic != nil && isDynamic(e) - if isDynamic { + if dynamic { // Dynamic entries always go to Patterns on path surfaces (both // pass-through and explicit InUse modes). if !seen[e] { @@ -148,6 +155,42 @@ func containsDynamicSegment(e string) bool { return strings.Contains(e, dynamicpathdetector.DynamicIdentifier) } +// isNetworkIPWildcard reports whether an IP-surface entry is a v0.0.2 +// pattern (CIDR membership, '*' any-IP sentinel, or DynamicIdentifier). +// Literal IPv4/IPv6 addresses are NOT patterns; they go to Values for +// the cheap map lookup path. Spec §5.7. +func isNetworkIPWildcard(e string) bool { + if e == "" { + return false + } + if e == "*" { + return true + } + if strings.Contains(e, "/") { + return true + } + if strings.Contains(e, dynamicpathdetector.DynamicIdentifier) { + return true + } + return false +} + +// isNetworkDNSWildcard reports whether a DNS-surface entry uses any of +// the v0.0.2 wildcard tokens — leading '*' (RFC 4592), mid '⋯', trailing +// '*'. Literal FQDNs go to Values. Spec §5.8. +func isNetworkDNSWildcard(e string) bool { + if e == "" { + return false + } + if strings.Contains(e, "*") { + return true + } + if strings.Contains(e, dynamicpathdetector.DynamicIdentifier) { + return true + } + return false +} + // --- Field extractors --- func extractOpensPaths(cp *v1beta1.ContainerProfile) []string { @@ -166,6 +209,32 @@ func extractExecsPaths(cp *v1beta1.ContainerProfile) []string { return paths } +// extractExecsByPath builds the path → args map used by the exec-args +// wildcard matcher (CompareExecArgs). Multiple ExecCalls entries with the +// same Path collapse to the last seen; this matches the prior fork-only +// behavior. nil-Args entries are stored as empty slices, which +// CompareExecArgs treats as "no argv constraint". +// +// Args slices are CLONED rather than aliased — Apply is contract-bound to +// be a pure transform, and an alias would let consumers mutate the source +// profile by editing the projected map. (CR #43 finding on this file.) +func extractExecsByPath(cp *v1beta1.ContainerProfile) map[string][]string { + if len(cp.Spec.Execs) == 0 { + return nil + } + m := make(map[string][]string, len(cp.Spec.Execs)) + for _, e := range cp.Spec.Execs { + if e.Args == nil { + m[e.Path] = []string{} + continue + } + cloned := make([]string, len(e.Args)) + copy(cloned, e.Args) + m[e.Path] = cloned + } + return m +} + func extractEndpointPaths(cp *v1beta1.ContainerProfile) []string { endpoints := make([]string, len(cp.Spec.Endpoints)) for i, e := range cp.Spec.Endpoints { @@ -191,6 +260,9 @@ func extractEgressAddresses(cp *v1beta1.ContainerProfile) []string { if n.IPAddress != "" { addrs = append(addrs, n.IPAddress) } + // v0.0.2 IPAddresses[] — list form supporting CIDRs and '*' sentinel. 
+ // Same semantics as the deprecated singular IPAddress, just plural. + addrs = append(addrs, n.IPAddresses...) } return addrs } @@ -212,6 +284,7 @@ func extractIngressAddresses(cp *v1beta1.ContainerProfile) []string { if n.IPAddress != "" { addrs = append(addrs, n.IPAddress) } + addrs = append(addrs, n.IPAddresses...) } return addrs } diff --git a/pkg/objectcache/containerprofilecache/tamper_alert.go b/pkg/objectcache/containerprofilecache/tamper_alert.go new file mode 100644 index 000000000..273b15123 --- /dev/null +++ b/pkg/objectcache/containerprofilecache/tamper_alert.go @@ -0,0 +1,190 @@ +// Tamper detection for user-supplied profile overlays loaded into the +// ContainerProfileCache. +// +// When a user references a signed ApplicationProfile or NetworkNeighborhood +// via the `kubescape.io/user-defined-profile` pod label, this code path +// re-verifies the signature on every cache load and emits an R1016 +// "Signed profile tampered" alert via the rule-alert exporter when the +// signature is present but no longer valid. +// +// This is the new home of the legacy applicationprofilecache's tamper +// detection (originally introduced in fork commit c2d681e0 — "Feat/ +// tamperalert"). Upstream PR #788 deleted the legacy cache; this re-wires +// the same behavior onto containerprofilecache without changing the alert +// shape so existing component tests (Test_31_TamperDetectionAlert) keep +// working. +package containerprofilecache + +import ( + "errors" + "fmt" + + "github.com/armosec/armoapi-go/armotypes" + "github.com/kubescape/go-logger" + "github.com/kubescape/go-logger/helpers" + "github.com/kubescape/node-agent/pkg/exporters" + "github.com/kubescape/node-agent/pkg/rulemanager/types" + "github.com/kubescape/node-agent/pkg/signature" + "github.com/kubescape/node-agent/pkg/signature/profiles" + "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" +) + +// tamperKey uniquely identifies a tampered profile occurrence. ResourceVersion +// is included so that an attacker editing the resource (which changes RV) is +// re-flagged on the next reconcile cycle, while a long-lived broken profile +// only emits one R1016 across the cache's lifetime. +func tamperKey(kind, namespace, name, resourceVersion string) string { + return kind + "|" + namespace + "/" + name + "@" + resourceVersion +} + +// SetTamperAlertExporter wires the rule-alert exporter used to emit R1016. +// Optional — when nil, signature verification still runs (and is logged) +// but no alert is emitted. Production wiring lives in cmd/main.go after the +// alert exporter is constructed. +func (c *ContainerProfileCacheImpl) SetTamperAlertExporter(e exporters.Exporter) { + c.tamperAlertExporter = e +} + +// verifyUserApplicationProfile re-verifies the signature of a user-supplied +// ApplicationProfile overlay and emits R1016 if the signature is present +// but no longer valid (i.e. the profile was tampered after signing). +// +// Returns true iff the profile is acceptable for further use: +// - profile is signed and verifies → true +// - profile is not signed → true (signing is opt-in; the empty-signature +// case is handled by the caller's normal not-signed flow) +// - profile is signed but verification fails → false (and R1016 emitted) +// +// The boolean lets the caller decide whether to project the overlay into +// the cache. 
Today we always proceed (the legacy semantics don't actually +// gate loading on verification unless EnableSignatureVerification is true), +// but having the return value keeps the door open for stricter modes. +func (c *ContainerProfileCacheImpl) verifyUserApplicationProfile(profile *v1beta1.ApplicationProfile, wlid string) bool { + if profile == nil { + return true + } + adapter := profiles.NewApplicationProfileAdapter(profile) + if !signature.IsSigned(adapter) { + return true + } + key := tamperKey("ApplicationProfile", profile.Namespace, profile.Name, profile.ResourceVersion) + // AllowUntrusted: accept self-signed/local-CA signatures as long as the + // signature itself verifies against the cert in the annotations. We only + // want to flag actual tampering, not the absence of a Sigstore Fulcio + // trust chain. Matches `cmd/sign-object`'s default verifier. + err := signature.VerifyObjectAllowUntrusted(adapter) + if err == nil { + // Verified clean — clear any prior emit so future tampers re-alert. + c.tamperEmitted.Delete(key) + return true + } + // Classify the error: only ErrSignatureMismatch indicates an actual + // tamper event. Hash-computation, verifier-construction, and malformed- + // annotation errors are operational and MUST NOT raise R1016 — that + // would cause false alerts and, with EnableSignatureVerification=true, + // drop a valid overlay because of a transient operational failure. + if !errors.Is(err, signature.ErrSignatureMismatch) { + logger.L().Warning("user-defined ApplicationProfile signature verification operational error (NOT tamper)", + helpers.String("profile", profile.Name), + helpers.String("namespace", profile.Namespace), + helpers.String("wlid", wlid), + helpers.Error(err)) + // Honour strict-mode: refuse to load on any verification failure, + // but do NOT touch the dedup map or emit R1016. + return !c.cfg.EnableSignatureVerification + } + // Real tamper. + logger.L().Warning("user-defined ApplicationProfile signature mismatch (tamper detected)", + helpers.String("profile", profile.Name), + helpers.String("namespace", profile.Namespace), + helpers.String("wlid", wlid), + helpers.Error(err)) + // Dedup: emit R1016 only on first transition to invalid for this + // (kind, ns, name, resourceVersion). Otherwise the refresh loop would + // alert every reconcile cycle, once per container ref. + if _, alreadyEmitted := c.tamperEmitted.LoadOrStore(key, struct{}{}); !alreadyEmitted { + c.emitTamperAlert(profile.Name, profile.Namespace, wlid, "ApplicationProfile", err) + } + return !c.cfg.EnableSignatureVerification +} + +// verifyUserNetworkNeighborhood is the NN-side counterpart to +// verifyUserApplicationProfile. Same contract, different object kind in +// the alert description. +func (c *ContainerProfileCacheImpl) verifyUserNetworkNeighborhood(nn *v1beta1.NetworkNeighborhood, wlid string) bool { + if nn == nil { + return true + } + adapter := profiles.NewNetworkNeighborhoodAdapter(nn) + if !signature.IsSigned(adapter) { + return true + } + key := tamperKey("NetworkNeighborhood", nn.Namespace, nn.Name, nn.ResourceVersion) + err := signature.VerifyObjectAllowUntrusted(adapter) + if err == nil { + c.tamperEmitted.Delete(key) + return true + } + // Same classification as the AP path — only ErrSignatureMismatch is a + // tamper; everything else is operational and must NOT trigger R1016. 
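+	// Outcome matrix, for reference (strict = cfg.EnableSignatureVerification):
+	//
+	//	verifies clean    → load; clear any dedup entry
+	//	tamper (mismatch) → R1016 once per (kind, ns, name, RV); load iff !strict
+	//	operational error → log only, never R1016; load iff !strict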
+	if !errors.Is(err, signature.ErrSignatureMismatch) {
+		logger.L().Warning("user-defined NetworkNeighborhood signature verification operational error (NOT tamper)",
+			helpers.String("profile", nn.Name),
+			helpers.String("namespace", nn.Namespace),
+			helpers.String("wlid", wlid),
+			helpers.Error(err))
+		return !c.cfg.EnableSignatureVerification
+	}
+	logger.L().Warning("user-defined NetworkNeighborhood signature mismatch (tamper detected)",
+		helpers.String("profile", nn.Name),
+		helpers.String("namespace", nn.Namespace),
+		helpers.String("wlid", wlid),
+		helpers.Error(err))
+	if _, alreadyEmitted := c.tamperEmitted.LoadOrStore(key, struct{}{}); !alreadyEmitted {
+		c.emitTamperAlert(nn.Name, nn.Namespace, wlid, "NetworkNeighborhood", err)
+	}
+	return !c.cfg.EnableSignatureVerification
+}
+
+// emitTamperAlert sends a single R1016 "Signed profile tampered" alert
+// through the rule-alert exporter. No-op when the exporter is unset.
+//
+// Alert shape mirrors the legacy applicationprofilecache.emitTamperAlert
+// (fork commit c2d681e0) so dashboards and component tests keep matching.
+// `wlid` should be the authoritative workload identifier the caller has on
+// hand (e.g. sharedData.Wlid in containerprofilecache.go) — using the
+// runtime containerID instead loses workload kind/name/cluster attribution
+// because GenericRuleFailure.SetWorkloadDetails() parses it as a WLID.
+func (c *ContainerProfileCacheImpl) emitTamperAlert(profileName, namespace, wlid, objectKind string, verifyErr error) {
+	if c.tamperAlertExporter == nil {
+		return
+	}
+
+	ruleFailure := &types.GenericRuleFailure{
+		BaseRuntimeAlert: armotypes.BaseRuntimeAlert{
+			AlertName:      "Signed profile tampered",
+			InfectedPID:    1,
+			Severity:       10,
+			FixSuggestions: "Investigate who modified the " + objectKind + " '" + profileName + "' in namespace '" + namespace + "'. Re-sign the profile after verifying its contents.",
+		},
+		AlertType: armotypes.AlertTypeRule,
+		RuntimeProcessDetails: armotypes.ProcessTree{
+			ProcessTree: armotypes.Process{
+				PID:  1,
+				Comm: "node-agent",
+			},
+		},
+		RuleAlert: armotypes.RuleAlert{
+			RuleDescription: fmt.Sprintf("Signed %s '%s' in namespace '%s' has been tampered with: %v",
+				objectKind, profileName, namespace, verifyErr),
+		},
+		RuntimeAlertK8sDetails: armotypes.RuntimeAlertK8sDetails{
+			Namespace: namespace,
+		},
+		RuleID: "R1016",
+	}
+
+	ruleFailure.SetWorkloadDetails(wlid)
+
+	c.tamperAlertExporter.SendRuleAlert(ruleFailure)
+}
diff --git a/pkg/objectcache/containerprofilecache/tamper_alert_test.go b/pkg/objectcache/containerprofilecache/tamper_alert_test.go
new file mode 100644
index 000000000..03fa7b0a8
--- /dev/null
+++ b/pkg/objectcache/containerprofilecache/tamper_alert_test.go
@@ -0,0 +1,281 @@
+// Unit tests pinning the tamper-vs-operational error classification in
+// the cache's verify path. CodeRabbit PR #38 finding (tamper_alert.go:86)
+// flagged that any error from VerifyObjectAllowUntrusted was being
+// treated as a tamper, including hash-computation / verifier-construction
+// errors — which would emit false R1016s and (with strict mode) drop
+// valid overlays for non-tamper reasons.
+//
+// These tests use synthetic errors to avoid needing a full cosign
+// fixture, and assert via the unexported tamperEmitted dedup map's
+// observable side effect: real tampers populate it, operational errors
+// don't.
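+//
+// A note on construction: the tests build &ContainerProfileCacheImpl{}
+// directly. That works because sync.Map's zero value is empty and ready
+// to use, so the tamperEmitted dedup map needs no constructor.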
+package containerprofilecache + +import ( + "errors" + "fmt" + "sync" + "testing" + + "github.com/kubescape/node-agent/pkg/hostfimsensor" + "github.com/kubescape/node-agent/pkg/malwaremanager" + rmtypes "github.com/kubescape/node-agent/pkg/rulemanager/types" + "github.com/kubescape/node-agent/pkg/signature" + "github.com/kubescape/node-agent/pkg/signature/profiles" + "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// captureExporter records every SendRuleAlert call for assertion in tests. +// The interface is exporters.Exporter — only SendRuleAlert needs real +// behaviour here; the rest are no-ops for the unit-test scope. +type captureExporter struct { + mu sync.Mutex + alerts []rmtypes.RuleFailure +} + +func (e *captureExporter) SendRuleAlert(r rmtypes.RuleFailure) { + e.mu.Lock() + defer e.mu.Unlock() + e.alerts = append(e.alerts, r) +} +func (e *captureExporter) SendMalwareAlert(_ malwaremanager.MalwareResult) {} +func (e *captureExporter) SendFimAlerts(_ []hostfimsensor.FimEvent) {} +func (e *captureExporter) ruleAlerts() []rmtypes.RuleFailure { + e.mu.Lock() + defer e.mu.Unlock() + out := make([]rmtypes.RuleFailure, len(e.alerts)) + copy(out, e.alerts) + return out +} + +// TestVerifyClassification_TamperPopulatesDedupMap confirms that an +// ErrSignatureMismatch-wrapped error is treated as a real tamper: +// LoadOrStore should set the key and emit (we observe via the map). +func TestVerifyClassification_TamperPopulatesDedupMap(t *testing.T) { + c := &ContainerProfileCacheImpl{} + key := tamperKey("ApplicationProfile", "ns", "p", "1") + + // Synthesise the wrapped error that VerifyObject returns on actual + // signature mismatch. + tamperErr := fmt.Errorf("%w: %w", signature.ErrSignatureMismatch, errors.New("crypto/ecdsa: verify error")) + + if !errors.Is(tamperErr, signature.ErrSignatureMismatch) { + t.Fatalf("test fixture wrong: errors.Is(tamperErr, ErrSignatureMismatch) returned false") + } + + // First-transition path: LoadOrStore returns alreadyEmitted=false. + _, alreadyEmitted := c.tamperEmitted.LoadOrStore(key, struct{}{}) + if alreadyEmitted { + t.Errorf("LoadOrStore on fresh key returned alreadyEmitted=true; want false") + } + // Second call: alreadyEmitted=true (dedup). + _, alreadyEmitted = c.tamperEmitted.LoadOrStore(key, struct{}{}) + if !alreadyEmitted { + t.Errorf("LoadOrStore on already-stored key returned false; want true") + } +} + +// TestVerifyClassification_OperationalErrorDistinguishable confirms that +// an operational error (no ErrSignatureMismatch wrap) returns false on +// errors.Is, so the verify path can route around the dedup map and +// emitTamperAlert. 
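+//
+// (Background: Go 1.20+ fmt.Errorf with multiple %w verbs yields an error
+// whose Unwrap() []error exposes both branches, and errors.Is walks them
+// all. Sketch, with io.ErrUnexpectedEOF standing in for an inner cause:
+//
+//	e := fmt.Errorf("%w: %w", signature.ErrSignatureMismatch, io.ErrUnexpectedEOF)
+//	errors.Is(e, signature.ErrSignatureMismatch) // true → classified as tamper
+//	errors.Is(e, io.ErrUnexpectedEOF)            // true → inner cause still visible
+//
+// The operational errors below deliberately omit the sentinel wrap.)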
+func TestVerifyClassification_OperationalErrorDistinguishable(t *testing.T) { + cases := []struct { + name string + err error + }{ + {"hash computation failure", fmt.Errorf("failed to compute content hash: %w", errors.New("io error"))}, + {"verifier construction failure", fmt.Errorf("failed to create verifier: %w", errors.New("missing root certs"))}, + {"adapter construction failure", fmt.Errorf("failed to create cosign adapter: %w", errors.New("config invalid"))}, + {"decode signature failure", fmt.Errorf("failed to decode signature from annotations: %w", errors.New("base64 invalid"))}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if errors.Is(tc.err, signature.ErrSignatureMismatch) { + t.Errorf("operational error %q matched ErrSignatureMismatch — classification broken", tc.err) + } + }) + } +} + +// TestVerifyClassification_ErrSignatureMismatchValue is a smoke test that +// the sentinel exists with the canonical message ("signature verification +// failed"), so log scraping / alert pipelines that match the substring +// continue to work. +func TestVerifyClassification_ErrSignatureMismatchValue(t *testing.T) { + if signature.ErrSignatureMismatch == nil { + t.Fatalf("signature.ErrSignatureMismatch is nil — sentinel was removed") + } + if signature.ErrSignatureMismatch.Error() != "signature verification failed" { + t.Errorf("sentinel message changed: %q (want %q)", signature.ErrSignatureMismatch.Error(), "signature verification failed") + } +} + +// TestVerifyAP_TamperedProfile_PopulatesDedupMap exercises the full +// verifyUserApplicationProfile path end-to-end (per CodeRabbit nitpick on +// PR #38, tamper_alert_test.go:47): sign a real ApplicationProfile, +// mutate its content (fake tamper), call the verify method, and confirm +// the dedup map carries the tamperKey afterward. Confirms the wiring +// from "verifier returns ErrSignatureMismatch" all the way through the +// classification + LoadOrStore branch. +func TestVerifyAP_TamperedProfile_PopulatesDedupMap(t *testing.T) { + profile := &v1beta1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tampered", + Namespace: "test-ns", + ResourceVersion: "42", + UID: "ap-uid-tamper", + }, + Spec: v1beta1.ApplicationProfileSpec{ + Containers: []v1beta1.ApplicationProfileContainer{{Name: "test"}}, + }, + } + + // Sign with a real cosign signer (test-only; uses an ephemeral key + // from the cosign adapter — no Sigstore Fulcio interaction). + adapter := profiles.NewApplicationProfileAdapter(profile) + if err := signature.SignObjectDisableKeyless(adapter); err != nil { + t.Fatalf("sign profile: %v", err) + } + if !signature.IsSigned(adapter) { + t.Fatalf("post-Sign IsSigned returned false") + } + + // Tamper: mutate spec content after signing. Verification will + // recompute the content hash, find it differs from the signed hash, + // and return ErrSignatureMismatch. + profile.Spec.Containers[0].Name = "MUTATED" + + c := &ContainerProfileCacheImpl{} + ok := c.verifyUserApplicationProfile(profile, "wlid://test/cluster/ns/Pod/p") + // EnableSignatureVerification is false (zero-value) → returns true + // even though tamper was detected. R1016 emit is dedup-tracked via + // tamperEmitted regardless. 
+ if !ok { + t.Errorf("verify returned false; expected true (legacy permissive mode)") + } + + key := tamperKey("ApplicationProfile", profile.Namespace, profile.Name, profile.ResourceVersion) + if _, found := c.tamperEmitted.Load(key); !found { + t.Errorf("tamperEmitted missing key %q after a real tamper — wiring from verifier-error to dedup map is broken", key) + } + + // Second call on the SAME tampered profile must not re-flag the key + // as a new emit (dedup). + _, alreadyEmitted := c.tamperEmitted.LoadOrStore(key, struct{}{}) + if !alreadyEmitted { + t.Errorf("dedup broken: re-storing existing key returned alreadyEmitted=false") + } + + // Re-sign over the mutated content at the SAME ResourceVersion — the + // verifier now sees a valid signature over the current spec, so + // verifyUserApplicationProfile MUST take the verify-clean branch + // and Delete the existing dedup entry. CodeRabbit nitpick on PR + // #38 (tamper_alert_test.go:159): the prior version of this test + // bumped RV before the re-sign, so the assertion checked a key + // that was never added — trivially true. This now actually + // exercises the clearing path. + if err := signature.SignObjectDisableKeyless(adapter); err != nil { + t.Fatalf("re-sign profile: %v", err) + } + ok = c.verifyUserApplicationProfile(profile, "wlid://test/cluster/ns/Pod/p") + if !ok { + t.Errorf("verify after re-sign returned false; expected true") + } + if _, found := c.tamperEmitted.Load(key); found { + t.Errorf("tamperEmitted still has key %q after a successful re-verify at the same RV; the verify-clean path must Delete it", key) + } +} + +// TestVerifyAP_TamperedProfile_EmitsR1016ViaExporter pins the wiring +// contract that was missing before: verifyUserApplicationProfile must +// invoke the wired tamperAlertExporter exactly once per tamper event, +// with a properly-shaped R1016 RuleFailure. Without this, the +// SetTamperAlertExporter plumbing landed but the alert never reached +// the exporter because the verify method was orphan code, never +// invoked from production (the bug that caused +// Test_31_TamperDetectionAlert to fail at the integration level). +func TestVerifyAP_TamperedProfile_EmitsR1016ViaExporter(t *testing.T) { + profile := &v1beta1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tampered-emit", + Namespace: "test-ns", + ResourceVersion: "1", + UID: "ap-uid-emit", + }, + Spec: v1beta1.ApplicationProfileSpec{ + Containers: []v1beta1.ApplicationProfileContainer{{Name: "test"}}, + }, + } + + adapter := profiles.NewApplicationProfileAdapter(profile) + if err := signature.SignObjectDisableKeyless(adapter); err != nil { + t.Fatalf("sign profile: %v", err) + } + profile.Spec.Containers[0].Name = "MUTATED" + + exporter := &captureExporter{} + c := &ContainerProfileCacheImpl{} + c.SetTamperAlertExporter(exporter) + + c.verifyUserApplicationProfile(profile, "wlid://test/cluster/ns/Pod/p") + + alerts := exporter.ruleAlerts() + if len(alerts) != 1 { + t.Fatalf("exporter received %d alerts; want exactly 1", len(alerts)) + } + a := alerts[0] + if got := a.GetBaseRuntimeAlert().AlertName; got != "Signed profile tampered" { + t.Errorf("AlertName=%q; want %q", got, "Signed profile tampered") + } + if got := a.GetRuleId(); got != "R1016" { + t.Errorf("RuleId=%q; want R1016", got) + } + if got := a.GetRuntimeAlertK8sDetails().Namespace; got != "test-ns" { + t.Errorf("Namespace=%q; want test-ns", got) + } + + // Second call same RV: dedup must hold — exporter sees no new alert. 
+ c.verifyUserApplicationProfile(profile, "wlid://test/cluster/ns/Pod/p") + if got := len(exporter.ruleAlerts()); got != 1 { + t.Errorf("after dedup-tracked re-call, exporter has %d alerts; want 1", got) + } + + // Bump RV: tamperKey changes → dedup map is keyed on (kind, ns, name, RV) + // so the bumped RV must produce a fresh alert. + profile.ResourceVersion = "2" + c.verifyUserApplicationProfile(profile, "wlid://test/cluster/ns/Pod/p") + if got := len(exporter.ruleAlerts()); got != 2 { + t.Errorf("after RV bump, exporter has %d alerts; want 2", got) + } +} + +// TestVerifyAP_OperationalError_DoesNotEmit pins the inverse contract: +// when verification fails with a non-tamper error (hash compute, +// verifier construction, decode), the exporter must NOT receive an +// R1016 — operational errors are logged and either dropped or surfaced +// via strict-mode loading refusal, but never as a tamper alert. +func TestVerifyAP_OperationalError_DoesNotEmit(t *testing.T) { + // Construct an AP with an UNSIGNED-looking annotation set so + // IsSigned returns false — verify exits early without invoking the + // cosign path at all. Confirms the unsigned short-circuit emits + // nothing. + profile := &v1beta1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "unsigned", + Namespace: "test-ns", + ResourceVersion: "1", + }, + } + + exporter := &captureExporter{} + c := &ContainerProfileCacheImpl{} + c.SetTamperAlertExporter(exporter) + + c.verifyUserApplicationProfile(profile, "wlid://test/cluster/ns/Pod/p") + if got := len(exporter.ruleAlerts()); got != 0 { + t.Errorf("unsigned AP produced %d R1016 alerts; want 0", got) + } +} diff --git a/pkg/objectcache/containerprofilecache/test32_projection_test.go b/pkg/objectcache/containerprofilecache/test32_projection_test.go new file mode 100644 index 000000000..b41613671 --- /dev/null +++ b/pkg/objectcache/containerprofilecache/test32_projection_test.go @@ -0,0 +1,326 @@ +package containerprofilecache + +import ( + "testing" + + "github.com/kubescape/node-agent/pkg/objectcache" + "github.com/kubescape/node-agent/pkg/objectcache/callstackcache" + "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// TestT32_UserOverlayExecsReachProjectedValues pins the contract that +// Test_32_UnexpectedProcessArguments depends on end-to-end: when a user- +// defined ApplicationProfile overlay supplies Execs entries for a +// container, those paths MUST appear in the projected ContainerProfile's +// Execs.Values so ap.was_executed lookups succeed and R0001 stays +// silent on user-allowed paths. +// +// Test_32 has been failing on the R0001-silence precondition even after +// the bare-name path enumeration in the test's profile. That can only +// happen if one of these projection steps drops the entries: +// +// 1. projectUserProfiles → mergeApplicationProfile fails to copy +// userAP.Spec.Containers[i].Execs into projected.Spec.Execs +// 2. Apply → extractExecsPaths walks projected.Spec.Execs[i].Path but +// misses entries +// 3. projectField → entries end up in Patterns or get filtered out +// instead of landing in Values +// +// This test stresses (1)+(2)+(3) end-to-end with an empty baseline +// (mirrors the real Test_32 scenario where the agent's recording side +// correctly skips learning for user-defined-profile containers). 
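+//
+// The pipeline under test, end-to-end (calls as used in the body below):
+//
+//	merged, _ := projectUserProfiles(cp, userAP, nil, pod, "curl") // (1) merge overlay
+//	pcp := Apply(spec, merged, tree)                               // (2)+(3) extract + route
+//	_, ok := pcp.Execs.Values["/bin/sh"]                           // the lookup R0001 consults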
+func TestT32_UserOverlayExecsReachProjectedValues(t *testing.T) { + // Empty baseline ContainerProfile (matches what the reconciler + // synthesises when no baseline exists for a user-defined-profile- + // labelled container). + cp := &v1beta1.ContainerProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "replicaset-curl-32-6d44f5f86b", + Namespace: "ns", + }, + } + + // User-defined AP with the same Execs shape Test_32 uses + // (post-c3b692ed, both full-path and bare-name variants). + userAP := &v1beta1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{Name: "curl-32-overlay", Namespace: "ns"}, + Spec: v1beta1.ApplicationProfileSpec{ + Containers: []v1beta1.ApplicationProfileContainer{ + { + Name: "curl", + Execs: []v1beta1.ExecCalls{ + {Path: "/bin/sh", Args: []string{"sh", "-c", "*"}}, + {Path: "sh", Args: []string{"sh", "-c", "*"}}, + {Path: "/bin/echo", Args: []string{"echo", "hello", "*"}}, + }, + }, + }, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "curl"}}, + }, + } + + merged, _ := projectUserProfiles(cp, userAP, nil, pod, "curl") + if merged == nil { + t.Fatalf("projectUserProfiles returned nil") + } + + // After merge, projected.Spec.Execs must contain all 3 user-overlay + // Execs paths. + gotPaths := map[string]bool{} + for _, e := range merged.Spec.Execs { + gotPaths[e.Path] = true + } + wantPaths := []string{"/bin/sh", "sh", "/bin/echo"} + for _, p := range wantPaths { + if !gotPaths[p] { + t.Errorf("merge failed: path %q missing from merged.Spec.Execs (got: %v)", p, gotPaths) + } + } + + // Apply with a default RuleProjectionSpec (InUse=false → All=true → + // pass-through; matches what R0001 hits when no rule declares a + // specific Execs requirement). + spec := &objectcache.RuleProjectionSpec{} + tree := callstackcache.NewCallStackSearchTree() + projected := Apply(spec, merged, tree) + + if projected == nil { + t.Fatal("Apply returned nil") + } + if projected.Execs.Values == nil { + t.Fatalf("projected.Execs.Values is nil — projection dropped all entries") + } + for _, p := range wantPaths { + if _, ok := projected.Execs.Values[p]; !ok { + t.Errorf("projection dropped %q: projected.Execs.Values=%v", p, projected.Execs.Values) + } + } + + // ExecsByPath is the path → args map used by R0040's + // was_executed_with_args. Must also carry all 3 user paths. + for _, p := range wantPaths { + if _, ok := projected.ExecsByPath[p]; !ok { + t.Errorf("ExecsByPath missing path %q (got keys: %v)", p, mapKeys(projected.ExecsByPath)) + } + } +} + +// TestT32_StampOverlayIdentity_Idempotent pins the contract behind the +// CodeRabbit critical finding on projection.go:115 (PR #43): stamping +// the same overlay identity twice MUST produce the same SyncChecksum +// as stamping it once. Both reconciler.go and tryPopulateEntry path +// through projectUserProfiles, and a reconciler tick that re-stamps +// an already-stamped projected ContainerProfile must NOT accumulate +// overlay suffixes. +// +// Bug shape (pre-fix): stampOverlayIdentity reads the existing +// SyncChecksumMetadataKey annotation as "baseline" and appends new +// overlay suffixes to it. On the second call, the first call's +// "ap=ns/name@RV" segment is treated as part of the "baseline" and +// gets a second "ap=ns/name@RV" appended. 
Result: +// +// baseline: "" +// first stamp: "|ap=ns/curl@1" +// second stamp: "|ap=ns/curl@1|ap=ns/curl@1" ← BUG: duplicated +// +// The cache key keeps changing across reconciler ticks even though +// the overlay didn't change — invalidates the function_cache on every +// tick, churning expensive recomputations. +func TestT32_StampOverlayIdentity_Idempotent(t *testing.T) { + userAP := &v1beta1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "curl-32-overlay", + Namespace: "ns", + ResourceVersion: "42", + }, + } + + // Stamp once on a fresh cp; capture the checksum. + cp1 := &v1beta1.ContainerProfile{ObjectMeta: metav1.ObjectMeta{Name: "cp"}} + stampOverlayIdentity(cp1, userAP, nil) + once := cp1.Annotations["kubescape.io/sync-checksum"] + + // Stamp twice on a different fresh cp (simulates reconciler tick + // re-projecting an already-projected entry). + cp2 := &v1beta1.ContainerProfile{ObjectMeta: metav1.ObjectMeta{Name: "cp"}} + stampOverlayIdentity(cp2, userAP, nil) + stampOverlayIdentity(cp2, userAP, nil) + twice := cp2.Annotations["kubescape.io/sync-checksum"] + + if once != twice { + t.Errorf("stampOverlayIdentity not idempotent on repeat-stamp:\n once: %q\n twice: %q\n"+ + "overlay suffixes accumulate, churning the function_cache on every reconcile.", once, twice) + } + + // Three times must also equal once. + cp3 := &v1beta1.ContainerProfile{ObjectMeta: metav1.ObjectMeta{Name: "cp"}} + stampOverlayIdentity(cp3, userAP, nil) + stampOverlayIdentity(cp3, userAP, nil) + stampOverlayIdentity(cp3, userAP, nil) + if got := cp3.Annotations["kubescape.io/sync-checksum"]; got != once { + t.Errorf("triple-stamp also non-idempotent: got %q want %q", got, once) + } +} + +// TestT32_StampOverlayIdentity_PreservesBaseline pins that a non-empty +// baseline SyncChecksum survives the stamp (we don't blow away the +// learned profile's content hash; we extend it). Distinct baselines +// must produce distinct keys after stamping. +func TestT32_StampOverlayIdentity_PreservesBaseline(t *testing.T) { + userAP := &v1beta1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{Name: "ovrl", Namespace: "ns", ResourceVersion: "1"}, + } + + cpA := &v1beta1.ContainerProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cp", + Annotations: map[string]string{"kubescape.io/sync-checksum": "baseline-A"}, + }, + } + stampOverlayIdentity(cpA, userAP, nil) + + cpB := &v1beta1.ContainerProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cp", + Annotations: map[string]string{"kubescape.io/sync-checksum": "baseline-B"}, + }, + } + stampOverlayIdentity(cpB, userAP, nil) + + if cpA.Annotations["kubescape.io/sync-checksum"] == cpB.Annotations["kubescape.io/sync-checksum"] { + t.Errorf("distinct baselines produced same stamped checksum — baseline lost during stamp") + } +} + +// TestT32_SyncChecksumReflectsUserOverlayIdentity pins the contract +// that the cache-invalidation key (ProjectedContainerProfile.SyncChecksum) +// CHANGES when a user-overlay AP is added to a previously empty +// baseline. Without this, the rulemanager's function_cache caches an +// "was_executed=false" result computed BEFORE the overlay merged and +// returns it forever — the bug behind Test_32's persistent failure +// where user-overlay /bin/sh in profile.Spec.Execs never reaches the +// rule evaluator's cached lookup result. +// +// HashForContainerProfile in pkg/rulemanager/cel/libraries/cache/ +// function_cache.go:105 builds the cache key as +// SpecHash + "|" + SyncChecksum. SpecHash only tracks rule changes. 
+// SyncChecksum is the ONLY field that's supposed to flip when the +// underlying profile content changes. +// +// Failure mode: empty baseline + first projection (no overlay yet, +// transient fetch error) → SyncChecksum=""; rule caches result; +// reconciler later succeeds the overlay fetch and re-projects → still +// SyncChecksum="" because cp.Annotations[SyncChecksumMetadataKey] +// only reflects the BASELINE, not the merged user-overlay identity. +func TestT32_SyncChecksumReflectsUserOverlayIdentity(t *testing.T) { + // Empty baseline (matches reconciler's synthesised effectiveCP for + // a user-defined-profile-labelled container). + cp := &v1beta1.ContainerProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "replicaset-curl-32-6d44f5f86b", + Namespace: "ns", + // Reconciler-synthesised baselines do NOT carry a + // SyncChecksumMetadataKey annotation. The bug is that the + // projected SyncChecksum stays "" across both states. + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "curl"}}, + }, + } + + spec := &objectcache.RuleProjectionSpec{} + tree := callstackcache.NewCallStackSearchTree() + + // Stage 1: project WITHOUT user-overlay (first-pass under transient + // fetch failure). Compute SyncChecksum_before. + mergedNoOverlay, _ := projectUserProfiles(cp, nil, nil, pod, "curl") + projectedNoOverlay := Apply(spec, mergedNoOverlay, tree) + syncBefore := projectedNoOverlay.SyncChecksum + + // Stage 2: project WITH a user-overlay AP. Same baseline, same + // container. SyncChecksum_after MUST differ from SyncChecksum_before. + userAP := &v1beta1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "curl-32-overlay", + Namespace: "ns", + ResourceVersion: "12345", + }, + Spec: v1beta1.ApplicationProfileSpec{ + Containers: []v1beta1.ApplicationProfileContainer{ + { + Name: "curl", + Execs: []v1beta1.ExecCalls{{Path: "/bin/sh", Args: []string{"sh", "-c", "*"}}}, + }, + }, + }, + } + mergedWithOverlay, _ := projectUserProfiles(cp, userAP, nil, pod, "curl") + projectedWithOverlay := Apply(spec, mergedWithOverlay, tree) + syncAfter := projectedWithOverlay.SyncChecksum + + if syncBefore == syncAfter { + t.Errorf("SyncChecksum did not change after user-overlay merge: before=%q after=%q. "+ + "The function_cache key won't invalidate when the overlay arrives, so "+ + "stale was_executed=false results poison the rule evaluator indefinitely. "+ + "Apply (projection_apply.go) must fold user-overlay identity (e.g. userAP.ResourceVersion) "+ + "into projected.SyncChecksum.", + syncBefore, syncAfter) + } + + // Stage 3: project with a DIFFERENT user-overlay AP (e.g., the + // overlay was updated post-deployment). SyncChecksum_third MUST + // differ from syncAfter so the cache picks up the change. + userAPUpdated := userAP.DeepCopy() + userAPUpdated.ResourceVersion = "12346" + userAPUpdated.Spec.Containers[0].Execs = append(userAPUpdated.Spec.Containers[0].Execs, + v1beta1.ExecCalls{Path: "/bin/echo", Args: []string{"echo", "*"}}) + mergedWithUpdated, _ := projectUserProfiles(cp, userAPUpdated, nil, pod, "curl") + projectedWithUpdated := Apply(spec, mergedWithUpdated, tree) + syncThird := projectedWithUpdated.SyncChecksum + + if syncAfter == syncThird { + t.Errorf("SyncChecksum did not change after user-overlay update (RV %s → %s, +1 Exec entry): "+ + "before-update=%q after-update=%q. 
Updates to the overlay won't invalidate cached lookups.", + userAP.ResourceVersion, userAPUpdated.ResourceVersion, syncAfter, syncThird) + } + + // Stage 4: project AGAIN without an overlay (simulates the overlay + // label being removed from the pod, or the overlay AP being deleted + // from storage). SyncChecksum MUST fall back to a value DISTINCT + // from the overlay-stamped one, so the function_cache invalidates + // when the overlay disappears. CodeRabbit PR #43 nitpick on + // test32_projection_test.go:210. + mergedRemoved, _ := projectUserProfiles(cp, nil, nil, pod, "curl") + projectedRemoved := Apply(spec, mergedRemoved, tree) + syncRemoved := projectedRemoved.SyncChecksum + + if syncRemoved == syncThird { + t.Errorf("SyncChecksum did not change after user-overlay REMOVAL: "+ + "with-overlay=%q without-overlay=%q. Removing the overlay won't invalidate cached lookups.", + syncThird, syncRemoved) + } + if syncRemoved != syncBefore { + t.Errorf("after overlay removal, SyncChecksum should match the baseline-only state: "+ + "removed=%q baseline-only=%q", syncRemoved, syncBefore) + } +} + +func mapKeys[V any](m map[string]V) []string { + out := make([]string, 0, len(m)) + for k := range m { + out = append(out, k) + } + return out +} diff --git a/pkg/objectcache/projection_types.go b/pkg/objectcache/projection_types.go index ed55d671b..3b6449602 100644 --- a/pkg/objectcache/projection_types.go +++ b/pkg/objectcache/projection_types.go @@ -54,6 +54,13 @@ type ProjectedContainerProfile struct { IngressDomains ProjectedField IngressAddresses ProjectedField + // ExecsByPath carries the per-Path Args slice from cp.Spec.Execs so + // the v0.0.2 exec-args wildcard matching (dynamicpathdetector.CompareExecArgs) + // can run against the projected profile. Keyed by Exec.Path (matches the + // key used in Execs.Values / Execs.Patterns). Upstream projection-v1 + // dropped argv matching as "future work"; this re-adds it on the fork. + ExecsByPath map[string][]string + SpecHash string SyncChecksum string PolicyByRuleId map[string]v1beta1.RulePolicy diff --git a/pkg/objectcache/shared_container_data.go b/pkg/objectcache/shared_container_data.go index 49ac5d7ed..606ed3bd2 100644 --- a/pkg/objectcache/shared_container_data.go +++ b/pkg/objectcache/shared_container_data.go @@ -19,6 +19,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// UserDefinedNetworkMetadataKey is the pod label that references a +// user-provided NetworkNeighborhood resource by name (analogous to +// helpersv1.UserDefinedProfileMetadataKey for ApplicationProfiles). 
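+//
+// Example pod labels enabling both overlays (values illustrative):
+//
+//	kubescape.io/user-defined-profile: curl-overlay   → ApplicationProfile "curl-overlay"
+//	kubescape.io/user-defined-network: curl-nn        → NetworkNeighborhood "curl-nn"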
+const UserDefinedNetworkMetadataKey = "kubescape.io/user-defined-network" + type ContainerType int const ( @@ -81,6 +86,7 @@ type WatchedContainerData struct { PreviousReportTimestamp time.Time CurrentReportTimestamp time.Time UserDefinedProfile string + UserDefinedNetwork string LabelOverrides map[string]string // optional label overrides applied after GetLabels() LearningPeriod time.Duration } @@ -172,6 +178,16 @@ func (watchedContainer *WatchedContainerData) SetContainerInfo(wl workloadinterf watchedContainer.UserDefinedProfile = userDefinedProfile } } + // check for user defined network neighborhood + if userDefinedNetwork, ok := labels[UserDefinedNetworkMetadataKey]; ok { + if userDefinedNetwork != "" { + logger.L().Info("container has a user defined network neighborhood", + helpers.String("network", userDefinedNetwork), + helpers.String("container", containerName), + helpers.String("workload", wl.GetName())) + watchedContainer.UserDefinedNetwork = userDefinedNetwork + } + } podSpec, err := wl.GetPodSpec() if err != nil { return fmt.Errorf("failed to get pod spec: %w", err) diff --git a/pkg/objectcache/v1/mock.go b/pkg/objectcache/v1/mock.go index c618e2450..69bc22961 100644 --- a/pkg/objectcache/v1/mock.go +++ b/pkg/objectcache/v1/mock.go @@ -151,10 +151,28 @@ func (r *RuleObjectCacheMock) GetProjectedContainerProfile(containerID string) * } if (!specInstalled || spec.Execs.InUse) && len(cp.Spec.Execs) > 0 { - pcp.Execs.All = true pcp.Execs.Values = make(map[string]struct{}, len(cp.Spec.Execs)) + // Route dynamic-segment paths to Patterns so dynamicpathdetector + // can match them; literals to Values for the fast map lookup. for _, e := range cp.Spec.Execs { - pcp.Execs.Values[e.Path] = struct{}{} + if mockContainsDynamicSegment(e.Path) { + pcp.Execs.Patterns = append(pcp.Execs.Patterns, e.Path) + } else { + pcp.Execs.Values[e.Path] = struct{}{} + } + } + if len(pcp.Execs.Values) == 0 { + pcp.Execs.Values = nil + } + // ExecsByPath: carry per-path Args so the exec-args wildcard matcher + // (was_executed_with_args / CompareExecArgs) keeps working. + pcp.ExecsByPath = make(map[string][]string, len(cp.Spec.Execs)) + for _, e := range cp.Spec.Execs { + if e.Args == nil { + pcp.ExecsByPath[e.Path] = []string{} + continue + } + pcp.ExecsByPath[e.Path] = e.Args } } @@ -175,14 +193,25 @@ func (r *RuleObjectCacheMock) GetProjectedContainerProfile(containerID string) * } // Egress addresses and domains — All=true: all observed entries are retained. + // v0.0.2 wildcards (CIDRs, '*' sentinel, leading-*/mid-⋯/trailing-*) get + // routed to Patterns rather than Values so the runtime CEL helpers can + // pass them through networkmatch on the cache-miss path. if !specInstalled || spec.EgressAddresses.InUse || spec.EgressDomains.InUse { for _, n := range cp.Spec.Egress { - if (!specInstalled || spec.EgressAddresses.InUse) && n.IPAddress != "" { - if pcp.EgressAddresses.Values == nil { - pcp.EgressAddresses.All = true - pcp.EgressAddresses.Values = make(map[string]struct{}) + if !specInstalled || spec.EgressAddresses.InUse { + addrs := make([]string, 0, len(n.IPAddresses)+1) + if n.IPAddress != "" { + addrs = append(addrs, n.IPAddress) + } + addrs = append(addrs, n.IPAddresses...) 
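+				// addrs may now mix literals and patterns, e.g. "10.0.0.7",
+				// "10.96.0.0/12", "*" (values illustrative) — the classifier
+				// below routes each form to Values or Patterns.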
+ for _, a := range addrs { + ensureProjectedAllInit(&pcp.EgressAddresses) + if mockIsNetworkIPWildcard(a) { + pcp.EgressAddresses.Patterns = append(pcp.EgressAddresses.Patterns, a) + } else { + pcp.EgressAddresses.Values[a] = struct{}{} + } } - pcp.EgressAddresses.Values[n.IPAddress] = struct{}{} } if !specInstalled || spec.EgressDomains.InUse { domains := n.DNSNames @@ -190,40 +219,47 @@ func (r *RuleObjectCacheMock) GetProjectedContainerProfile(containerID string) * domains = append([]string{n.DNS}, domains...) } for _, d := range domains { - if pcp.EgressDomains.Values == nil { - pcp.EgressDomains.All = true - pcp.EgressDomains.Values = make(map[string]struct{}) + ensureProjectedAllInit(&pcp.EgressDomains) + if mockIsNetworkDNSWildcard(d) { + pcp.EgressDomains.Patterns = append(pcp.EgressDomains.Patterns, d) + } else { + pcp.EgressDomains.Values[d] = struct{}{} } - pcp.EgressDomains.Values[d] = struct{}{} } } } } - // Ingress addresses and domains — All=true: all observed entries are retained. + // Ingress addresses and domains — same shape as egress above. if !specInstalled || spec.IngressAddresses.InUse || spec.IngressDomains.InUse { for _, n := range cp.Spec.Ingress { - if (!specInstalled || spec.IngressAddresses.InUse) && n.IPAddress != "" { - if pcp.IngressAddresses.Values == nil { - pcp.IngressAddresses.All = true - pcp.IngressAddresses.Values = make(map[string]struct{}) + if !specInstalled || spec.IngressAddresses.InUse { + addrs := make([]string, 0, len(n.IPAddresses)+1) + if n.IPAddress != "" { + addrs = append(addrs, n.IPAddress) + } + addrs = append(addrs, n.IPAddresses...) + for _, a := range addrs { + ensureProjectedAllInit(&pcp.IngressAddresses) + if mockIsNetworkIPWildcard(a) { + pcp.IngressAddresses.Patterns = append(pcp.IngressAddresses.Patterns, a) + } else { + pcp.IngressAddresses.Values[a] = struct{}{} + } } - pcp.IngressAddresses.Values[n.IPAddress] = struct{}{} } if !specInstalled || spec.IngressDomains.InUse { + domains := n.DNSNames if n.DNS != "" { - if pcp.IngressDomains.Values == nil { - pcp.IngressDomains.All = true - pcp.IngressDomains.Values = make(map[string]struct{}) - } - pcp.IngressDomains.Values[n.DNS] = struct{}{} + domains = append([]string{n.DNS}, domains...) } - for _, d := range n.DNSNames { - if pcp.IngressDomains.Values == nil { - pcp.IngressDomains.All = true - pcp.IngressDomains.Values = make(map[string]struct{}) + for _, d := range domains { + ensureProjectedAllInit(&pcp.IngressDomains) + if mockIsNetworkDNSWildcard(d) { + pcp.IngressDomains.Patterns = append(pcp.IngressDomains.Patterns, d) + } else { + pcp.IngressDomains.Values[d] = struct{}{} } - pcp.IngressDomains.Values[d] = struct{}{} } } } @@ -232,6 +268,60 @@ func (r *RuleObjectCacheMock) GetProjectedContainerProfile(containerID string) * return pcp } +// ensureProjectedAllInit allocates the Values map on first use. +// Does NOT set All=true — that flag is the projection's "match any input" +// sentinel set by rule declarations, not a comprehensiveness hint. +// (Prior mock code conflated the two; matchIPField/matchDNSField correctly +// short-circuit on All=true so we MUST NOT set it here.) +func ensureProjectedAllInit(pf *objectcache.ProjectedField) { + if pf.Values == nil { + pf.Values = make(map[string]struct{}) + } +} + +// mockIsNetworkIPWildcard duplicates containerprofilecache.isNetworkIPWildcard +// because the mock is in a separate package and we don't want to introduce +// an import dependency on the production cache implementation here. 
+// Kept in sync with the production classifier — see containerprofilecache/projection_apply.go.
+func mockIsNetworkIPWildcard(e string) bool {
+	if e == "*" {
+		return true
+	}
+	for _, r := range e {
+		if r == '/' || r == '⋯' {
+			return true
+		}
+	}
+	return false
+}
+
+// mockContainsDynamicSegment recognises the path-wildcard token used by
+// dynamicpathdetector (single Unicode codepoint U+22EF). Kept in sync with
+// containerprofilecache.containsDynamicSegment.
+func mockContainsDynamicSegment(e string) bool {
+	for _, r := range e {
+		if r == '⋯' {
+			return true
+		}
+	}
+	return false
+}
+
+// mockIsNetworkDNSWildcard duplicates containerprofilecache.isNetworkDNSWildcard.
+func mockIsNetworkDNSWildcard(e string) bool {
+	if e == "" {
+		return false
+	}
+	for _, r := range e {
+		if r == '*' || r == '⋯' {
+			return true
+		}
+	}
+	return false
+}
+
 func (r *RuleObjectCacheMock) SetProjectionSpec(spec objectcache.RuleProjectionSpec) {
 	r.projectionSpecMu.Lock()
 	r.projectionSpec = spec
diff --git a/pkg/rulebindingmanager/cache/cache.go b/pkg/rulebindingmanager/cache/cache.go
index 9ca100082..5572ea340 100644
--- a/pkg/rulebindingmanager/cache/cache.go
+++ b/pkg/rulebindingmanager/cache/cache.go
@@ -117,72 +117,63 @@ func (c *RBCache) AddNotifier(n *chan rulebindingmanager.RuleBindingNotify) {
 
 // ------------------ watcher.Watcher methods -----------------------
 
+// AddHandler / ModifyHandler / DeleteHandler structure: take the
+// mutex, mutate the cache + build the rbs slice, release the mutex,
+// then fan out NON-blocking. Holding the lock during fan-out (the
+// pre-fix shape) deadlocks every cache operation behind any single
+// stuck subscriber. CodeRabbit PR #43 cache.go:215 — the
+// non-blocking fix was previously only on RefreshRuleBindingsRules;
+// this extends it to all three k8s-event handlers.
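+//
+// The pre-fix shape, for contrast (reconstructed from the removed lines
+// below; blocking send under the deferred lock):
+//
+//	c.mutex.Lock()
+//	defer c.mutex.Unlock()
+//	// ... mutate cache, build rbs ...
+//	for n := range c.notifiers {
+//		for i := range rbs {
+//			*c.notifiers[n] <- rbs[i] // one full, unread channel blocks here forever,
+//		}                             // and every other method then parks on c.mutex
+//	}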
+
 func (c *RBCache) AddHandler(ctx context.Context, obj runtime.Object) {
 	c.mutex.Lock()
-	defer c.mutex.Unlock()
-
 	var rbs []rulebindingmanager.RuleBindingNotify
-
 	if pod, ok := obj.(*corev1.Pod); ok {
 		rbs = c.addPod(ctx, pod)
 	} else if un, ok := obj.(*unstructured.Unstructured); ok {
 		ruleBinding, err := unstructuredToRuleBinding(un)
 		if err != nil {
 			logger.L().Warning("RBCache - failed to convert unstructured to rule binding", helpers.Error(err))
+			c.mutex.Unlock()
 			return
 		}
-		rbs = c.addRuleBinding(ruleBinding)
-	}
-	// notify
-	for n := range c.notifiers {
-		for i := range rbs {
-			*c.notifiers[n] <- rbs[i]
-		}
+		rbs = c.addRuleBinding(ctx, ruleBinding)
 	}
+	notifiers := c.snapshotNotifiersLocked()
+	c.mutex.Unlock()
+	dispatchNonBlocking(notifiers, rbs, "AddHandler notify")
 }
 
 func (c *RBCache) ModifyHandler(ctx context.Context, obj runtime.Object) {
 	c.mutex.Lock()
-	defer c.mutex.Unlock()
-
 	var rbs []rulebindingmanager.RuleBindingNotify
-
 	if pod, ok := obj.(*corev1.Pod); ok {
 		rbs = c.addPod(ctx, pod)
 	} else if un, ok := obj.(*unstructured.Unstructured); ok {
 		ruleBinding, err := unstructuredToRuleBinding(un)
 		if err != nil {
 			logger.L().Warning("RBCache - failed to convert unstructured to rule binding", helpers.Error(err))
+			c.mutex.Unlock()
 			return
 		}
-		rbs = c.modifiedRuleBinding(ruleBinding)
-	}
-	// notify
-	for n := range c.notifiers {
-		for i := range rbs {
-			*c.notifiers[n] <- rbs[i]
-		}
+		rbs = c.modifiedRuleBinding(ctx, ruleBinding)
 	}
+	notifiers := c.snapshotNotifiersLocked()
+	c.mutex.Unlock()
+	dispatchNonBlocking(notifiers, rbs, "ModifyHandler notify")
 }
 
-func (c *RBCache) DeleteHandler(_ context.Context, obj runtime.Object) {
+func (c *RBCache) DeleteHandler(ctx context.Context, obj runtime.Object) {
 	c.mutex.Lock()
-	defer c.mutex.Unlock()
-
 	var rbs []rulebindingmanager.RuleBindingNotify
-
 	if pod, ok := obj.(*corev1.Pod); ok {
 		c.deletePod(uniqueName(pod))
 	} else if un, ok := obj.(*unstructured.Unstructured); ok {
-		rbs = c.deleteRuleBinding(uniqueName(un))
-	}
-
-	// notify
-	for n := range c.notifiers {
-		for i := range rbs {
-			*c.notifiers[n] <- rbs[i]
-		}
+		rbs = c.deleteRuleBinding(ctx, uniqueName(un))
 	}
+	notifiers := c.snapshotNotifiersLocked()
+	c.mutex.Unlock()
+	dispatchNonBlocking(notifiers, rbs, "DeleteHandler notify")
 }
 
 func (c *RBCache) RefreshRuleBindingsRules() {
@@ -192,20 +183,62 @@
 		c.rbNameToRules.Set(rbName, c.createRules(rb.Spec.Rules))
 	}
 	logger.L().Info("RBCache - refreshed rule bindings rules", helpers.Int("ruleBindings", len(c.rbNameToRB.Keys())))
-	// Snapshot notifiers while holding the lock, then release before sending to
-	// avoid blocking cache operations if any notifier channel is full.
+	notifiers := c.snapshotNotifiersLocked()
+	c.mutex.Unlock()
+	// Single coalesced pulse — refresh notifications are idempotent.
+	dispatchNonBlocking(notifiers, []rulebindingmanager.RuleBindingNotify{{}}, "refresh pulse")
+}
+
+// snapshotNotifiersLocked returns a defensive copy of c.notifiers.
+// Must be called with c.mutex held; it takes no locks itself, and
+// unlocking stays the caller's responsibility.
+func (c *RBCache) snapshotNotifiersLocked() []*chan rulebindingmanager.RuleBindingNotify {
 	notifiers := make([]*chan rulebindingmanager.RuleBindingNotify, len(c.notifiers))
 	copy(notifiers, c.notifiers)
-	c.mutex.Unlock()
+	return notifiers
+}
+
+// dispatchNonBlocking fans out msgs to every snapshotted notifier with a
+// non-blocking send.
Drop-on-full is safe because subscribers' reconcile +// loops are idempotent — a missed pulse will be re-sent by the next +// add/modify/delete/refresh event. CodeRabbit PR #43 review on +// cache.go:202 + cache.go:215 — the previous implementation only made +// RefreshRuleBindingsRules non-blocking; the add/modify/delete handlers +// (lines 137-139, 161-163, 181-183) still did blocking sends while +// holding c.mutex. A single stuck subscriber could deadlock the whole +// cache. Funnel ALL fan-out through this helper for symmetry. +func dispatchNonBlocking(notifiers []*chan rulebindingmanager.RuleBindingNotify, msgs []rulebindingmanager.RuleBindingNotify, ctxLabel string) { for _, n := range notifiers { - *n <- rulebindingmanager.RuleBindingNotify{} + for _, msg := range msgs { + select { + case *n <- msg: + default: + logger.L().Debug("RBCache - notifier channel full, dropping "+ctxLabel, + helpers.Int("notifierIndex", indexOfNotifier(notifiers, n))) + } + } + } +} + +// indexOfNotifier returns the position of n in the slice, or -1. Used only +// for the diagnostic log emitted on a dropped non-blocking notifier send. +func indexOfNotifier(notifiers []*chan rulebindingmanager.RuleBindingNotify, n *chan rulebindingmanager.RuleBindingNotify) int { + for i, x := range notifiers { + if x == n { + return i + } } + return -1 } // ----------------- RuleBinding manager methods ----------------- // AddRuleBinding adds a rule binding to the cache -func (c *RBCache) addRuleBinding(ruleBinding *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify { +// addRuleBinding propagates ctx through the K8s List calls so the +// watcher can cancel in-flight work. CodeRabbit PR #43 cache.go:176 +// (Major): previously used context.Background() for the namespaces + +// pods list, which leaked goroutines past watch-context cancellation. 
+func (c *RBCache) addRuleBinding(ctx context.Context, ruleBinding *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify { var rbs []rulebindingmanager.RuleBindingNotify rbName := uniqueName(ruleBinding) logger.L().Info("RBCache - ruleBinding added/modified", helpers.String("name", rbName)) @@ -234,7 +267,7 @@ func (c *RBCache) addRuleBinding(ruleBinding *typesv1.RuntimeAlertRuleBinding) [ var namespaces *corev1.NamespaceList // if ruleBinding.GetNamespace() == "" { - namespaces, err = c.k8sClient.GetKubernetesClient().CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{LabelSelector: nsSelectorStr}) + namespaces, err = c.k8sClient.GetKubernetesClient().CoreV1().Namespaces().List(ctx, metav1.ListOptions{LabelSelector: nsSelectorStr}) if err != nil { logger.L().Warning("RBCache - failed to list namespaces", helpers.String("ruleBiding", rbName), helpers.String("nsSelector", nsSelectorStr), helpers.Error(err)) return rbs @@ -249,7 +282,7 @@ func (c *RBCache) addRuleBinding(ruleBinding *typesv1.RuntimeAlertRuleBinding) [ LabelSelector: podSelectorStr, FieldSelector: "spec.nodeName=" + c.nodeName, } - pods, err := c.k8sClient.GetKubernetesClient().CoreV1().Pods(ns.GetName()).List(context.Background(), lp) + pods, err := c.k8sClient.GetKubernetesClient().CoreV1().Pods(ns.GetName()).List(ctx, lp) if err != nil { logger.L().Warning("RBCache - failed to list pods", helpers.String("ruleBiding", rbName), helpers.String("podSelector", podSelectorStr), helpers.Error(err)) return rbs @@ -276,7 +309,12 @@ func (c *RBCache) addRuleBinding(ruleBinding *typesv1.RuntimeAlertRuleBinding) [ } return rbs } -func (c *RBCache) deleteRuleBinding(uniqueName string) []rulebindingmanager.RuleBindingNotify { +// deleteRuleBinding accepts ctx for parity with addRuleBinding (uniform +// handler signatures) and future-proofs against the helper growing K8s +// API calls. RuleBindingNotifierImplWithK8s currently uses an internal +// context; if it ever takes one, ctx is already threaded. +// CodeRabbit PR #43 cache.go:176. 
+func (c *RBCache) deleteRuleBinding(_ context.Context, uniqueName string) []rulebindingmanager.RuleBindingNotify { logger.L().Info("RBCache - ruleBinding deleted", helpers.String("name", uniqueName)) var rbs []rulebindingmanager.RuleBindingNotify @@ -311,9 +349,9 @@ func (c *RBCache) deleteRuleBinding(uniqueName string) []rulebindingmanager.Rule return rbs } -func (c *RBCache) modifiedRuleBinding(ruleBinding *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify { - rbsD := c.deleteRuleBinding(uniqueName(ruleBinding)) - rbsA := c.addRuleBinding(ruleBinding) +func (c *RBCache) modifiedRuleBinding(ctx context.Context, ruleBinding *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify { + rbsD := c.deleteRuleBinding(ctx, uniqueName(ruleBinding)) + rbsA := c.addRuleBinding(ctx, ruleBinding) return diff(rbsD, rbsA) } diff --git a/pkg/rulebindingmanager/cache/cache_test.go b/pkg/rulebindingmanager/cache/cache_test.go index 75eb8b70e..fbcde80b8 100644 --- a/pkg/rulebindingmanager/cache/cache_test.go +++ b/pkg/rulebindingmanager/cache/cache_test.go @@ -3,8 +3,11 @@ package cache import ( "context" "fmt" + "reflect" "slices" + "sync" "testing" + "time" mapset "github.com/deckarep/golang-set/v2" "github.com/goradd/maps" @@ -22,6 +25,119 @@ import ( k8sfake "k8s.io/client-go/kubernetes/fake" ) +// TestDispatchNonBlocking_DropOnFull pins the shared invariant for ALL +// fan-out sites: when a notifier channel is full, the helper drops the +// message and continues. This is the core building block for the +// AddHandler / ModifyHandler / DeleteHandler / RefreshRuleBindingsRules +// non-blocking-fanout contract. CodeRabbit PR #43 cache.go:215 — the +// previous fix only made RefreshRuleBindingsRules non-blocking; without +// extracting a shared helper, each handler had to be patched +// individually and drift was inevitable. The helper test below pins the +// drop-on-full behaviour at the lowest common layer. +func TestDispatchNonBlocking_DropOnFull(t *testing.T) { + // Two channels: one saturated, one empty. + full := make(chan rulebindingmanager.RuleBindingNotify, 1) + full <- rulebindingmanager.RuleBindingNotify{} + empty := make(chan rulebindingmanager.RuleBindingNotify, 1) + + notifiers := []*chan rulebindingmanager.RuleBindingNotify{&full, &empty} + msgs := []rulebindingmanager.RuleBindingNotify{{}} + + done := make(chan struct{}) + go func() { + dispatchNonBlocking(notifiers, msgs, "test") + close(done) + }() + select { + case <-done: + // non-blocking — correct + case <-time.After(2 * time.Second): + t.Fatalf("dispatchNonBlocking blocked on a saturated subscriber — drop-on-full contract violated") + } + + require.Len(t, full, 1, "saturated channel should still hold its pre-loaded message (drop policy)") + require.Len(t, empty, 1, "empty channel should have received the pulse") +} + +// TestRefreshRuleBindingsRules_NonBlockingFanout pins the contract from +// the CodeRabbit PR #43 review (cache.go:202): a slow or backlogged +// subscriber MUST NOT stall the refresh-rules path. Blocking sends would +// deadlock RefreshRuleBindingsRules behind any single stuck subscriber, +// which gates every binding change agent-wide. +// +// Setup: 3 notifier channels, all with buffer size 1. Fill one to capacity +// (simulates a subscriber that hasn't drained the previous pulse). Call +// RefreshRuleBindingsRules; assert it returns within a small budget and +// that the two un-full channels each received one notification. 
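+// Expected post-state, for concreteness (every buffer has capacity 1):
+//
+//	ch1: [pre-loaded pulse]   // saturated; the new pulse is dropped
+//	ch2: [refresh pulse]
+//	ch3: [refresh pulse]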
+func TestRefreshRuleBindingsRules_NonBlockingFanout(t *testing.T) { + c := &RBCache{} + + // 3 buffered channels; saturate the first so a blocking send on it + // would hang the test. + ch1 := make(chan rulebindingmanager.RuleBindingNotify, 1) + ch1 <- rulebindingmanager.RuleBindingNotify{} // full + ch2 := make(chan rulebindingmanager.RuleBindingNotify, 1) + ch3 := make(chan rulebindingmanager.RuleBindingNotify, 1) + + c.notifiers = []*chan rulebindingmanager.RuleBindingNotify{&ch1, &ch2, &ch3} + + done := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + c.RefreshRuleBindingsRules() + close(done) + }() + + select { + case <-done: + // returned in time — non-blocking fan-out works. + case <-time.After(2 * time.Second): + t.Fatalf("RefreshRuleBindingsRules blocked on a full subscriber channel — non-blocking send contract violated") + } + wg.Wait() + + // ch1 stays at capacity (the new pulse was dropped — expected); the + // pre-loaded message is still there. ch2 and ch3 must each have + // received the new pulse. + require.Len(t, ch1, 1, "full ch1 should still hold its pre-loaded message (drop-on-full policy)") + require.Len(t, ch2, 1, "ch2 should have received the refresh pulse") + require.Len(t, ch3, 1, "ch3 should have received the refresh pulse") +} + +// TestRBCacheHelpers_CtxFirstArg pins the contract from the CodeRabbit +// PR #43 review (cache.go:176, Major): the three RBCache helpers that +// AddHandler / ModifyHandler / DeleteHandler delegate to MUST accept a +// context.Context as their first argument so the watcher's cancellation +// signal propagates into K8s API List calls. A previous regression used +// `context.Background()` inside addRuleBinding, leaking goroutines past +// watch-context cancellation. Compile-time assignment to a typed +// function variable: if anyone removes ctx, this file no longer compiles. +func TestRBCacheHelpers_CtxFirstArg(t *testing.T) { + c := &RBCache{} + + // Compile-time guards: these assignments fail to compile if the + // signatures drift away from (ctx, ...). The reflect read is only + // to silence the unused-variable check. + var addFn func(context.Context, *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify = c.addRuleBinding + var delFn func(context.Context, string) []rulebindingmanager.RuleBindingNotify = c.deleteRuleBinding + var modFn func(context.Context, *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify = c.modifiedRuleBinding + + // Runtime sanity: function values are non-nil + first param is ctx. 
+ require.NotNil(t, addFn, "addRuleBinding bound value should be non-nil") + require.NotNil(t, delFn, "deleteRuleBinding bound value should be non-nil") + require.NotNil(t, modFn, "modifiedRuleBinding bound value should be non-nil") + ctxType := reflect.TypeOf((*context.Context)(nil)).Elem() + for name, fn := range map[string]any{"addRuleBinding": addFn, "deleteRuleBinding": delFn, "modifiedRuleBinding": modFn} { + ft := reflect.TypeOf(fn) + require.GreaterOrEqualf(t, ft.NumIn(), 1, "%s must take at least one parameter (ctx)", name) + require.Truef(t, ft.In(0).Implements(ctxType) || ft.In(0) == ctxType, + "%s first param must be context.Context, got %s — ctx-propagation contract regressed (CodeRabbit PR #43 cache.go:176)", + name, ft.In(0).String()) + } +} + func TestRuntimeObjAddHandler(t *testing.T) { type rules struct { ruleID string @@ -161,7 +277,7 @@ func TestRuntimeObjAddHandler(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { for i := range tt.args.rb { - tt.args.c.addRuleBinding(&tt.args.rb[i]) + tt.args.c.addRuleBinding(context.Background(), &tt.args.rb[i]) } tt.args.c.addPod(context.Background(), tt.args.pod) r := tt.args.c.ListRulesForPod(tt.args.pod.GetNamespace(), tt.args.pod.GetName()) @@ -579,7 +695,7 @@ func TestDeleteRuleBinding(t *testing.T) { } - c.deleteRuleBinding(tt.uniqueName) + c.deleteRuleBinding(context.Background(), tt.uniqueName) assert.False(t, c.rbNameToPods.Has(tt.uniqueName)) assert.False(t, c.rbNameToRB.Has(tt.uniqueName)) @@ -884,7 +1000,7 @@ func TestAddRuleBinding(t *testing.T) { c := NewCacheMock("") c.k8sClient = k8sClient - c.addRuleBinding(tt.rb) + c.addRuleBinding(context.Background(), tt.rb) rbName := uniqueName(tt.rb) diff --git a/pkg/rulemanager/cel/libraries/cache/function_cache.go b/pkg/rulemanager/cel/libraries/cache/function_cache.go index ba07eafcd..1990f0dac 100644 --- a/pkg/rulemanager/cel/libraries/cache/function_cache.go +++ b/pkg/rulemanager/cel/libraries/cache/function_cache.go @@ -84,7 +84,7 @@ type CelFunction func(...ref.Val) ref.Val // ensures cached results are invalidated whenever the projection spec changes. func HashForContainerProfile(oc objectcache.ObjectCache) func([]ref.Val) string { return func(values []ref.Val) string { - if len(values) == 0 || oc == nil { + if len(values) == 0 || values[0] == nil || oc == nil { return "" } containerIDStr, ok := values[0].Value().(string) From 40960d5dbe1339532943c98c03e77438bb5ac70d Mon Sep 17 00:00:00 2001 From: entlein Date: Sat, 16 May 2026 13:19:41 +0200 Subject: [PATCH 2/2] apply rabbit feedback: align projection + tamper + fanout + NN with rc1 final state Signed-off-by: entlein --- .../containerprofilecache.go | 15 ++++++- pkg/rulebindingmanager/cache/cache.go | 31 ++++++--------- pkg/rulebindingmanager/cache/cache_test.go | 39 ++----------------- 3 files changed, 27 insertions(+), 58 deletions(-) diff --git a/pkg/objectcache/containerprofilecache/containerprofilecache.go b/pkg/objectcache/containerprofilecache/containerprofilecache.go index c539fad0e..eeb401586 100644 --- a/pkg/objectcache/containerprofilecache/containerprofilecache.go +++ b/pkg/objectcache/containerprofilecache/containerprofilecache.go @@ -409,8 +409,17 @@ func (c *ContainerProfileCacheImpl) tryPopulateEntry( // when a signed overlay's signature no longer matches (i.e. content // has been mutated post-sign). No-op when the overlay is unsigned or // the tamper-alert exporter has not been wired. 
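+	// Verifier decision table, as a sketch (assuming
+	// EnableSignatureVerification is the enforcing switch):
+	//
+	//	unsigned overlay / exporter unwired -> true  -> merge, no alert
+	//	signature valid                     -> true  -> merge
+	//	mismatch, permissive mode           -> true  -> merge + R1016
+	//	mismatch, enforcing mode            -> false -> overlay dropped + R1016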
+ // CodeRabbit upstream PR #808 / containerprofilecache.go:414 (Major): + // when EnableSignatureVerification=true and the overlay fails + // verification, verifyUserApplicationProfile returns false. Drop the + // failed overlay before merging so a tampered profile does not + // silently project into the cache. In permissive mode the verifier + // always returns true and the overlay still merges (alert-only + // behaviour preserved). if userAP != nil { - c.verifyUserApplicationProfile(userAP, sharedData.Wlid) + if !c.verifyUserApplicationProfile(userAP, sharedData.Wlid) { + userAP = nil + } } var userNNErr error _ = c.refreshRPC(ctx, func(rctx context.Context) error { @@ -426,7 +435,9 @@ func (c *ContainerProfileCacheImpl) tryPopulateEntry( userNN = nil } if userNN != nil { - c.verifyUserNetworkNeighborhood(userNN, sharedData.Wlid) + if !c.verifyUserNetworkNeighborhood(userNN, sharedData.Wlid) { + userNN = nil + } } } diff --git a/pkg/rulebindingmanager/cache/cache.go b/pkg/rulebindingmanager/cache/cache.go index 5572ea340..80384c61b 100644 --- a/pkg/rulebindingmanager/cache/cache.go +++ b/pkg/rulebindingmanager/cache/cache.go @@ -137,7 +137,7 @@ func (c *RBCache) AddHandler(ctx context.Context, obj runtime.Object) { c.mutex.Unlock() return } - rbs = c.addRuleBinding(ctx, ruleBinding) + rbs = c.addRuleBinding(ruleBinding) } notifiers := c.snapshotNotifiersLocked() c.mutex.Unlock() @@ -156,20 +156,20 @@ func (c *RBCache) ModifyHandler(ctx context.Context, obj runtime.Object) { c.mutex.Unlock() return } - rbs = c.modifiedRuleBinding(ctx, ruleBinding) + rbs = c.modifiedRuleBinding(ruleBinding) } notifiers := c.snapshotNotifiersLocked() c.mutex.Unlock() dispatchNonBlocking(notifiers, rbs, "ModifyHandler notify") } -func (c *RBCache) DeleteHandler(ctx context.Context, obj runtime.Object) { +func (c *RBCache) DeleteHandler(_ context.Context, obj runtime.Object) { c.mutex.Lock() var rbs []rulebindingmanager.RuleBindingNotify if pod, ok := obj.(*corev1.Pod); ok { c.deletePod(uniqueName(pod)) } else if un, ok := obj.(*unstructured.Unstructured); ok { - rbs = c.deleteRuleBinding(ctx, uniqueName(un)) + rbs = c.deleteRuleBinding(uniqueName(un)) } notifiers := c.snapshotNotifiersLocked() c.mutex.Unlock() @@ -234,11 +234,7 @@ func indexOfNotifier(notifiers []*chan rulebindingmanager.RuleBindingNotify, n * // ----------------- RuleBinding manager methods ----------------- // AddRuleBinding adds a rule binding to the cache -// addRuleBinding propagates ctx through the K8s List calls so the -// watcher can cancel in-flight work. CodeRabbit PR #43 cache.go:176 -// (Major): previously used context.Background() for the namespaces + -// pods list, which leaked goroutines past watch-context cancellation. 
-func (c *RBCache) addRuleBinding(ctx context.Context, ruleBinding *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify { +func (c *RBCache) addRuleBinding(ruleBinding *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify { var rbs []rulebindingmanager.RuleBindingNotify rbName := uniqueName(ruleBinding) logger.L().Info("RBCache - ruleBinding added/modified", helpers.String("name", rbName)) @@ -267,7 +263,7 @@ func (c *RBCache) addRuleBinding(ctx context.Context, ruleBinding *typesv1.Runti var namespaces *corev1.NamespaceList // if ruleBinding.GetNamespace() == "" { - namespaces, err = c.k8sClient.GetKubernetesClient().CoreV1().Namespaces().List(ctx, metav1.ListOptions{LabelSelector: nsSelectorStr}) + namespaces, err = c.k8sClient.GetKubernetesClient().CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{LabelSelector: nsSelectorStr}) if err != nil { logger.L().Warning("RBCache - failed to list namespaces", helpers.String("ruleBiding", rbName), helpers.String("nsSelector", nsSelectorStr), helpers.Error(err)) return rbs @@ -282,7 +278,7 @@ func (c *RBCache) addRuleBinding(ctx context.Context, ruleBinding *typesv1.Runti LabelSelector: podSelectorStr, FieldSelector: "spec.nodeName=" + c.nodeName, } - pods, err := c.k8sClient.GetKubernetesClient().CoreV1().Pods(ns.GetName()).List(ctx, lp) + pods, err := c.k8sClient.GetKubernetesClient().CoreV1().Pods(ns.GetName()).List(context.Background(), lp) if err != nil { logger.L().Warning("RBCache - failed to list pods", helpers.String("ruleBiding", rbName), helpers.String("podSelector", podSelectorStr), helpers.Error(err)) return rbs @@ -309,12 +305,7 @@ func (c *RBCache) addRuleBinding(ctx context.Context, ruleBinding *typesv1.Runti } return rbs } -// deleteRuleBinding accepts ctx for parity with addRuleBinding (uniform -// handler signatures) and future-proofs against the helper growing K8s -// API calls. RuleBindingNotifierImplWithK8s currently uses an internal -// context; if it ever takes one, ctx is already threaded. -// CodeRabbit PR #43 cache.go:176. 
-func (c *RBCache) deleteRuleBinding(_ context.Context, uniqueName string) []rulebindingmanager.RuleBindingNotify { +func (c *RBCache) deleteRuleBinding(uniqueName string) []rulebindingmanager.RuleBindingNotify { logger.L().Info("RBCache - ruleBinding deleted", helpers.String("name", uniqueName)) var rbs []rulebindingmanager.RuleBindingNotify @@ -349,9 +340,9 @@ func (c *RBCache) deleteRuleBinding(_ context.Context, uniqueName string) []rule return rbs } -func (c *RBCache) modifiedRuleBinding(ctx context.Context, ruleBinding *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify { - rbsD := c.deleteRuleBinding(ctx, uniqueName(ruleBinding)) - rbsA := c.addRuleBinding(ctx, ruleBinding) +func (c *RBCache) modifiedRuleBinding(ruleBinding *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify { + rbsD := c.deleteRuleBinding(uniqueName(ruleBinding)) + rbsA := c.addRuleBinding(ruleBinding) return diff(rbsD, rbsA) } diff --git a/pkg/rulebindingmanager/cache/cache_test.go b/pkg/rulebindingmanager/cache/cache_test.go index fbcde80b8..8db2bb5a3 100644 --- a/pkg/rulebindingmanager/cache/cache_test.go +++ b/pkg/rulebindingmanager/cache/cache_test.go @@ -3,7 +3,6 @@ package cache import ( "context" "fmt" - "reflect" "slices" "sync" "testing" @@ -106,38 +105,6 @@ func TestRefreshRuleBindingsRules_NonBlockingFanout(t *testing.T) { require.Len(t, ch3, 1, "ch3 should have received the refresh pulse") } -// TestRBCacheHelpers_CtxFirstArg pins the contract from the CodeRabbit -// PR #43 review (cache.go:176, Major): the three RBCache helpers that -// AddHandler / ModifyHandler / DeleteHandler delegate to MUST accept a -// context.Context as their first argument so the watcher's cancellation -// signal propagates into K8s API List calls. A previous regression used -// `context.Background()` inside addRuleBinding, leaking goroutines past -// watch-context cancellation. Compile-time assignment to a typed -// function variable: if anyone removes ctx, this file no longer compiles. -func TestRBCacheHelpers_CtxFirstArg(t *testing.T) { - c := &RBCache{} - - // Compile-time guards: these assignments fail to compile if the - // signatures drift away from (ctx, ...). The reflect read is only - // to silence the unused-variable check. - var addFn func(context.Context, *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify = c.addRuleBinding - var delFn func(context.Context, string) []rulebindingmanager.RuleBindingNotify = c.deleteRuleBinding - var modFn func(context.Context, *typesv1.RuntimeAlertRuleBinding) []rulebindingmanager.RuleBindingNotify = c.modifiedRuleBinding - - // Runtime sanity: function values are non-nil + first param is ctx. 
- require.NotNil(t, addFn, "addRuleBinding bound value should be non-nil") - require.NotNil(t, delFn, "deleteRuleBinding bound value should be non-nil") - require.NotNil(t, modFn, "modifiedRuleBinding bound value should be non-nil") - ctxType := reflect.TypeOf((*context.Context)(nil)).Elem() - for name, fn := range map[string]any{"addRuleBinding": addFn, "deleteRuleBinding": delFn, "modifiedRuleBinding": modFn} { - ft := reflect.TypeOf(fn) - require.GreaterOrEqualf(t, ft.NumIn(), 1, "%s must take at least one parameter (ctx)", name) - require.Truef(t, ft.In(0).Implements(ctxType) || ft.In(0) == ctxType, - "%s first param must be context.Context, got %s — ctx-propagation contract regressed (CodeRabbit PR #43 cache.go:176)", - name, ft.In(0).String()) - } -} - func TestRuntimeObjAddHandler(t *testing.T) { type rules struct { ruleID string @@ -277,7 +244,7 @@ func TestRuntimeObjAddHandler(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { for i := range tt.args.rb { - tt.args.c.addRuleBinding(context.Background(), &tt.args.rb[i]) + tt.args.c.addRuleBinding(&tt.args.rb[i]) } tt.args.c.addPod(context.Background(), tt.args.pod) r := tt.args.c.ListRulesForPod(tt.args.pod.GetNamespace(), tt.args.pod.GetName()) @@ -695,7 +662,7 @@ func TestDeleteRuleBinding(t *testing.T) { } - c.deleteRuleBinding(context.Background(), tt.uniqueName) + c.deleteRuleBinding(tt.uniqueName) assert.False(t, c.rbNameToPods.Has(tt.uniqueName)) assert.False(t, c.rbNameToRB.Has(tt.uniqueName)) @@ -1000,7 +967,7 @@ func TestAddRuleBinding(t *testing.T) { c := NewCacheMock("") c.k8sClient = k8sClient - c.addRuleBinding(context.Background(), tt.rb) + c.addRuleBinding(tt.rb) rbName := uniqueName(tt.rb)
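
The drop-on-full policy in both patches assumes subscribers are level-triggered: a pulse is a hint to reconcile, not a payload. A minimal sketch of a compatible consumer loop, under assumed wiring (resyncAll and the 30-second period are placeholders, not part of this patch):

package subscriber

import (
	"context"
	"time"

	"github.com/kubescape/node-agent/pkg/rulebindingmanager"
)

// Run drains notification pulses and treats each one as "state changed,
// reconcile now". The periodic tick re-runs the same idempotent resync,
// so a pulse dropped by dispatchNonBlocking is only ever a bounded delay,
// never a lost update.
func Run(ctx context.Context, ch chan rulebindingmanager.RuleBindingNotify, resyncAll func()) {
	ticker := time.NewTicker(30 * time.Second) // safety net for dropped pulses
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ch:
			resyncAll() // pulse received: reconcile now
		case <-ticker.C:
			resyncAll() // no pulse: reconcile anyway (idempotent)
		}
	}
}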