policy: preserve scheduler rule counters #1407
Conversation
@copilot review
Pull request overview
Tightens the userspace policy-scheduler retirement work from #1396: missing scheduler references now reject commit instead of warning, and Rust policy hit counters survive snapshot rebuilds across scheduler active/inactive flips by being keyed by stable rule id in a process-wide registry.
Changes:
- Promote missing zone-pair and global policy scheduler references from `ValidateConfig` warnings to a strict commit-time error in `compileExpanded`, with scope-qualified error messages and updated tests.
- Move `PolicyRule.hit_count` to an `Arc<AtomicU64>` looked up from a global, rule-id-keyed registry with size-threshold-driven pruning, so rebuilds reuse counters; added a Rust test covering active→inactive→active toggling.
- Update #1378 plan and `docs/userspace-dataplane-gaps.md` to record the 2026-05-17 closeout and narrow the remaining blocker to integration/HA validation.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/config/compiler.go | Wires the strict scheduler validator into compileExpanded and adds a scope prefix for global policies. |
| pkg/config/parser_ast_test.go | Renames and rewrites the two scheduler-reference tests to assert commit-time errors instead of warnings. |
| userspace-dp/src/policy.rs | Introduces a global OnceLock<Mutex<FxHashMap>> registry of Arc<AtomicU64> hit counters keyed by rule_id, with prune threshold; parse_policy_state now reuses counters across rebuilds. |
| userspace-dp/src/policy_tests.rs | Adds scheduled_allow_snapshot helper and hit_counters_survive_scheduler_snapshot_rebuild covering active/inactive/active. |
| docs/pr/1373-retire-ebpf-dataplane/plan-1378-policy-schedulers.md | Documents the closeout slice and validation commands. |
| docs/userspace-dataplane-gaps.md | Narrows remaining #1378 scope to integration/HA failover evidence. |
Comments suppressed due to low confidence (2)
userspace-dp/src/policy.rs:131
`prune_policy_counter_registry` is invoked from `parse_policy_state` while the previous `PolicyState` is typically still alive in the caller (it's only dropped after the new snapshot replaces it). At prune time, every entry from the old snapshot therefore has `Arc::strong_count > 1` and is retained, even for rules removed by the new snapshot. In practice the registry only shrinks one rebuild cycle after a rule disappears, so the threshold is more lenient than it looks and a rapid churn of rule_ids can still push the map well past `POLICY_COUNTER_REGISTRY_PRUNE_THRESHOLD` for an extended period. Consider basing retention purely on `active_rule_ids` membership (since not-present rules are never re-looked-up by `policy_rule_hit_counter` again and the lingering Arc just delays GC).
fn prune_policy_counter_registry(active_rule_ids: &FxHashSet<String>) {
    if let Ok(mut counters) = policy_counter_registry().lock() {
        if counters.len() <= POLICY_COUNTER_REGISTRY_PRUNE_THRESHOLD {
            return;
        }
        counters.retain(|rule_id, counter| {
            active_rule_ids.contains(rule_id) || Arc::strong_count(counter) > 1
        });
    }
}
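The membership-only retention suggested in this comment could look roughly like the sketch below. It is an illustration, not the PR's code: `CounterMap`, `prune_by_membership`, and the `PRUNE_THRESHOLD` value are hypothetical names, and `std::collections::HashMap` stands in for `FxHashMap` so the sketch has no external dependencies.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the registry type; std HashMap replaces FxHashMap.
type CounterMap = HashMap<String, Arc<AtomicU64>>;

// Illustrative value only; the real threshold is not shown in the review.
const PRUNE_THRESHOLD: usize = 2;

/// Drops every counter whose rule id is absent from the new snapshot,
/// regardless of how many Arc clones are still held elsewhere.
fn prune_by_membership(registry: &Mutex<CounterMap>, active_rule_ids: &HashSet<String>) {
    if let Ok(mut counters) = registry.lock() {
        if counters.len() <= PRUNE_THRESHOLD {
            return;
        }
        counters.retain(|rule_id, _| active_rule_ids.contains(rule_id));
    }
}
```

With this shape, an old snapshot that still holds a clone keeps its own `Arc` alive until it is dropped, but the registry entry goes away on the first prune after the rule disappears.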
userspace-dp/src/policy.rs:144
Inconsistent lock-poisoning handling between the two registry accessors: `prune_policy_counter_registry` silently no-ops on a poisoned mutex (`if let Ok(mut counters) = ...lock()`), while `policy_rule_hit_counter` panics on the same condition (`.expect("policy counter registry poisoned")`). Either both should propagate (panic) so a poisoned registry is surfaced immediately, or both should recover so a panic on one snapshot build does not bring down later ones. The current asymmetry means a poisoning incident produces hard panics on hit-counter lookup but quietly skipped pruning.
fn prune_policy_counter_registry(active_rule_ids: &FxHashSet<String>) {
    if let Ok(mut counters) = policy_counter_registry().lock() {
        if counters.len() <= POLICY_COUNTER_REGISTRY_PRUNE_THRESHOLD {
            return;
        }
        counters.retain(|rule_id, counter| {
            active_rule_ids.contains(rule_id) || Arc::strong_count(counter) > 1
        });
    }
}
fn policy_rule_hit_counter(rule_id: &str) -> Arc<AtomicU64> {
    let mut counters = policy_counter_registry()
        .lock()
        .expect("policy counter registry poisoned");
    if let Some(counter) = counters.get(rule_id) {
        return counter.clone();
    }
    let counter = Arc::new(AtomicU64::new(0));
    counters.insert(rule_id.to_string(), counter.clone());
    counter
}
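One way to make the two accessors agree is to route both through a single lock helper. The sketch below takes the "both recover" option using `PoisonError::into_inner` from the standard library; the helper names (`lock_counters`, `rule_hit_counter`, `prune`) and the `CounterMap` alias are hypothetical, and whether recovery or a uniform panic is the right policy is the author's call.

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

// Hypothetical stand-in for the registry type; std HashMap replaces FxHashMap.
type CounterMap = HashMap<String, Arc<AtomicU64>>;

/// Shared lock helper: a poisoned mutex is recovered instead of being
/// skipped by one caller and turned into a panic by another.
fn lock_counters(registry: &Mutex<CounterMap>) -> MutexGuard<'_, CounterMap> {
    registry.lock().unwrap_or_else(PoisonError::into_inner)
}

/// Returns the counter for a rule id, creating it on first lookup.
fn rule_hit_counter(registry: &Mutex<CounterMap>, rule_id: &str) -> Arc<AtomicU64> {
    lock_counters(registry)
        .entry(rule_id.to_string())
        .or_insert_with(|| Arc::new(AtomicU64::new(0)))
        .clone()
}

/// Keeps only the entries for which `keep` returns true.
fn prune(registry: &Mutex<CounterMap>, keep: impl Fn(&str) -> bool) {
    lock_counters(registry).retain(|rule_id, _| keep(rule_id.as_str()));
}
```

Recovery is reasonable here only because the map holds independent atomic counters, so a panic mid-update cannot leave a half-written invariant behind; that reasoning is an assumption about the registry's contents.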
if err := validatePolicySchedulerReferencesStrict(cfg); err != nil {
    return nil, err
}
static POLICY_COUNTERS: OnceLock<Mutex<PolicyCounterRegistry>> = OnceLock::new();

fn policy_counter_registry() -> &'static Mutex<PolicyCounterRegistry> {
    POLICY_COUNTERS.get_or_init(|| Mutex::new(FxHashMap::default()))
}

fn prune_policy_counter_registry(active_rule_ids: &FxHashSet<String>) {
    if let Ok(mut counters) = policy_counter_registry().lock() {
        if counters.len() <= POLICY_COUNTER_REGISTRY_PRUNE_THRESHOLD {
            return;
        }
        counters.retain(|rule_id, counter| {
            active_rule_ids.contains(rule_id) || Arc::strong_count(counter) > 1
        });
    }
}

fn policy_rule_hit_counter(rule_id: &str) -> Arc<AtomicU64> {
    let mut counters = policy_counter_registry()
        .lock()
        .expect("policy counter registry poisoned");
    if let Some(counter) = counters.get(rule_id) {
        return counter.clone();
    }

    let counter = Arc::new(AtomicU64::new(0));
    counters.insert(rule_id.to_string(), counter.clone());
    counter
}
I completed an adversarial pass over commit |
Round-1 Codex review on
@copilot review
Addressed.
Completed another adversarial pass on current HEAD ( |
Round-2 Codex review on
@copilot review
impl PolicyCounterStore {
    pub(crate) fn reconcile_rules(&self, rules: &[PolicyRuleSnapshot]) {
        let active_rule_ids: FxHashSet<String> = rules.iter().map(stable_policy_rule_id).collect();
        if let Ok(mut counters) = self.counters.lock() {
            counters.retain(|rule_id, _| active_rule_ids.contains(rule_id));
        }
    }

    pub(crate) fn clear(&self) {
        if let Ok(counters) = self.counters.lock() {
            for counter in counters.values() {
                counter.reset();
            }
        }
    }

    fn rule_hit_counter(&self, rule_id: &str) -> Arc<PolicyRuleCounter> {
        let mut counters = self.counters.lock().expect("policy counter store poisoned");
        if let Some(counter) = counters.get(rule_id) {
            return counter.clone();
        }

        let counter = Arc::new(PolicyRuleCounter::default());
        counters.insert(rule_id.to_string(), counter.clone());
        counter
    }
}
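To make the survival property concrete, here is a reduced sketch of a store whose counters outlive snapshot rebuilds as long as the rule stays configured. It is a simplification under stated assumptions: `CounterStore` and `build_snapshot` are hypothetical names, counters are plain `AtomicU64` values rather than `PolicyRuleCounter`, and rule ids are passed as plain strings instead of being derived by `stable_policy_rule_id`.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical simplification of the store in the hunk above.
#[derive(Default)]
struct CounterStore {
    counters: Mutex<HashMap<String, Arc<AtomicU64>>>,
}

impl CounterStore {
    /// Returns the existing counter for a rule id, creating it on first use.
    fn rule_hit_counter(&self, rule_id: &str) -> Arc<AtomicU64> {
        let mut counters = self.counters.lock().expect("counter store poisoned");
        counters
            .entry(rule_id.to_string())
            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
            .clone()
    }

    /// Drops counters for rules that are no longer configured.
    fn reconcile_rules(&self, configured_rule_ids: &[&str]) {
        let keep: HashSet<&str> = configured_rule_ids.iter().copied().collect();
        let mut counters = self.counters.lock().expect("counter store poisoned");
        counters.retain(|rule_id, _| keep.contains(rule_id.as_str()));
    }
}

/// Rebuilds a snapshot: every rule gets a handle to the shared counter,
/// so a rebuild never resets a counter that is still configured.
fn build_snapshot(store: &CounterStore, rule_ids: &[&str]) -> Vec<(String, Arc<AtomicU64>)> {
    store.reconcile_rules(rule_ids);
    rule_ids
        .iter()
        .map(|id| (id.to_string(), store.rule_hit_counter(id)))
        .collect()
}
```

A scheduler flip in this model is just another `build_snapshot` call with the same rule ids, which is why the count carries over; only removing the rule from the configured set discards it.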
if s.dp != nil && s.dp.IsLoaded() {
    policyID := policySetID*dataplane.MaxRulesPerPolicy + uint32(len(pi.Rules))
    if ctrs, err := s.dp.ReadPolicyCounters(policyID); err == nil {
        pr.HitPackets = ctrs.Packets
        pr.HitBytes = ctrs.Bytes
    }
}

        policyID++
    }
    policySetID++
}
func policyRuleIDForCounter(cfg *config.Config, policyID uint32) string {
    if cfg == nil {
        return ""
    }
    policySetID := policyID / dataplane.MaxRulesPerPolicy
    ruleIndex := policyID % dataplane.MaxRulesPerPolicy

    var currentSet uint32
    for _, zpp := range cfg.Security.Policies {
        if zpp == nil {
            continue
        }
        if currentSet == policySetID {
            if int(ruleIndex) >= len(zpp.Policies) || zpp.Policies[ruleIndex] == nil {
                return ""
            }
            return stablePolicyRuleID(zpp.FromZone, zpp.ToZone, zpp.Policies[ruleIndex].Name)
        }
        currentSet++
    }
    if currentSet == policySetID {
        if int(ruleIndex) >= len(cfg.Security.GlobalPolicies) || cfg.Security.GlobalPolicies[ruleIndex] == nil {
            return ""
        }
        return stablePolicyRuleID("junos-global", "junos-global", cfg.Security.GlobalPolicies[ruleIndex].Name)
    }
    return ""
}
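The ID arithmetic that the Go handlers and `policyRuleIDForCounter` have to agree on is a plain base-N encoding of (policy set, rule index). The sketch below restates it in Rust to match the other examples in this thread; `MAX_RULES_PER_POLICY = 256` is an assumed value chosen for illustration, since the real `dataplane.MaxRulesPerPolicy` is not shown here, and both function names are hypothetical.

```rust
/// Assumed value for illustration; the real constant is not shown in this thread.
const MAX_RULES_PER_POLICY: u32 = 256;

/// Packs a policy-set index and a rule index into one counter id.
/// Returns None when the rule index would spill into the next set
/// or the multiplication would overflow u32.
fn encode_policy_id(policy_set_id: u32, rule_index: u32) -> Option<u32> {
    if rule_index >= MAX_RULES_PER_POLICY {
        return None;
    }
    policy_set_id
        .checked_mul(MAX_RULES_PER_POLICY)?
        .checked_add(rule_index)
}

/// Splits a counter id back into (policy set index, rule index).
fn decode_policy_id(policy_id: u32) -> (u32, u32) {
    (
        policy_id / MAX_RULES_PER_POLICY,
        policy_id % MAX_RULES_PER_POLICY,
    )
}
```

The bounds check matters because an out-of-range rule index silently aliases a rule in the next policy set, which is the kind of mismatch the "harden policy metrics counter ID mapping" commit appears to guard against.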
Implemented.
Completed an adversarial re-review on current HEAD ( |
Claude r3 review on
Round-3 triple-review synthesis on
| Reviewer | Verdict |
|---|---|
| Claude | MERGE-NEEDS-MINOR (asymmetric nil-check) |
| Codex | MERGE-NEEDS-MINOR (mocked DPs in Go tests) |
| Gemini Pro 3 | MERGE-READY |
Codex MINOR — Go tests mock past the risk surface
"The new API/gRPC tests still bypass the production counter-preservation bridge.
`pkg/api/policy_counters_test.go:13` and `pkg/grpcapi/server_show_zones_test.go:13` install fake dataplanes whose `ReadPolicyCounters` is just a map lookup. That tests handler ID arithmetic, but not the real path: Rust `PolicyCounterStore` → helper `policy_rule_counters` status → Go `Manager.lastStatus` → `userspace.Manager.ReadPolicyCounters` → API/gRPC."
"`pkg/dataplane/userspace/manager_test.go:88` is adjacent coverage. It manually swaps `lastSnapshot` while leaving `lastStatus` stale, then expects the old counter to reappear after re-add. It does not exercise `Compile`, `UpdatePolicyScheduleState`, `apply_snapshot`, helper status refresh, or the scheduler active/inactive preservation transition."
Gemini counterpoint
Rust-side test hit_counters_survive_scheduler_snapshot_rebuild in userspace-dp/src/policy_tests.rs DOES exercise the real PolicyCounterStore with full reconcile/eval/snapshot-rebuild flow. The Go tests are intentionally boundary tests for the HTTP/gRPC marshaling layer.
My read
Both right at different layers. Rust side has real production-path coverage (the actual counter survival logic). Go side has marshaling-only coverage with fakes. The gap Codex points to (Compile → UpdatePolicyScheduleState → apply_snapshot end-to-end) is a Go-side integration test that nobody wrote. Worth adding but doesn't undermine the Rust correctness coverage.
metrics.go behavior change
Both Claude and Codex flagged the if rule == nil { continue } removal in the per-zone-pair loop. Gemini explains: zpp.Policies is config-layer-guaranteed non-nil (defensive code removal); GlobalPolicies keeps the nil check because globals CAN be nil per compilation rules. Codex also flags this is a behavior change beyond test tightening (LastCompileResult()==nil early-return also removed).
If zpp.Policies invariant holds, the asymmetric nil-check removal is correct cleanup. Worth a one-line code comment explaining the invariant so the asymmetry doesn't look like an oversight.
Recommendation
Block on: add comment to collectPolicyCounters documenting the zpp.Policies non-nil invariant (Claude+Codex MINOR).
Defer: add a Go-side end-to-end integration test that exercises Compile → status helper → ReadPolicyCounters without fakes (Codex MINOR — follow-up issue).
Codex task: task-mpaah2ui-xq0njo. Gemini task: task-mpaahf6n-fdtge8. Not merging — author's decision.
total.Packets += counter.Packets
total.Bytes += counter.Bytes
return total, nil
self.packets.fetch_add(1, Ordering::Relaxed);
if packet_len != 0 {
    self.bytes.fetch_add(packet_len, Ordering::Relaxed);
}
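For context, a per-rule counter built around the fragment above might look like the following sketch. The struct and method names (`RuleCounter`, `record`, `reset`, `snapshot`) are assumptions for illustration and not the PR's `PolicyRuleCounter` API; only the two `fetch_add` calls and the zero-length guard come from the hunk.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical shape for a per-rule packet/byte counter.
#[derive(Default)]
struct RuleCounter {
    packets: AtomicU64,
    bytes: AtomicU64,
}

impl RuleCounter {
    /// Counts one hit; a zero length (for callers that do not know the
    /// packet size) bumps packets only and leaves bytes untouched.
    fn record(&self, packet_len: u64) {
        self.packets.fetch_add(1, Ordering::Relaxed);
        if packet_len != 0 {
            self.bytes.fetch_add(packet_len, Ordering::Relaxed);
        }
    }

    /// Zeroes both values in place so existing Arc handles stay valid.
    fn reset(&self) {
        self.packets.store(0, Ordering::Relaxed);
        self.bytes.store(0, Ordering::Relaxed);
    }

    /// Reads (packets, bytes); the two loads are not one atomic unit.
    fn snapshot(&self) -> (u64, u64) {
        (
            self.packets.load(Ordering::Relaxed),
            self.bytes.load(Ordering::Relaxed),
        )
    }
}
```

`Ordering::Relaxed` is enough when the values are only reported as statistics, but a reader can observe a packet increment before the matching byte increment, so the pair should not be treated as a consistent snapshot.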
Claude r6 review on
Round-6 triple-review synthesis on
| Reviewer | Verdict |
|---|---|
| Claude | MERGE-READY |
| Codex | MERGE-READY (narrowly) |
| Gemini Pro 3 | MERGE-NEEDS-MINOR |
r5 Read-path IPC test — CLOSED
Both reviewers confirm real Unix-socket IPC roundtrip exercises the production manager path:
- `m := New()` (`m.inner` non-nil)
- Real `net.Listen("unix", controlSock)` fake helper
- `requestLocked(ControlRequest{Type: "status"})` → real JSON encode/decode
- Helper returns `PolicyRuleCounters: [{RuleID, Packets: 23, Bytes: 2300}]`
- `recordHelperStatusLocked` ingests
- `ReadPolicyCounters(0)` returns the helper's counters
Codex MINOR caveat (C — "mostly yes")
"It does not seed an explicit same-rule stale zero entry, which would be a sharper fixture, but the intended stale-cache-through-status-refresh path is covered."
The starting cache is empty rather than "preloaded with stale zero counters for the same RuleID". The test still proves the IPC-fresh-counter is what ReadPolicyCounters returns; what it doesn't pin is "fresh helper overwrites a pre-existing stale entry". Codex calls this a non-blocking sharpness nit.
Gemini MINOR (B+C)
Same framing as Codex on point C. Adds point B: "no helper-restart simulation or multi-roundtrip". Both are sharpness nits, not coverage holes for the original r3+r4 concern.
Recommendation
MERGE-READY. The Clear + Read IPC tests now both pin counter behavior across real helper roundtrip. The "stale-cache overlay" + "multi-roundtrip" cases are reasonable follow-ups for an issue, not blockers for this PR. The original r3+r4 production-path coverage gap is closed.
Codex task: task-mpalim0e-kct9hw. Gemini task: task-mpaliy4j-48o1rw. Not merging — author's decision.
Follow-up fix on
Claude r7 review on
Round-7 triple-review synthesis on
| Reviewer | Verdict |
|---|---|
| Claude | MERGE-READY |
| Codex | MERGE-READY |
| Gemini Pro 3 | MERGE-READY |
All three converge. r6 sharpness MINOR is closed.
What r7 confirmed
Codex (no findings):
"A. Yes.
`refreshStatus()` is called twice, and the fake helper serves two sequential `status` responses: 23/2300, then 31/3100...
B. Yes. The second `ReadPolicyCounters(0)` explicitly expects `Packets: 31, Bytes: 3100`, so it would fail if the first helper value `23/2300` stayed cached.
C. Yes. `m.lastStatus` is preloaded before either IPC with `PolicyRuleCounters` containing the same `wantRuleID`; packets/bytes are zero by Go default.
D. Yes. Both refreshes call `m.requestLocked(ControlRequest{Type: "status"}, &status)`, which goes through `requestDetailedLocked` and `net.DialTimeout("unix", m.cfg.ControlSocket, ...)`. This is not the `m.inner=nil` cache-only shortcut.
E. Safe."
Gemini Pro 3 (quote-line verification of all four checks A-D, all PASS).
Coverage closure
The IPC test now pins:
- Real helper IPC (m.inner != nil) — closed in r6
- Counter value reflects latest helper sync — closed in r7
- Multi-roundtrip refresh (two sequential IPCs) — closed in r7
- Same-RuleID counter update with stale-cache overlay (m.lastStatus preloaded with zero) — closed in r7
- No deadlock / race risk (buffered channels, sequential accept loop) — verified
Recommendation
Merge-ready. Four-round IPC test arc concluded.
Codex task: task-mpambkr5-pyxiwv. Gemini task: task-mpambwfs-2q77ug. Not merging — author's decision.
Agent-Logs-Url: https://github.com/psaab/xpf/sessions/44910551-6d29-4da9-916f-072abba2e573 Co-authored-by: psaab <196946+psaab@users.noreply.github.com>
Agent-Logs-Url: https://github.com/psaab/xpf/sessions/9158ee1c-37c2-4067-95a5-20a292e2b448 Co-authored-by: psaab <196946+psaab@users.noreply.github.com>
Agent-Logs-Url: https://github.com/psaab/xpf/sessions/109b8dbc-29e5-406f-b482-8b7f62214c86 Co-authored-by: psaab <196946+psaab@users.noreply.github.com>
a7f374b to f17e714
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
pkg/dataplane/userspace/policycounters.go:87
`ReadPolicyCounters` adds the legacy inner counter and the helper counter (`total.Packets += counter.Packets`, same for bytes). This is correct only as long as exactly one of the two backends is actually reporting non-zero values; if a deployment ever has both the legacy eBPF dataplane loaded and the userspace helper reporting for the same rule, hit counters will be double-counted. Consider either documenting/asserting the mutual-exclusion invariant, or preferring helper values over inner ones when a helper rule-id match is found (rather than summing).
var total dataplane.CounterValue
var innerErr error
if m.inner != nil {
    total, innerErr = m.inner.ReadPolicyCounters(policyID)
}
m.mu.Lock()
defer m.mu.Unlock()
cfg := (*config.Config)(nil)
if m.lastSnapshot != nil {
    cfg = m.lastSnapshot.Config
}
ruleID := policyRuleIDForCounter(cfg, policyID)
if ruleID == "" {
    if innerErr != nil {
        return dataplane.CounterValue{}, innerErr
    }
    return total, nil
}
counter, ok := buildPolicyRuleCounterIndex(&m.lastStatus)[ruleID]
if !ok {
    if innerErr != nil {
        return dataplane.CounterValue{}, innerErr
    }
    return total, nil
}
total.Packets += counter.Packets
total.Bytes += counter.Bytes
return total, nil
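The difference between the two merge policies named in this comment is small enough to show side by side. The sketch below is a language-neutral illustration written in Rust to match the other examples in this thread; `CounterValue`, `sum_backends`, and `prefer_helper` are hypothetical names and do not correspond to the Go manager's API.

```rust
/// Hypothetical mirror of a packets/bytes pair.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct CounterValue {
    packets: u64,
    bytes: u64,
}

/// Current behaviour as described: add helper values on top of inner ones.
/// Correct only when at most one backend reports non-zero values.
fn sum_backends(inner: CounterValue, helper: Option<CounterValue>) -> CounterValue {
    match helper {
        Some(h) => CounterValue {
            packets: inner.packets + h.packets,
            bytes: inner.bytes + h.bytes,
        },
        None => inner,
    }
}

/// Suggested alternative: a helper match wins outright, so a rule reported
/// by both backends is never counted twice.
fn prefer_helper(inner: CounterValue, helper: Option<CounterValue>) -> CounterValue {
    helper.unwrap_or(inner)
}
```

When only one backend reports, both functions return the same value, so the choice only matters in the overlap case the comment warns about.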
) -> PolicyAction {
    evaluate_policy_with_len(
        state, from_id, to_id, src_ip, dst_ip, protocol, src_port, dst_port, 0,
    )
}
Preserves scheduler rule counters (#1407) in policy.
- Expose userspace scheduler policy counters
- docs: correct #1378 scheduler test inventory
- log: use relative cargo manifest path
- test: cover scheduler policy counters in API
- test: cover userspace policy counters in metrics and grpc global path
- fix: harden policy metrics counter ID mapping
Touches pkg/dataplane/userspace, pkg/grpcapi/server_show_zones_test.go, userspace-dp/src/policy_tests.rs, and pkg/api/policy_counters_test.go across documentation, tests, configuration schema, userspace dataplane, dataplane integration, and operator/API output. The largest file deltas are pkg/dataplane/userspace/manager_test.go, pkg/grpcapi/server_show_zones_test.go, and userspace-dp/src/policy_tests.rs. The diff is 1405 additions and 85 deletions across 25 files.
Summary
Refs #1378.
Narrows the remaining userspace policy-scheduler gap after #1396:
Validation
- `go test ./pkg/config ./pkg/dataplane/userspace`
- `cargo test policy:: -- --nocapture`
- `git diff --check`