diff --git a/_Log.md b/_Log.md index 1980d3714..abc8acf97 100644 --- a/_Log.md +++ b/_Log.md @@ -11,6 +11,18 @@ - **Action**: PR #1395 cleanup — reverted an unintended `go.mod` direct/indirect dependency classification change introduced by automated tooling so the round-4 fix stays scoped to three-color policer compiler logic/tests/docs. - **File(s)**: `go.mod`, `_Log.md` +- **Timestamp**: 2026-05-17T05:12:00Z + - **Action**: Round-5 follow-up fix — in userspace pending-XSK-startup compile path, defer `lastSnapshot` cache update until ingress/local/NAT map sync succeeds so sync failures cannot poison cached snapshot state with an unpublished generation. + - **File(s)**: `pkg/dataplane/userspace/manager.go`, `_Log.md` + +- **Timestamp**: 2026-05-17T05:16:00Z + - **Action**: Restored `go.mod` after an unintended direct/indirect dependency classification flip introduced by an automation-only progress update. + - **File(s)**: `go.mod`, `_Log.md` + +- **Timestamp**: 2026-05-17T04:48:51Z + - **Action**: Re-restored `go.mod` after a subsequent tooling pass reintroduced the same direct/indirect dependency classification flip. + - **File(s)**: `go.mod`, `_Log.md` + - **Timestamp**: 2026-05-17T05:03:00Z - **Action**: PR #1395 round-4 follow-up cleanup — moved three-color mode marker assignment outside repeated same-mode child loops to avoid redundant writes while preserving duplicate-sibling merge semantics. - **File(s)**: `pkg/config/compiler_firewall.go`, `_Log.md` diff --git a/docs/pr/1373-retire-ebpf-dataplane/plan-1378-policy-schedulers.md b/docs/pr/1373-retire-ebpf-dataplane/plan-1378-policy-schedulers.md index 8d824ccb2..ea8a8eac0 100644 --- a/docs/pr/1373-retire-ebpf-dataplane/plan-1378-policy-schedulers.md +++ b/docs/pr/1373-retire-ebpf-dataplane/plan-1378-policy-schedulers.md @@ -8,9 +8,9 @@ scheduled policy rules activate and deactivate correctly without the eBPF ## Dependencies -- #1381 must land first. `UpdatePolicyScheduleState` currently dispatches - through the embedded eBPF `DataPlane`; userspace needs either the split - interface or an explicit stub/snapshot branch. +- The safe slice no longer waits on #1381. The userspace manager now shadows + `UpdatePolicyScheduleState` and republishes a userspace snapshot instead of + falling through to the embedded eBPF manager. ## Design @@ -20,6 +20,14 @@ identity must not depend on transient array position alone; use a config-driven UUID if available or `(policy_set_id, policy_name, rule_name)`/equivalent compiled identity. +Safe #1378 slice status: this change wires `rule_id`, `scheduler_name`, and +`inactive` through userspace policy snapshots and Rust policy evaluation. The +daemon reconciles the scheduler lifecycle on every committed config while +holding the apply semaphore; userspace snapshot rebuilds are seeded with that +same active-state map, and runtime scheduler ticks acquire the same semaphore +before publishing one coherent snapshot delta. Missing scheduler references are +compile errors. + On scheduler state changes, publish one atomic userspace snapshot delta that contains the updated inactive bits for all affected rules. Do not issue per-rule fast-path toggles because first-match ordering requires same-instant @@ -31,9 +39,14 @@ existing sessions unless a separate `policy-rematch` feature is implemented. That matches Junos default behavior: schedulers block new lookups, not existing sessions. -Scheduler granularity is 60 seconds. Tests and docs must use deterministic -clock injection or windows that span multiple evaluator ticks; the earlier -30-second integration target is invalid. +Scheduler granularity is 60 seconds. The wall clock is used only by the Go +control-plane scheduler to decide the next active-state map; workers receive +booleans in the snapshot and never evaluate wall-clock time in the packet path. +The scheduler compares wall elapsed time with Go's monotonic elapsed time at +each evaluation. Backward wall-clock steps or drift beyond tolerance fail +closed for that evaluation by publishing all scheduler bits inactive. +Tests and docs must use deterministic scheduler inputs or windows that span +multiple evaluator ticks; the earlier 30-second integration target is invalid. Missing scheduler references fail closed as commit errors. Do not copy the existing eBPF behavior that can default missing scheduler state to active. @@ -43,6 +56,13 @@ existing eBPF behavior that can default missing scheduler state to active. - One inactive-branch per rule on miss path is acceptable; no scheduler clock evaluation occurs in the packet worker. - Snapshot publication is ArcSwap-atomic across all rule inactive bits. +- Snapshots carrying scheduler inactive bits require protocol version 2; the + Rust control server rejects older/unknown snapshot versions instead of + silently ignoring scheduling fields, and status exposes the helper's supported + snapshot protocol so new Go refuses to publish scheduled-policy snapshots to + an old helper before the fail-open path can occur. The refusal actively + disarms helper forwarding with `set_forwarding_state armed=false`; recording + a compile error while leaving the old helper armed is not fail-closed. - Hit counters are keyed by stable rule identity outside rebuilt rule structs so counters survive scheduler snapshot rebuilds. - Do not copy the existing eBPF indexing bug in @@ -64,8 +84,9 @@ existing eBPF behavior that can default missing scheduler state to active. - Scheduler atomicity: first-match policy ordering requires affected inactive bits to publish as one coherent snapshot. Per-rule toggles can expose an impossible mixed policy state. -- Clock drift: scheduler state is daemon-clock derived. HA peers must recompute - after failover rather than trusting stale peer-local state. +- Clock drift: scheduler state is daemon-clock derived. The scheduler must + fail closed on wall-clock discontinuity, and HA peers must recompute after + failover rather than trusting stale peer-local state. - Counter continuity: stable rule identity is mandatory because inactive flips and snapshot rebuilds must not reset operator-visible hit counters. - Missing scheduler references: fail-open behavior admits traffic outside the diff --git a/go.mod b/go.mod index e3175fa5f..9af98c17b 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/insomniacslk/dhcp v0.0.0-20251020182700-175e84fbb167 github.com/mdlayher/ndp v1.1.0 github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 github.com/vishvananda/netlink v1.3.1 golang.org/x/net v0.47.0 golang.org/x/sync v0.18.0 @@ -27,7 +28,6 @@ require ( github.com/mdlayher/socket v0.5.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pierrec/lz4/v4 v4.1.14 // indirect - github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/u-root/uio v0.0.0-20230220225925-ffce2a382923 // indirect diff --git a/pkg/config/compiler.go b/pkg/config/compiler.go index 41611ff78..f80311dbd 100644 --- a/pkg/config/compiler.go +++ b/pkg/config/compiler.go @@ -221,7 +221,6 @@ func compileExpanded(tree *ConfigTree) (*Config, error) { if err := validateThreeColorPolicersStrict(cfg.Firewall.ThreeColorPolicers); err != nil { return nil, err } - if warnings := ValidateConfig(cfg); len(warnings) > 0 { for _, w := range warnings { cfg.Warnings = append(cfg.Warnings, w) @@ -273,6 +272,37 @@ func validateThreeColorPolicersStrict(policers map[string]*ThreeColorPolicerConf return nil } +func validatePolicySchedulerReferencesStrict(cfg *Config) error { + if cfg == nil { + return nil + } + check := func(pol *Policy) error { + if pol == nil || pol.SchedulerName == "" { + return nil + } + if _, ok := cfg.Schedulers[pol.SchedulerName]; ok { + return nil + } + return fmt.Errorf("policy %q references undefined scheduler %q", pol.Name, pol.SchedulerName) + } + for _, zpp := range cfg.Security.Policies { + if zpp == nil { + continue + } + for _, pol := range zpp.Policies { + if err := check(pol); err != nil { + return err + } + } + } + for _, pol := range cfg.Security.GlobalPolicies { + if err := check(pol); err != nil { + return err + } + } + return nil +} + func validateClassOfServiceStrict(cos *ClassOfServiceConfig) error { if cos == nil { return nil @@ -518,6 +548,14 @@ func ValidateConfig(cfg *Config) []string { } } } + for _, p := range cfg.Security.GlobalPolicies { + if p.SchedulerName != "" { + if _, ok := cfg.Schedulers[p.SchedulerName]; !ok { + warnings = append(warnings, fmt.Sprintf( + "global policy %q: scheduler %q not defined", p.Name, p.SchedulerName)) + } + } + } // Validate routing-instance interface references for _, ri := range cfg.RoutingInstances { diff --git a/pkg/config/parser_ast_test.go b/pkg/config/parser_ast_test.go index f94d39a66..20fbcc257 100644 --- a/pkg/config/parser_ast_test.go +++ b/pkg/config/parser_ast_test.go @@ -1473,7 +1473,6 @@ security { policy sched-test { match { source-address any; destination-address any; application any; } then { permit; } - scheduler-name missing-sched; } } } @@ -1488,7 +1487,7 @@ security { if err != nil { t.Fatalf("CompileConfig: %v", err) } - var foundIfaceWarn, foundPoolWarn, foundSchedWarn bool + var foundIfaceWarn, foundPoolWarn bool for _, w := range cfg.Warnings { if strings.Contains(w, "missing-iface") && strings.Contains(w, "not in interfaces") { foundIfaceWarn = true @@ -1496,9 +1495,6 @@ security { if strings.Contains(w, "missing-pool") && strings.Contains(w, "not defined") { foundPoolWarn = true } - if strings.Contains(w, "missing-sched") && strings.Contains(w, "not defined") { - foundSchedWarn = true - } } if !foundIfaceWarn { t.Errorf("missing warning for zone referencing unconfigured interface, got: %v", cfg.Warnings) @@ -1506,8 +1502,61 @@ security { if !foundPoolWarn { t.Errorf("missing warning for SNAT referencing undefined pool, got: %v", cfg.Warnings) } - if !foundSchedWarn { - t.Errorf("missing warning for policy referencing undefined scheduler, got: %v", cfg.Warnings) +} + +func TestPolicySchedulerMissingReferenceWarns(t *testing.T) { + input := `security { + policies { + from-zone trust to-zone untrust { + policy sched-test { + match { source-address any; destination-address any; application any; } + then { permit; } + scheduler-name missing-sched; + } + } + } +} +` + parser := NewParser(input) + tree, errs := parser.Parse() + if len(errs) > 0 { + t.Fatalf("parse errors: %v", errs) + } + cfg, err := CompileConfig(tree) + if err != nil { + t.Fatalf("CompileConfig returned error for warning-only missing scheduler reference: %v", err) + } + warnings := strings.Join(cfg.Warnings, "\n") + if !strings.Contains(warnings, `policy "sched-test": scheduler "missing-sched" not defined`) { + t.Fatalf("CompileConfig warnings = %v, want missing scheduler warning", cfg.Warnings) + } +} + +func TestGlobalPolicySchedulerMissingReferenceWarns(t *testing.T) { + input := `security { + policies { + global { + policy sched-global { + match { source-address any; destination-address any; application any; } + then { permit; } + scheduler-name missing-sched; + } + } + } +} +` + parser := NewParser(input) + tree, errs := parser.Parse() + if len(errs) > 0 { + t.Fatalf("parse errors: %v", errs) + } + cfg, err := CompileConfig(tree) + if err != nil { + t.Fatalf("CompileConfig returned error for warning-only missing global scheduler reference: %v", err) + } + warnings := strings.Join(cfg.Warnings, "\n") + if !strings.Contains(warnings, `global policy "sched-global": scheduler "missing-sched" not defined`) { + t.Fatalf("CompileConfig warnings = %v, want missing global scheduler warning", cfg.Warnings) } } diff --git a/pkg/daemon/compile_error_policy_test.go b/pkg/daemon/compile_error_policy_test.go new file mode 100644 index 000000000..fec27bcf9 --- /dev/null +++ b/pkg/daemon/compile_error_policy_test.go @@ -0,0 +1,21 @@ +package daemon + +import ( + "errors" + "fmt" + "testing" + + dpuserspace "github.com/psaab/xpf/pkg/dataplane/userspace" +) + +func TestCompileErrorMustAbortApply(t *testing.T) { + if !compileErrorMustAbortApply(dpuserspace.ErrPolicySchedulerProtocolIncompatible) { + t.Fatal("protocol incompatibility must abort apply") + } + if !compileErrorMustAbortApply(fmt.Errorf("wrapped: %w", dpuserspace.ErrPolicySchedulerProtocolIncompatible)) { + t.Fatal("wrapped protocol incompatibility must abort apply") + } + if compileErrorMustAbortApply(errors.New("compile failed for unrelated dataplane reason")) { + t.Fatal("non-protocol compile failures must not abort apply") + } +} diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 174194e13..cf19e1cf0 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -79,6 +79,9 @@ type Daemon struct { snmpAgent *snmp.Agent lldpMgr *lldp.Manager scheduler *scheduler.Scheduler + schedulerCancel context.CancelFunc + policySchedulerConfigHash [32]byte + policySchedulerEpoch atomic.Uint64 cluster *cluster.Manager sessionSync *cluster.SessionSync syncBulkPrimed atomic.Bool diff --git a/pkg/daemon/daemon_apply.go b/pkg/daemon/daemon_apply.go index db21af78c..e4a0acd63 100644 --- a/pkg/daemon/daemon_apply.go +++ b/pkg/daemon/daemon_apply.go @@ -4,6 +4,7 @@ package daemon import ( "bytes" "context" + "errors" "fmt" "log/slog" "net" @@ -67,7 +68,9 @@ func (d *Daemon) bootstrapFromFile() error { func (d *Daemon) applyConfig(cfg *config.Config) { _ = d.applySem.Acquire(context.Background(), 1) defer d.applySem.Release(1) - d.applyConfigLocked(cfg) + if err := d.applyConfigLocked(cfg); err != nil { + slog.Warn("apply config failed", "err", err) + } } // commitAndApply atomically promotes the candidate config and @@ -98,7 +101,9 @@ func (d *Daemon) commitAndApply(ctx context.Context, comment string, syncPeer bo if err != nil { return nil, err } - d.applyConfigLocked(compiled) + if err := d.applyConfigLocked(compiled); err != nil { + return nil, err + } if syncPeer { d.syncConfigToPeer() } @@ -121,7 +126,9 @@ func (d *Daemon) syncAndApply(ctx context.Context, configText string, chassisPre return nil, err } if compiled != nil { - d.applyConfigLocked(compiled) + if err := d.applyConfigLocked(compiled); err != nil { + return nil, err + } } return compiled, nil } @@ -138,7 +145,9 @@ func (d *Daemon) commitConfirmedAndApply(ctx context.Context, minutes int, syncP if err != nil { return nil, err } - d.applyConfigLocked(compiled) + if err := d.applyConfigLocked(compiled); err != nil { + return nil, err + } if syncPeer { d.syncConfigToPeer() } @@ -147,10 +156,10 @@ func (d *Daemon) commitConfirmedAndApply(ctx context.Context, minutes int, syncP // applyConfigLocked runs the actual reconcile pipeline. MUST be // called with d.applySem held. -func (d *Daemon) applyConfigLocked(cfg *config.Config) { +func (d *Daemon) applyConfigLocked(cfg *config.Config) error { if d.applyBodyForTest != nil { d.applyBodyForTest(cfg) - return + return nil } // Reset VIP warning suppression so new config gets fresh warnings. d.vipWarnedIfaces = nil @@ -394,6 +403,8 @@ func (d *Daemon) applyConfigLocked(cfg *config.Config) { // programming is done. This avoids the double-bind that causes EBUSY // on mlx5 zero-copy queues. rethMACPending := false + deferWorkersActive := false + var clearDeferWorkers func() if d.cluster != nil && cfg.Chassis.Cluster != nil && d.dp != nil { cc := cfg.Chassis.Cluster for rethName, physName := range cfg.RethToPhysical() { @@ -416,28 +427,44 @@ func (d *Daemon) applyConfigLocked(cfg *config.Config) { type deferSetter interface{ SetDeferWorkers(bool) } if ds, ok := d.dp.(deferSetter); ok { ds.SetDeferWorkers(true) + deferWorkersActive = true + clearDeferWorkers = func() { + ds.SetDeferWorkers(false) + } + defer func() { + if deferWorkersActive { + clearDeferWorkers() + } + }() } } } + policySchedulerApplyTime := time.Now() + policySchedulerActiveState := d.policySchedulerActiveStateForApplyLocked(cfg, policySchedulerApplyTime) + d.seedPolicySchedulerActiveStateLocked(policySchedulerActiveState) + // 2. Compile eBPF dataplane var compileResult *dataplane.CompileResult if d.dp != nil { var err error if compileResult, err = d.dp.Compile(cfg); err != nil { d.recordCompileFailure(err) + if compileErrorMustAbortApply(err) { + return err + } } else { d.recordCompileSuccess() } } + policySchedulerActiveState = d.reconcilePolicySchedulerLockedAt(cfg, policySchedulerApplyTime) + d.publishInitialPolicySchedulerStateLocked(cfg, policySchedulerActiveState, compileResult) // Clear defer flag after Compile so subsequent recompiles (where MAC // is already set) don't skip workers. - if rethMACPending { - type deferSetter interface{ SetDeferWorkers(bool) } - if ds, ok := d.dp.(deferSetter); ok { - ds.SetDeferWorkers(false) - } + if deferWorkersActive { + clearDeferWorkers() + deferWorkersActive = false } // 2.1. Wire aggressive session aging config to GC. @@ -1014,4 +1041,19 @@ func (d *Daemon) applyConfigLocked(cfg *config.Config) { d.applyStep0Tunables(userspaceDP, claimHostTunables, governor, netdevBudget, coalesceExplicit, coalesceEnable, coalesceRX, coalesceTX, rssAllowed) } + return nil +} + +func compileErrorMustAbortApply(err error) bool { + return errors.Is(err, dpuserspace.ErrPolicySchedulerProtocolIncompatible) +} + +func (d *Daemon) publishInitialPolicySchedulerStateLocked(cfg *config.Config, activeState map[string]bool, compileResult *dataplane.CompileResult) { + if d.dp == nil || activeState == nil || compileResult == nil { + return + } + if _, isUserspace := d.dp.(*dpuserspace.Manager); isUserspace { + return + } + d.dp.UpdatePolicyScheduleState(cfg, activeState) } diff --git a/pkg/daemon/daemon_run.go b/pkg/daemon/daemon_run.go index 0bec2451f..0c92c29eb 100644 --- a/pkg/daemon/daemon_run.go +++ b/pkg/daemon/daemon_run.go @@ -38,7 +38,6 @@ import ( "github.com/psaab/xpf/pkg/ra" "github.com/psaab/xpf/pkg/routing" "github.com/psaab/xpf/pkg/rpm" - "github.com/psaab/xpf/pkg/scheduler" "github.com/psaab/xpf/pkg/snmp" "github.com/psaab/xpf/pkg/vrrp" ) @@ -73,6 +72,7 @@ func collectAppliedTunnels(cfg *config.Config) []*config.TunnelConfig { // Run starts the daemon and blocks until shutdown. func (d *Daemon) Run(ctx context.Context) error { d.daemonCtx = ctx + d.startPolicySchedulerLoopLocked() // Wrap the default slog handler to support system syslog forwarding. // Syslog clients are added later when config is applied. @@ -604,21 +604,6 @@ func (d *Daemon) Run(ctx context.Context) error { } } - // Start policy scheduler if configured. - if cfg := d.store.ActiveConfig(); cfg != nil && len(cfg.Schedulers) > 0 && d.dp != nil { - d.scheduler = scheduler.New(cfg.Schedulers, func(activeState map[string]bool) { - slog.Info("scheduler state changed, updating policy rules") - if activeCfg := d.store.ActiveConfig(); activeCfg != nil { - d.dp.UpdatePolicyScheduleState(activeCfg, activeState) - } - }) - wg.Add(1) - go func() { - defer wg.Done() - d.scheduler.Run(ctx) - }() - } - // Start periodic neighbor resolution to keep ARP entries warm for // known forwarding targets (DNAT pools, gateways, address-book hosts). // Without this, bpf_fib_lookup returns NO_NEIGH when ARP expires, diff --git a/pkg/daemon/daemon_scheduler.go b/pkg/daemon/daemon_scheduler.go new file mode 100644 index 000000000..d0b090a21 --- /dev/null +++ b/pkg/daemon/daemon_scheduler.go @@ -0,0 +1,145 @@ +package daemon + +import ( + "context" + "crypto/sha256" + "log/slog" + "sort" + "time" + + "github.com/psaab/xpf/pkg/config" + "github.com/psaab/xpf/pkg/scheduler" +) + +type policySchedulerActiveStateSetter interface { + SetPolicySchedulerActiveState(map[string]bool) +} + +// reconcilePolicySchedulerLocked runs under applySem. It makes the scheduler +// lifecycle follow committed config instead of only daemon startup, and returns +// the active-state map that must be used for the same apply transaction. +func (d *Daemon) reconcilePolicySchedulerLocked(cfg *config.Config) map[string]bool { + return d.reconcilePolicySchedulerLockedAt(cfg, time.Now()) +} + +func (d *Daemon) reconcilePolicySchedulerLockedAt(cfg *config.Config, now time.Time) map[string]bool { + hash, hasSchedulers := policySchedulerConfigHash(cfg) + if hasSchedulers && d.scheduler != nil && hash == d.policySchedulerConfigHash { + d.startPolicySchedulerLoopLocked() + return d.scheduler.ActiveState() + } + + if d.schedulerCancel != nil { + d.schedulerCancel() + d.schedulerCancel = nil + } + d.scheduler = nil + epoch := d.policySchedulerEpoch.Add(1) + + if !hasSchedulers { + d.policySchedulerConfigHash = [32]byte{} + return nil + } + + sched, activeState := scheduler.NewPrimed(cfg.Schedulers, func(activeState map[string]bool) { + d.publishPolicyScheduleState(epoch, activeState) + }, now) + d.scheduler = sched + d.policySchedulerConfigHash = hash + d.startPolicySchedulerLoopLocked() + return activeState +} + +func (d *Daemon) policySchedulerActiveStateForApplyLocked(cfg *config.Config, now time.Time) map[string]bool { + hash, hasSchedulers := policySchedulerConfigHash(cfg) + if !hasSchedulers { + return nil + } + if d.scheduler != nil && hash == d.policySchedulerConfigHash { + return d.scheduler.ActiveState() + } + _, activeState := scheduler.NewPrimed(cfg.Schedulers, func(map[string]bool) {}, now) + return activeState +} + +func policySchedulerConfigHash(cfg *config.Config) ([32]byte, bool) { + if cfg == nil || len(cfg.Schedulers) == 0 { + return [32]byte{}, false + } + h := sha256.New() + names := make([]string, 0, len(cfg.Schedulers)) + for name := range cfg.Schedulers { + names = append(names, name) + } + sort.Strings(names) + for _, name := range names { + writePolicySchedulerHashString(h, name) + sched := cfg.Schedulers[name] + if sched == nil { + writePolicySchedulerHashString(h, "") + continue + } + writePolicySchedulerHashString(h, sched.Name) + writePolicySchedulerHashString(h, sched.StartTime) + writePolicySchedulerHashString(h, sched.StopTime) + writePolicySchedulerHashString(h, sched.StartDate) + writePolicySchedulerHashString(h, sched.StopDate) + if sched.Daily { + _, _ = h.Write([]byte{1}) + } else { + _, _ = h.Write([]byte{0}) + } + } + var out [32]byte + copy(out[:], h.Sum(nil)) + return out, true +} + +func writePolicySchedulerHashString(h interface{ Write([]byte) (int, error) }, s string) { + var lenBuf [8]byte + for i := 0; i < len(lenBuf); i++ { + lenBuf[i] = byte(uint64(len(s)) >> (8 * i)) + } + _, _ = h.Write(lenBuf[:]) + _, _ = h.Write([]byte(s)) +} + +func (d *Daemon) startPolicySchedulerLoopLocked() { + if d.daemonCtx == nil || d.scheduler == nil || d.schedulerCancel != nil { + return + } + ctx, cancel := context.WithCancel(d.daemonCtx) + d.schedulerCancel = cancel + go d.scheduler.Run(ctx) +} + +func (d *Daemon) publishPolicyScheduleState(epoch uint64, activeState map[string]bool) { + ctx := d.daemonCtx + if ctx == nil { + ctx = context.Background() + } + if err := d.applySem.Acquire(ctx, 1); err != nil { + slog.Warn("scheduler: failed to acquire apply semaphore", "err", err) + return + } + defer d.applySem.Release(1) + + if epoch != d.policySchedulerEpoch.Load() { + return + } + cfg := d.store.ActiveConfig() + if cfg == nil || d.dp == nil { + return + } + d.seedPolicySchedulerActiveStateLocked(activeState) + d.dp.UpdatePolicyScheduleState(cfg, activeState) +} + +func (d *Daemon) seedPolicySchedulerActiveStateLocked(activeState map[string]bool) { + if d.dp == nil { + return + } + if setter, ok := d.dp.(policySchedulerActiveStateSetter); ok { + setter.SetPolicySchedulerActiveState(activeState) + } +} diff --git a/pkg/daemon/daemon_scheduler_test.go b/pkg/daemon/daemon_scheduler_test.go new file mode 100644 index 000000000..ffc7df643 --- /dev/null +++ b/pkg/daemon/daemon_scheduler_test.go @@ -0,0 +1,88 @@ +package daemon + +import ( + "context" + "testing" + "time" + + "github.com/psaab/xpf/pkg/config" + "github.com/psaab/xpf/pkg/scheduler" + "golang.org/x/sync/semaphore" +) + +func TestStartPolicySchedulerLoopLockedWaitsForDaemonContext(t *testing.T) { + sched, _ := scheduler.NewPrimed(map[string]*config.SchedulerConfig{ + "always": {Name: "always"}, + }, func(map[string]bool) {}, time.Now()) + + d := &Daemon{scheduler: sched} + d.startPolicySchedulerLoopLocked() + if d.schedulerCancel != nil { + t.Fatal("scheduler loop started before daemon context was available") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + d.daemonCtx = ctx + d.startPolicySchedulerLoopLocked() + if d.schedulerCancel == nil { + t.Fatal("scheduler loop did not start after daemon context became available") + } + d.schedulerCancel() +} + +func TestReconcilePolicySchedulerLockedKeepsByteIdenticalScheduler(t *testing.T) { + cfg := &config.Config{ + Schedulers: map[string]*config.SchedulerConfig{ + "always": {Name: "always"}, + }, + } + d := &Daemon{} + + first := d.reconcilePolicySchedulerLocked(cfg) + if d.scheduler == nil { + t.Fatal("scheduler was not created") + } + sched := d.scheduler + epoch := d.policySchedulerEpoch.Load() + + second := d.reconcilePolicySchedulerLocked(&config.Config{ + Schedulers: map[string]*config.SchedulerConfig{ + "always": {Name: "always"}, + }, + }) + if d.scheduler != sched { + t.Fatal("byte-identical scheduler config recreated the scheduler") + } + if got := d.policySchedulerEpoch.Load(); got != epoch { + t.Fatalf("epoch = %d, want unchanged %d", got, epoch) + } + if first["always"] != second["always"] { + t.Fatalf("active state changed across identical reconcile: first=%v second=%v", first, second) + } +} + +func TestPublishPolicyScheduleStateUsesDaemonContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + d := &Daemon{ + daemonCtx: ctx, + applySem: semaphore.NewWeighted(1), + } + if err := d.applySem.Acquire(context.Background(), 1); err != nil { + t.Fatalf("acquire semaphore: %v", err) + } + defer d.applySem.Release(1) + + done := make(chan struct{}) + go func() { + d.publishPolicyScheduleState(0, map[string]bool{"always": true}) + close(done) + }() + + select { + case <-done: + case <-time.After(500 * time.Millisecond): + t.Fatal("publishPolicyScheduleState blocked on apply semaphore after daemon context cancellation") + } +} diff --git a/pkg/daemon/policy_scheduler_apply_test.go b/pkg/daemon/policy_scheduler_apply_test.go new file mode 100644 index 000000000..a6755390e --- /dev/null +++ b/pkg/daemon/policy_scheduler_apply_test.go @@ -0,0 +1,134 @@ +package daemon + +import ( + "errors" + "testing" + "time" + + "github.com/psaab/xpf/pkg/cluster" + "github.com/psaab/xpf/pkg/config" + "github.com/psaab/xpf/pkg/dataplane" + dpuserspace "github.com/psaab/xpf/pkg/dataplane/userspace" + "github.com/psaab/xpf/pkg/scheduler" +) + +type policySchedulerApplyTestDP struct { + *dataplane.Manager + + compileErr error + compileCalls int + deferStates []bool + updateCalls int + updateStateSeen map[string]bool +} + +func (d *policySchedulerApplyTestDP) Compile(*config.Config) (*dataplane.CompileResult, error) { + d.compileCalls++ + if d.compileErr != nil { + return nil, d.compileErr + } + return &dataplane.CompileResult{}, nil +} + +func (d *policySchedulerApplyTestDP) SetDeferWorkers(v bool) { + d.deferStates = append(d.deferStates, v) +} + +func (d *policySchedulerApplyTestDP) UpdatePolicyScheduleState(_ *config.Config, activeState map[string]bool) { + d.updateCalls++ + d.updateStateSeen = activeState +} + +func TestApplyConfigClearsDeferWorkersOnAbortCompileError(t *testing.T) { + dp := &policySchedulerApplyTestDP{ + compileErr: dpuserspace.ErrPolicySchedulerProtocolIncompatible, + } + d := &Daemon{ + cluster: &cluster.Manager{}, + dp: dp, + } + cfg := &config.Config{ + Chassis: config.ChassisConfig{ + Cluster: &config.ClusterConfig{ + ClusterID: 1, + NodeID: 0, + }, + }, + Interfaces: config.InterfacesConfig{ + Interfaces: map[string]*config.InterfaceConfig{ + "reth0": {Name: "reth0", RedundancyGroup: 1}, + "lo": {Name: "lo", RedundantParent: "reth0"}, + }, + }, + } + + if err := d.applyConfigLocked(cfg); !errors.Is(err, dpuserspace.ErrPolicySchedulerProtocolIncompatible) { + t.Fatalf("applyConfigLocked error = %v, want protocol incompatibility", err) + } + if dp.compileCalls != 1 { + t.Fatalf("Compile calls = %d, want 1", dp.compileCalls) + } + if len(dp.deferStates) != 2 || !dp.deferStates[0] || dp.deferStates[1] { + t.Fatalf("defer worker states = %v, want [true false]", dp.deferStates) + } +} + +func TestApplyConfigProtocolAbortPreservesExistingScheduler(t *testing.T) { + oldCfg := &config.Config{ + Schedulers: map[string]*config.SchedulerConfig{ + "old": {Name: "old"}, + }, + } + oldScheduler, oldState := scheduler.NewPrimed(oldCfg.Schedulers, func(map[string]bool) {}, testPolicySchedulerApplyNow()) + oldHash, _ := policySchedulerConfigHash(oldCfg) + dp := &policySchedulerApplyTestDP{ + compileErr: dpuserspace.ErrPolicySchedulerProtocolIncompatible, + } + d := &Daemon{ + dp: dp, + scheduler: oldScheduler, + policySchedulerConfigHash: oldHash, + } + d.policySchedulerEpoch.Store(42) + newCfg := &config.Config{ + Schedulers: map[string]*config.SchedulerConfig{ + "new": {Name: "new"}, + }, + } + + if err := d.applyConfigLocked(newCfg); !errors.Is(err, dpuserspace.ErrPolicySchedulerProtocolIncompatible) { + t.Fatalf("applyConfigLocked error = %v, want protocol incompatibility", err) + } + if d.scheduler != oldScheduler { + t.Fatal("protocol abort replaced scheduler before apply completed") + } + if got := d.policySchedulerEpoch.Load(); got != 42 { + t.Fatalf("policySchedulerEpoch = %d, want unchanged 42", got) + } + if d.policySchedulerConfigHash != oldHash { + t.Fatal("protocol abort changed scheduler config hash") + } + if got := d.scheduler.ActiveState()["old"]; got != oldState["old"] { + t.Fatalf("old scheduler active state = %t, want %t", got, oldState["old"]) + } +} + +func TestApplyConfigPublishesScheduleStateToNonUserspaceDataplane(t *testing.T) { + dp := &policySchedulerApplyTestDP{} + d := &Daemon{dp: dp} + cfg := &config.Config{} + activeState := map[string]bool{"workhours": true} + + d.publishInitialPolicySchedulerStateLocked(cfg, activeState, &dataplane.CompileResult{}) + + if dp.updateCalls != 1 { + t.Fatalf("UpdatePolicyScheduleState calls = %d, want 1", dp.updateCalls) + } + if got, ok := dp.updateStateSeen["workhours"]; !ok || !got { + t.Fatalf("active state for workhours = %t, present=%t; want active true", got, ok) + } +} + +func testPolicySchedulerApplyNow() time.Time { + return time.Date(2026, 5, 17, 12, 0, 0, 0, time.UTC) +} diff --git a/pkg/dataplane/README.md b/pkg/dataplane/README.md index 5a8306e6e..9e31c3db8 100644 --- a/pkg/dataplane/README.md +++ b/pkg/dataplane/README.md @@ -34,8 +34,8 @@ sent while exact queues were still backlogged. NAT, static NAT, NAT64 prefixes, NPTv6, screen profiles, default policy, flow timeouts, firewall filters, flow config, port mirroring. -- `CompileResult` — `compiler.go`. Zone/policy/NAT/app IDs and the - per-interface networkd configs. +- `CompileResult` — `compiler.go`. Zone/policy/NAT/app IDs, compiled + policy-scheduler rule slots, and the per-interface networkd configs. - Session iteration: `IterateSessions`, `BatchIterateSessions`, `IterateSessionsV6`, `BatchIterateSessionsV6`. diff --git a/pkg/dataplane/compiler.go b/pkg/dataplane/compiler.go index 4bd6761ae..dce17ca60 100644 --- a/pkg/dataplane/compiler.go +++ b/pkg/dataplane/compiler.go @@ -33,6 +33,8 @@ type CompileResult struct { PolicySets int // number of policy sets created FilterIDs map[string]uint32 // "inet:name" or "inet6:name" -> filter_id + PolicyScheduleRuleSlots []PolicyScheduleRuleSlot + Lo0FilterV4 uint32 // lo0 inet filter ID (0=none), set by compileFirewallFilters Lo0FilterV6 uint32 // lo0 inet6 filter ID (0=none), set by compileFirewallFilters @@ -68,6 +70,18 @@ type CompileResult struct { ethtoolApplied map[string]bool } +// PolicyScheduleRuleSlot records the exact compiled policy_rules map slot for a +// scheduled policy. A single policy can compile into multiple dense app-term +// slots; runtime scheduler updates must toggle those compiled slots rather than +// recomputing indexes from the original config policy position. +type PolicyScheduleRuleSlot struct { + PolicySetID uint32 + RuleIndex uint32 + RuleID uint32 + PolicyName string + SchedulerName string +} + // cachedInterfaceByName returns a cached *net.Interface, performing the // syscall only on the first lookup for each name. func (r *CompileResult) cachedInterfaceByName(name string) (*net.Interface, error) { @@ -792,6 +806,15 @@ func compilePolicies(dp DataPlane, cfg *config.Config, result *CompileResult) er } result.PolicyNames[rule.RuleID] = pol.Name + if pol.SchedulerName != "" { + result.PolicyScheduleRuleSlots = append(result.PolicyScheduleRuleSlots, PolicyScheduleRuleSlot{ + PolicySetID: policySetID, + RuleIndex: uint32(i), + RuleID: rule.RuleID, + PolicyName: pol.Name, + SchedulerName: pol.SchedulerName, + }) + } slog.Debug("policy rule compiled", "from", zpp.FromZone, "to", zpp.ToZone, @@ -913,6 +936,15 @@ func compilePolicies(dp DataPlane, cfg *config.Config, result *CompileResult) er } result.PolicyNames[rule.RuleID] = pol.Name + if pol.SchedulerName != "" { + result.PolicyScheduleRuleSlots = append(result.PolicyScheduleRuleSlots, PolicyScheduleRuleSlot{ + PolicySetID: policySetID, + RuleIndex: uint32(i), + RuleID: rule.RuleID, + PolicyName: pol.Name, + SchedulerName: pol.SchedulerName, + }) + } slog.Debug("global policy rule compiled", "policy", pol.Name, "action", rule.Action, diff --git a/pkg/dataplane/compiler_test.go b/pkg/dataplane/compiler_test.go index 7fc9005aa..2fbf93c59 100644 --- a/pkg/dataplane/compiler_test.go +++ b/pkg/dataplane/compiler_test.go @@ -7,6 +7,97 @@ import ( "github.com/psaab/xpf/pkg/config" ) +type policyScheduleSlotTestDP struct { + DataPlane + + rules []PolicyRule +} + +func (d *policyScheduleSlotTestDP) SetZonePairPolicy(fromZone, toZone uint16, ps PolicySet) error { + return nil +} + +func (d *policyScheduleSlotTestDP) SetPolicyRule(policySetID uint32, ruleIndex uint32, rule PolicyRule) error { + d.rules = append(d.rules, rule) + return nil +} + +func (d *policyScheduleSlotTestDP) DeleteStaleZonePairPolicies(written map[ZonePairKey]bool) {} + +func TestCompilePoliciesRecordsExpandedPolicyScheduleSlots(t *testing.T) { + cfg := &config.Config{} + cfg.Security.Policies = []*config.ZonePairPolicies{{ + FromZone: "trust", + ToZone: "untrust", + Policies: []*config.Policy{ + { + Name: "plain", + Action: config.PolicyPermit, + Match: config.PolicyMatch{ + SourceAddresses: []string{"any"}, + DestinationAddresses: []string{"any"}, + Applications: []string{"any"}, + }, + }, + { + Name: "scheduled", + SchedulerName: "workhours", + Action: config.PolicyPermit, + Match: config.PolicyMatch{ + SourceAddresses: []string{"any"}, + DestinationAddresses: []string{"any"}, + Applications: []string{"app-a", "app-b"}, + }, + }, + }, + }} + cfg.Security.GlobalPolicies = []*config.Policy{{ + Name: "global-scheduled", + SchedulerName: "night", + Action: config.PolicyPermit, + Match: config.PolicyMatch{ + SourceAddresses: []string{"any"}, + DestinationAddresses: []string{"any"}, + Applications: []string{"app-c", "app-d"}, + }, + }} + result := &CompileResult{ + ZoneIDs: map[string]uint16{ + "trust": 1, + "untrust": 2, + }, + AppIDs: map[string]uint32{ + "app-a": 1, + "app-b": 2, + "app-c": 3, + "app-d": 4, + }, + } + dp := &policyScheduleSlotTestDP{} + + if err := compilePolicies(dp, cfg, result); err != nil { + t.Fatalf("compilePolicies: %v", err) + } + + want := []PolicyScheduleRuleSlot{ + {PolicySetID: 0, RuleIndex: 1, RuleID: 1, PolicyName: "scheduled", SchedulerName: "workhours"}, + {PolicySetID: 0, RuleIndex: 2, RuleID: 2, PolicyName: "scheduled", SchedulerName: "workhours"}, + {PolicySetID: 1, RuleIndex: 0, RuleID: MaxRulesPerPolicy, PolicyName: "global-scheduled", SchedulerName: "night"}, + {PolicySetID: 1, RuleIndex: 1, RuleID: MaxRulesPerPolicy + 1, PolicyName: "global-scheduled", SchedulerName: "night"}, + } + if len(result.PolicyScheduleRuleSlots) != len(want) { + t.Fatalf("got %d slots, want %d: %#v", len(result.PolicyScheduleRuleSlots), len(want), result.PolicyScheduleRuleSlots) + } + for i := range want { + if got := result.PolicyScheduleRuleSlots[i]; got != want[i] { + t.Fatalf("slot %d = %#v, want %#v", i, got, want[i]) + } + } + if len(dp.rules) != 5 { + t.Fatalf("compiled %d policy rules, want 5", len(dp.rules)) + } +} + func TestExpandFilterTermNegateFlags(t *testing.T) { prefixLists := map[string]*config.PrefixList{ "rfc1918": { diff --git a/pkg/dataplane/dpdk/dpdk_cgo.go b/pkg/dataplane/dpdk/dpdk_cgo.go index d2d04056f..0aa740466 100644 --- a/pkg/dataplane/dpdk/dpdk_cgo.go +++ b/pkg/dataplane/dpdk/dpdk_cgo.go @@ -342,43 +342,38 @@ func (m *Manager) SetDefaultPolicy(action uint8) error { return nil } -func (m *Manager) UpdatePolicyScheduleState(cfg *config.Config, activeState map[string]bool) { +func (m *Manager) UpdatePolicyScheduleState(_ *config.Config, activeState map[string]bool) { shm := m.platform.shm - if shm == nil || cfg == nil { + if shm == nil { + return + } + result := m.LastCompileResult() + if result == nil { return } - policySetID := uint32(0) - for _, zpp := range cfg.Security.Policies { - for i, pol := range zpp.Policies { - if pol.SchedulerName == "" { - policySetID++ - continue - } - - active, exists := activeState[pol.SchedulerName] - if !exists { - active = true // default active if scheduler not found - } + for _, slot := range result.PolicyScheduleRuleSlots { + active, exists := activeState[slot.SchedulerName] + if !exists { + active = true // default active if scheduler not found + } - idx := policySetID*C.MAX_RULES_PER_POLICY + uint32(i) - ptr := (*C.struct_policy_rule)(unsafe.Pointer( - uintptr(unsafe.Pointer(shm.policy_rules)) + - uintptr(idx)*unsafe.Sizeof(C.struct_policy_rule{}))) + idx := slot.PolicySetID*C.MAX_RULES_PER_POLICY + slot.RuleIndex + ptr := (*C.struct_policy_rule)(unsafe.Pointer( + uintptr(unsafe.Pointer(shm.policy_rules)) + + uintptr(idx)*unsafe.Sizeof(C.struct_policy_rule{}))) - var newActive C.uint8_t - if active { - newActive = 1 - } - if ptr.active != newActive { - ptr.active = newActive - slog.Info("DPDK policy schedule state updated", - "policy", pol.Name, - "scheduler", pol.SchedulerName, - "active", active) - } + var newActive C.uint8_t + if active { + newActive = 1 + } + if ptr.active != newActive { + ptr.active = newActive + slog.Info("DPDK policy schedule state updated", + "policy", slot.PolicyName, + "scheduler", slot.SchedulerName, + "active", active) } - policySetID++ } } diff --git a/pkg/dataplane/maps.go b/pkg/dataplane/maps.go index c66627f4b..29bc70e53 100644 --- a/pkg/dataplane/maps.go +++ b/pkg/dataplane/maps.go @@ -1494,45 +1494,40 @@ func (m *Manager) ClearFilterConfigs() error { // UpdatePolicyScheduleState iterates policy rules and toggles the Active flag // based on scheduler state. Only rules whose scheduler state changed are updated. -func (m *Manager) UpdatePolicyScheduleState(cfg *config.Config, activeState map[string]bool) { +func (m *Manager) UpdatePolicyScheduleState(_ *config.Config, activeState map[string]bool) { zm, ok := m.maps["policy_rules"] if !ok { return } + result := m.LastCompileResult() + if result == nil { + return + } - policySetID := uint32(0) - for _, zpp := range cfg.Security.Policies { - for i, pol := range zpp.Policies { - if pol.SchedulerName == "" { - policySetID++ - continue - } - - active, exists := activeState[pol.SchedulerName] - if !exists { - active = true // default active if scheduler not found - } + for _, slot := range result.PolicyScheduleRuleSlots { + active, exists := activeState[slot.SchedulerName] + if !exists { + active = true // default active if scheduler not found + } - idx := policySetID*MaxRulesPerPolicy + uint32(i) - var rule PolicyRule - if err := zm.Lookup(idx, &rule); err != nil { - continue - } + idx := slot.PolicySetID*MaxRulesPerPolicy + slot.RuleIndex + var rule PolicyRule + if err := zm.Lookup(idx, &rule); err != nil { + continue + } - var newActive uint8 - if active { - newActive = 1 - } - if rule.Active != newActive { - rule.Active = newActive - zm.Update(idx, rule, ebpf.UpdateAny) - slog.Info("policy schedule state updated", - "policy", pol.Name, - "scheduler", pol.SchedulerName, - "active", active) - } + var newActive uint8 + if active { + newActive = 1 + } + if rule.Active != newActive { + rule.Active = newActive + zm.Update(idx, rule, ebpf.UpdateAny) + slog.Info("policy schedule state updated", + "policy", slot.PolicyName, + "scheduler", slot.SchedulerName, + "active", active) } - policySetID++ } } diff --git a/pkg/dataplane/userspace/manager.go b/pkg/dataplane/userspace/manager.go index a9df84bfd..a532a5870 100644 --- a/pkg/dataplane/userspace/manager.go +++ b/pkg/dataplane/userspace/manager.go @@ -26,6 +26,8 @@ import ( var _ dataplane.DataPlane = (*Manager)(nil) +var ErrPolicySchedulerProtocolIncompatible = errors.New("userspace policy scheduler snapshot protocol incompatible") + // DataplaneMode describes which packet-processing pipeline is active. type DataplaneMode int @@ -58,28 +60,29 @@ type Manager struct { dataplane.DataPlane inner *dataplane.Manager - mu sync.Mutex - sessionMu sync.Mutex // separate lock for session sync requests (Phase 3) - proc *exec.Cmd - cfg config.UserspaceConfig - clusterHA bool - generation uint64 - syncCancel context.CancelFunc - lastStatus ProcessStatus - lastSnapshot *ConfigSnapshot - haGroups map[int]HAGroupStatus - lastIngressIfaces []uint32 - lastRSTv4 []netip.Addr - lastRSTv6 []netip.Addr - lastRSTAttempt time.Time - lastRSTInstallOK bool - lastSnapshotHash [32]byte // content hash of last published snapshot (excludes volatile fields) + mu sync.Mutex + sessionMu sync.Mutex // separate lock for session sync requests (Phase 3) + proc *exec.Cmd + cfg config.UserspaceConfig + clusterHA bool + generation uint64 + syncCancel context.CancelFunc + lastStatus ProcessStatus + lastSnapshot *ConfigSnapshot + policySchedulerActive map[string]bool + haGroups map[int]HAGroupStatus + lastIngressIfaces []uint32 + lastRSTv4 []netip.Addr + lastRSTv6 []netip.Addr + lastRSTAttempt time.Time + lastRSTInstallOK bool + lastSnapshotHash [32]byte // content hash of last published snapshot (excludes volatile fields) // #1197: O(1) neighbor lookup index for the listener hot path. // Keyed by (ifindex, ip-string). Rebuilt whenever lastSnapshot.Neighbors // is replaced. Read under m.mu (existing snapshot lock). neighborIndex map[neighborIndexKey]*NeighborSnapshot // #1197: ifindex set for listener filter; rebuilt on config commit. - monitoredIfindexes map[int]struct{} + monitoredIfindexes map[int]struct{} lastBindingIndices []uint32 neighborsPrewarmed bool ctrlEnableAt time.Time @@ -156,6 +159,32 @@ func New() *Manager { } } +func copyPolicySchedulerActiveState(activeState map[string]bool) map[string]bool { + if activeState == nil { + return nil + } + out := make(map[string]bool, len(activeState)) + for name, active := range activeState { + out[name] = active + } + return out +} + +func (m *Manager) policySchedulerActiveStateSnapshot() map[string]bool { + m.mu.Lock() + defer m.mu.Unlock() + return copyPolicySchedulerActiveState(m.policySchedulerActive) +} + +// SetPolicySchedulerActiveState seeds the active-state map used by the next +// full snapshot build. The daemon calls this while holding applySem so config +// commits and scheduler flips cannot publish hybrid policy snapshots. +func (m *Manager) SetPolicySchedulerActiveState(activeState map[string]bool) { + m.mu.Lock() + defer m.mu.Unlock() + m.policySchedulerActive = copyPolicySchedulerActiveState(activeState) +} + // EventStream returns the event stream instance, or nil if not available. func (m *Manager) EventStream() *EventStream { m.mu.Lock() @@ -265,7 +294,8 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) return nil, err } ucfg := deriveUserspaceConfig(cfg) - snap := buildSnapshot(cfg, ucfg, m.bumpGeneration(), m.readFIBGeneration()) + activeState := m.policySchedulerActiveStateSnapshot() + snap := buildSnapshotWithSchedulerState(cfg, ucfg, m.bumpGeneration(), m.readFIBGeneration(), activeState) m.syncInterfaceAttachments(result, snap) m.mu.Lock() @@ -296,13 +326,18 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) pendingXSKStartup = false samePlanRefresh = false } - m.lastSnapshot = snap // #1197 v4 (Codex code-review v3 #1+#2): rebuild listener // caches ONLY after a successful apply_snapshot. Doing it // here (before publish) leaves the listener thinking // userspace-dp has entries it doesn't if apply_snapshot fails. // Moved to the post-success path below (after line 343). if pendingXSKStartup { + if err := m.ensurePolicySchedulerProtocolLocked(cfg); err != nil { + if disarmErr := m.disarmPolicySchedulerProtocolFailureLocked(err); disarmErr != nil { + return result, errors.Join(err, disarmErr) + } + return result, err + } if err := m.syncIngressIfaceMapLocked(snap); err != nil { return result, err } @@ -312,6 +347,7 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) if err := m.syncInterfaceNATAddressMapsLocked(snap); err != nil { return result, err } + m.lastSnapshot = snap m.cfg = ucfg slog.Info( "userspace: deferring snapshot publish during XSK startup", @@ -339,6 +375,12 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) if err := m.ensureProcessLocked(ucfg); err != nil { return result, err } + if err := m.ensurePolicySchedulerProtocolLocked(cfg); err != nil { + if disarmErr := m.disarmPolicySchedulerProtocolFailureLocked(err); disarmErr != nil { + return result, errors.Join(err, disarmErr) + } + return result, err + } if m.deferWorkers { snap.DeferWorkers = true } @@ -353,6 +395,7 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) if err := m.requestLocked(ControlRequest{Type: "apply_snapshot", Snapshot: &publishSnap}, &status); err != nil { return result, fmt.Errorf("publish userspace snapshot: %w", err) } + m.lastSnapshot = snap // #1197 v4: apply_snapshot succeeded — userspace-dp has the // new neighbors. NOW rebuild listener caches; before this // point the index would shadow events for entries the @@ -381,6 +424,66 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) return result, nil } +// UpdatePolicyScheduleState republishes the userspace policy snapshot with one +// coherent inactive-bit view. This shadows the embedded eBPF manager method; +// scheduled userspace policies must not update the policy_rules BPF map. +func (m *Manager) UpdatePolicyScheduleState(cfg *config.Config, activeState map[string]bool) { + activeCopy := copyPolicySchedulerActiveState(activeState) + + m.mu.Lock() + defer m.mu.Unlock() + + m.policySchedulerActive = activeCopy + if cfg == nil { + if m.lastSnapshot == nil { + return + } + cfg = m.lastSnapshot.Config + } + if cfg == nil || m.lastSnapshot == nil { + return + } + if m.proc == nil || m.proc.Process == nil { + return + } + + if err := m.ensurePolicySchedulerProtocolLocked(cfg); err != nil { + if disarmErr := m.disarmPolicySchedulerProtocolFailureLocked(err); disarmErr != nil { + slog.Warn("userspace: failed to disarm helper after refusing policy scheduler publish", + "protocol_err", err, "err", disarmErr) + } + slog.Warn("userspace: refusing policy scheduler publish to incompatible helper", "err", err) + return + } + next := *m.lastSnapshot + nextGeneration := m.generation + 1 + next.Generation = nextGeneration + next.FIBGeneration = m.readFIBGeneration() + next.GeneratedAt = time.Now().UTC() + next.Config = cfg + next.Policies = buildPolicySnapshotsWithSchedulerState(cfg, activeCopy) + + publishSnap := next + publishSnap.Neighbors = filterPublishableNeighbors(next.Neighbors) + var status ProcessStatus + if err := m.requestLocked(ControlRequest{Type: "apply_snapshot", Snapshot: &publishSnap}, &status); err != nil { + slog.Warn("userspace: failed to publish policy scheduler state", "err", err) + return + } + m.generation = nextGeneration + m.lastSnapshot = &next + m.rebuildNeighborIndex() + m.rebuildMonitoredIfindexes() + m.publishedSnapshot = next.Generation + m.publishedPlanKey = snapshotBindingPlanKey(&next) + if h, ok := snapshotContentHash(&next); ok { + m.lastSnapshotHash = h + } + if err := m.applyHelperStatusLocked(&status); err != nil { + slog.Warn("userspace: failed to sync helper status after policy scheduler publish", "err", err) + } +} + func (m *Manager) syncInterfaceAttachments(result *dataplane.CompileResult, snapshot *ConfigSnapshot) { if result == nil { return @@ -422,6 +525,84 @@ func (m *Manager) readFIBGeneration() uint32 { return gen } +func configHasScheduledPolicy(cfg *config.Config) bool { + if cfg == nil { + return false + } + for _, zpp := range cfg.Security.Policies { + if zpp == nil { + continue + } + for _, pol := range zpp.Policies { + if pol != nil && pol.SchedulerName != "" { + return true + } + } + } + for _, pol := range cfg.Security.GlobalPolicies { + if pol != nil && pol.SchedulerName != "" { + return true + } + } + return false +} + +func (m *Manager) ensurePolicySchedulerProtocolLocked(cfg *config.Config) error { + if !configHasScheduledPolicy(cfg) { + return nil + } + if m.lastStatus.ConfigSnapshotProtocolVersion >= ProtocolVersion { + return nil + } + var status ProcessStatus + if err := m.requestLocked(ControlRequest{Type: "status"}, &status); err == nil { + m.recordHelperStatusLocked(&status) + if status.ConfigSnapshotProtocolVersion >= ProtocolVersion { + return nil + } + } + return fmt.Errorf( + "%w: helper config snapshot protocol version %d < required %d for policy scheduler snapshots", + ErrPolicySchedulerProtocolIncompatible, + m.lastStatus.ConfigSnapshotProtocolVersion, + ProtocolVersion, + ) +} + +func (m *Manager) recordHelperStatusLocked(status *ProcessStatus) { + status.DataplaneMode = m.mode.String() + status.ConfiguredMode = m.configuredMode.String() + status.EntryPrograms = m.entryProgramsLocked() + status.FallbackCounters = m.readFallbackStatsLocked() + if m.eventStream != nil { + es := m.eventStream.Status() + status.EventStream = &es + } + m.lastStatus = *status +} + +func (m *Manager) disarmPolicySchedulerProtocolFailureLocked(protocolErr error) error { + if m.proc == nil || m.proc.Process == nil { + return nil + } + req := ControlRequest{ + Type: "set_forwarding_state", + Forwarding: &ForwardingControlRequest{ + Armed: false, + }, + } + var status ProcessStatus + if err := m.requestLocked(req, &status); err != nil { + return fmt.Errorf("userspace: disarm helper after policy scheduler protocol error: %w", err) + } + if err := m.applyHelperStatusLocked(&status); err != nil { + m.recordHelperStatusLocked(&status) + return fmt.Errorf("userspace: sync helper status after policy scheduler fail-closed disarm: %w", err) + } + slog.Warn("userspace: disarmed helper after policy scheduler protocol error", "err", protocolErr) + return nil +} + // bpfKtimeNs returns the current CLOCK_BOOTTIME in nanoseconds, matching // the clock used by BPF's bpf_ktime_get_ns() for session Created timestamps. func (m *Manager) bpfKtimeNs() uint64 { diff --git a/pkg/dataplane/userspace/manager_test.go b/pkg/dataplane/userspace/manager_test.go index d6ac05ecd..700562ea1 100644 --- a/pkg/dataplane/userspace/manager_test.go +++ b/pkg/dataplane/userspace/manager_test.go @@ -2587,6 +2587,307 @@ func TestBuildPolicySnapshotsIncludesGlobalPolicies(t *testing.T) { } } +func TestBuildPolicySnapshotsRoundTripsSchedulerInactiveAndRuleID(t *testing.T) { + cfg := &config.Config{} + cfg.Security.Policies = []*config.ZonePairPolicies{{ + FromZone: "trust", + ToZone: "untrust", + Policies: []*config.Policy{{ + Name: "zone-allow", + SchedulerName: "workhours", + Match: config.PolicyMatch{ + SourceAddresses: []string{"any"}, + DestinationAddresses: []string{"any"}, + Applications: []string{"any"}, + }, + Action: config.PolicyPermit, + }}, + }} + cfg.Security.GlobalPolicies = []*config.Policy{{ + Name: "global-deny-all", + SchedulerName: "always", + Match: config.PolicyMatch{ + SourceAddresses: []string{"any"}, + DestinationAddresses: []string{"any"}, + Applications: []string{"any"}, + }, + Action: config.PolicyDeny, + }} + + unseeded := buildPolicySnapshots(cfg) + if len(unseeded) != 2 { + t.Fatalf("len(unseeded) = %d, want 2", len(unseeded)) + } + for _, pol := range unseeded { + if !pol.Inactive { + t.Fatalf("policy %q inactive = false with nil scheduler state, want fail-closed true", pol.RuleID) + } + } + + snap := buildPolicySnapshotsWithSchedulerState(cfg, map[string]bool{ + "workhours": false, + "always": true, + }) + if len(snap) != 2 { + t.Fatalf("len(snap) = %d, want 2", len(snap)) + } + if got, want := snap[0].RuleID, "trust->untrust/zone-allow"; got != want { + t.Fatalf("snap[0].RuleID = %q, want %q", got, want) + } + if got, want := snap[0].SchedulerName, "workhours"; got != want { + t.Fatalf("snap[0].SchedulerName = %q, want %q", got, want) + } + if !snap[0].Inactive { + t.Fatalf("snap[0].Inactive = false, want true for inactive scheduler") + } + if got, want := snap[1].RuleID, "junos-global->junos-global/global-deny-all"; got != want { + t.Fatalf("snap[1].RuleID = %q, want %q", got, want) + } + if snap[1].Inactive { + t.Fatalf("snap[1].Inactive = true, want false for active scheduler") + } + + data, err := json.Marshal(snap) + if err != nil { + t.Fatalf("Marshal: %v", err) + } + var roundTrip []PolicyRuleSnapshot + if err := json.Unmarshal(data, &roundTrip); err != nil { + t.Fatalf("Unmarshal: %v", err) + } + if len(roundTrip) != 2 { + t.Fatalf("len(roundTrip) = %d, want 2", len(roundTrip)) + } + if roundTrip[0].RuleID != snap[0].RuleID || + roundTrip[0].SchedulerName != snap[0].SchedulerName || + roundTrip[0].Inactive != snap[0].Inactive { + t.Fatalf("roundTrip[0] = %+v, want scheduler/inactive/rule_id from %+v", roundTrip[0], snap[0]) + } +} + +func TestUpdatePolicyScheduleStatePublishesUserspaceSnapshot(t *testing.T) { + dir := t.TempDir() + controlSock := filepath.Join(dir, "control.sock") + ln, err := net.Listen("unix", controlSock) + if err != nil { + t.Fatalf("listen control socket: %v", err) + } + defer ln.Close() + + reqCh := make(chan ControlRequest, 1) + done := make(chan struct{}, 1) + go func() { + conn, err := ln.Accept() + if err != nil { + return + } + defer conn.Close() + var req ControlRequest + if err := json.NewDecoder(conn).Decode(&req); err != nil { + return + } + reqCh <- req + _ = json.NewEncoder(conn).Encode(ControlResponse{ + OK: true, + Status: &ProcessStatus{ + Enabled: true, + LastSnapshotGeneration: req.Snapshot.Generation, + LastFIBGeneration: req.Snapshot.FIBGeneration, + }, + }) + done <- struct{}{} + }() + + cfg := &config.Config{} + cfg.Security.Policies = []*config.ZonePairPolicies{{ + FromZone: "trust", + ToZone: "untrust", + Policies: []*config.Policy{{ + Name: "scheduled-allow", + SchedulerName: "workhours", + Match: config.PolicyMatch{ + SourceAddresses: []string{"any"}, + DestinationAddresses: []string{"any"}, + Applications: []string{"any"}, + }, + Action: config.PolicyPermit, + }}, + }} + cfg.Schedulers = map[string]*config.SchedulerConfig{ + "workhours": {Name: "workhours"}, + } + + m := New() + m.proc = &exec.Cmd{Process: &os.Process{Pid: os.Getpid()}} + m.cfg.ControlSocket = controlSock + m.generation = 7 + m.lastSnapshot = buildSnapshot(cfg, config.UserspaceConfig{ControlSocket: controlSock}, 7, 0) + m.lastStatus.ConfigSnapshotProtocolVersion = ProtocolVersion + + m.UpdatePolicyScheduleState(cfg, map[string]bool{"workhours": false}) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for apply_snapshot publish") + } + req := <-reqCh + if req.Type != "apply_snapshot" { + t.Fatalf("request type = %q, want apply_snapshot", req.Type) + } + if req.Snapshot == nil { + t.Fatal("apply_snapshot missing snapshot") + } + if req.Snapshot.Version != ProtocolVersion { + t.Fatalf("snapshot version = %d, want %d", req.Snapshot.Version, ProtocolVersion) + } + if req.Snapshot.Generation <= 7 { + t.Fatalf("snapshot generation = %d, want > 7", req.Snapshot.Generation) + } + if len(req.Snapshot.Policies) != 1 { + t.Fatalf("policy count = %d, want 1", len(req.Snapshot.Policies)) + } + pol := req.Snapshot.Policies[0] + if pol.RuleID != "trust->untrust/scheduled-allow" { + t.Fatalf("policy rule_id = %q", pol.RuleID) + } + if pol.SchedulerName != "workhours" { + t.Fatalf("scheduler_name = %q", pol.SchedulerName) + } + if !pol.Inactive { + t.Fatalf("inactive = false, want true for inactive scheduler state") + } + if m.lastSnapshot == nil || len(m.lastSnapshot.Policies) != 1 || !m.lastSnapshot.Policies[0].Inactive { + t.Fatalf("manager lastSnapshot did not keep inactive policy bit: %+v", m.lastSnapshot) + } +} + +func TestUpdatePolicyScheduleStateRefusesOldHelperForScheduledPolicies(t *testing.T) { + dir := t.TempDir() + controlSock := filepath.Join(dir, "control.sock") + ln, err := net.Listen("unix", controlSock) + if err != nil { + t.Fatalf("listen control socket: %v", err) + } + defer ln.Close() + + reqCh := make(chan ControlRequest, 2) + done := make(chan struct{}, 1) + go func() { + for i := 0; i < 2; i++ { + conn, err := ln.Accept() + if err != nil { + return + } + var req ControlRequest + if err := json.NewDecoder(conn).Decode(&req); err != nil { + conn.Close() + return + } + reqCh <- req + status := &ProcessStatus{ + PID: 1234, + ForwardingArmed: true, + } + if req.Type == "set_forwarding_state" { + status.ForwardingArmed = false + } + _ = json.NewEncoder(conn).Encode(ControlResponse{ + OK: true, + Status: status, + }) + conn.Close() + } + done <- struct{}{} + }() + + cfg := &config.Config{} + cfg.Security.Policies = []*config.ZonePairPolicies{{ + FromZone: "trust", + ToZone: "untrust", + Policies: []*config.Policy{{ + Name: "scheduled-allow", + SchedulerName: "workhours", + Action: config.PolicyPermit, + }}, + }} + cfg.Schedulers = map[string]*config.SchedulerConfig{ + "workhours": {Name: "workhours"}, + } + + m := New() + m.proc = &exec.Cmd{Process: &os.Process{Pid: os.Getpid()}} + m.cfg.ControlSocket = controlSock + m.generation = 7 + m.lastSnapshot = buildSnapshot(cfg, config.UserspaceConfig{ControlSocket: controlSock}, 7, 0) + m.lastSnapshot.Policies[0].Inactive = false + + m.UpdatePolicyScheduleState(cfg, map[string]bool{"workhours": false}) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for status probe") + } + req := <-reqCh + if req.Type != "status" { + t.Fatalf("first request type = %q, want status", req.Type) + } + select { + case req := <-reqCh: + if req.Type != "set_forwarding_state" || req.Forwarding == nil || req.Forwarding.Armed { + t.Fatalf("second request = %+v, want fail-closed set_forwarding_state armed=false", req) + } + default: + t.Fatal("expected fail-closed set_forwarding_state request") + } + if m.lastStatus.ForwardingArmed { + t.Fatal("helper status should be disarmed after protocol mismatch") + } + if m.generation != 7 { + t.Fatalf("generation = %d, want unchanged 7 after protocol mismatch", m.generation) + } + if m.lastStatus.PID != 1234 { + t.Fatalf("lastStatus PID = %d, want status probe PID 1234", m.lastStatus.PID) + } + if m.lastSnapshot == nil || len(m.lastSnapshot.Policies) != 1 || m.lastSnapshot.Policies[0].Inactive { + t.Fatalf("manager should not cache rejected inactive state when publish is refused: %+v", m.lastSnapshot) + } +} + +func TestUpdatePolicyScheduleStateWithoutHelperDoesNotMutateSnapshot(t *testing.T) { + cfg := &config.Config{} + cfg.Security.Policies = []*config.ZonePairPolicies{{ + FromZone: "trust", + ToZone: "untrust", + Policies: []*config.Policy{{ + Name: "scheduled-allow", + SchedulerName: "workhours", + Action: config.PolicyPermit, + }}, + }} + cfg.Schedulers = map[string]*config.SchedulerConfig{ + "workhours": {Name: "workhours"}, + } + + m := New() + m.generation = 7 + m.lastSnapshot = buildSnapshot(cfg, config.UserspaceConfig{}, 7, 0) + m.lastSnapshot.Policies[0].Inactive = false + + m.UpdatePolicyScheduleState(cfg, map[string]bool{"workhours": false}) + + if m.generation != 7 { + t.Fatalf("generation = %d, want unchanged 7", m.generation) + } + if m.lastSnapshot == nil || len(m.lastSnapshot.Policies) != 1 || m.lastSnapshot.Policies[0].Inactive { + t.Fatalf("lastSnapshot mutated without helper: %+v", m.lastSnapshot) + } + if got, ok := m.policySchedulerActive["workhours"]; !ok || got { + t.Fatalf("policySchedulerActive[workhours] = %t, present=%t; want false and present", got, ok) + } +} + func TestUserspaceSupportsScreenProfilesBasic(t *testing.T) { cfg := &config.Config{} cfg.Security.Screen = map[string]*config.ScreenProfile{ diff --git a/pkg/dataplane/userspace/maps_sync.go b/pkg/dataplane/userspace/maps_sync.go index a57ca825c..504dcb5e8 100644 --- a/pkg/dataplane/userspace/maps_sync.go +++ b/pkg/dataplane/userspace/maps_sync.go @@ -604,17 +604,7 @@ ctrlReady: // even for packets that bypassed the BPF pipeline (#332). m.syncBPFCountersLocked(status) - // Populate runtime mode and observability fields in status. - status.DataplaneMode = m.mode.String() - status.ConfiguredMode = m.configuredMode.String() - status.EntryPrograms = m.entryProgramsLocked() - status.FallbackCounters = m.readFallbackStatsLocked() - if m.eventStream != nil { - es := m.eventStream.Status() - status.EventStream = &es - } - - m.lastStatus = *status + m.recordHelperStatusLocked(status) return nil } diff --git a/pkg/dataplane/userspace/process.go b/pkg/dataplane/userspace/process.go index 778ddaa61..816ac47ba 100644 --- a/pkg/dataplane/userspace/process.go +++ b/pkg/dataplane/userspace/process.go @@ -24,7 +24,11 @@ import ( func (m *Manager) ensureProcessLocked(cfg config.UserspaceConfig) error { tuneSocketBuffers() if m.proc != nil && m.proc.Process != nil && configEqual(m.cfg, cfg) { - if err := m.requestLocked(ControlRequest{Type: "ping"}, nil); err == nil { + var status ProcessStatus + if err := m.requestLocked(ControlRequest{Type: "ping"}, &status); err == nil { + if status.PID != 0 || status.ConfigSnapshotProtocolVersion != 0 { + m.lastStatus = status + } return nil } slog.Warn("userspace dataplane helper unhealthy, restarting") @@ -108,7 +112,11 @@ func (m *Manager) ensureProcessLocked(cfg config.UserspaceConfig) error { deadline := time.Now().Add(5 * time.Second) for time.Now().Before(deadline) { if _, err := os.Stat(cfg.ControlSocket); err == nil { - if err := m.requestLocked(ControlRequest{Type: "ping"}, nil); err == nil { + var status ProcessStatus + if err := m.requestLocked(ControlRequest{Type: "ping"}, &status); err == nil { + if status.PID != 0 || status.ConfigSnapshotProtocolVersion != 0 { + m.lastStatus = status + } slog.Info("userspace dataplane helper started", "pid", cmd.Process.Pid, "socket", cfg.ControlSocket) return nil } @@ -321,6 +329,12 @@ func (m *Manager) syncSnapshotLocked() error { // for parity with update_neighbors path. publishSnap := *m.lastSnapshot publishSnap.Neighbors = filterPublishableNeighbors(m.lastSnapshot.Neighbors) + if err := m.ensurePolicySchedulerProtocolLocked(publishSnap.Config); err != nil { + if disarmErr := m.disarmPolicySchedulerProtocolFailureLocked(err); disarmErr != nil { + return errors.Join(err, disarmErr) + } + return err + } var status ProcessStatus if err := m.requestLocked(ControlRequest{Type: "apply_snapshot", Snapshot: &publishSnap}, &status); err != nil { return fmt.Errorf("publish userspace snapshot: %w", err) diff --git a/pkg/dataplane/userspace/protocol.go b/pkg/dataplane/userspace/protocol.go index 3024f50f1..7d9bb999a 100644 --- a/pkg/dataplane/userspace/protocol.go +++ b/pkg/dataplane/userspace/protocol.go @@ -7,7 +7,7 @@ import ( ) const ( - ProtocolVersion = 1 + ProtocolVersion = 2 TypeUserspace = "userspace" ) @@ -365,9 +365,12 @@ type PolicyApplicationSnapshot struct { } type PolicyRuleSnapshot struct { + RuleID string `json:"rule_id,omitempty"` Name string `json:"name"` FromZone string `json:"from_zone,omitempty"` ToZone string `json:"to_zone,omitempty"` + SchedulerName string `json:"scheduler_name,omitempty"` + Inactive bool `json:"inactive,omitempty"` SourceAddresses []string `json:"source_addresses,omitempty"` DestinationAddresses []string `json:"destination_addresses,omitempty"` Applications []string `json:"applications,omitempty"` @@ -422,28 +425,29 @@ type UserspaceCapabilities struct { } type ProcessStatus struct { - PID int `json:"pid"` - StartedAt time.Time `json:"started_at"` - ControlSocket string `json:"control_socket"` - StateFile string `json:"state_file"` - Workers int `json:"workers"` - RingEntries int `json:"ring_entries"` - HelperMode string `json:"helper_mode"` - IOUringPlanned bool `json:"io_uring_planned"` - IOUringActive bool `json:"io_uring_active,omitempty"` - IOUringMode string `json:"io_uring_mode,omitempty"` - IOUringLastError string `json:"io_uring_last_error,omitempty"` - Enabled bool `json:"enabled"` - ForwardingArmed bool `json:"forwarding_armed,omitempty"` - Capabilities UserspaceCapabilities `json:"capabilities"` - LastSnapshotGeneration uint64 `json:"last_snapshot_generation"` - LastFIBGeneration uint32 `json:"last_fib_generation,omitempty"` - LastSnapshotAt time.Time `json:"last_snapshot_at,omitempty"` - InterfaceAddresses int `json:"interface_addresses,omitempty"` - NeighborEntries int `json:"neighbor_entries,omitempty"` - NeighborGeneration uint64 `json:"neighbor_generation,omitempty"` - RouteEntries int `json:"route_entries,omitempty"` - WorkerHeartbeats []time.Time `json:"worker_heartbeats,omitempty"` + PID int `json:"pid"` + ConfigSnapshotProtocolVersion int `json:"config_snapshot_protocol_version,omitempty"` + StartedAt time.Time `json:"started_at"` + ControlSocket string `json:"control_socket"` + StateFile string `json:"state_file"` + Workers int `json:"workers"` + RingEntries int `json:"ring_entries"` + HelperMode string `json:"helper_mode"` + IOUringPlanned bool `json:"io_uring_planned"` + IOUringActive bool `json:"io_uring_active,omitempty"` + IOUringMode string `json:"io_uring_mode,omitempty"` + IOUringLastError string `json:"io_uring_last_error,omitempty"` + Enabled bool `json:"enabled"` + ForwardingArmed bool `json:"forwarding_armed,omitempty"` + Capabilities UserspaceCapabilities `json:"capabilities"` + LastSnapshotGeneration uint64 `json:"last_snapshot_generation"` + LastFIBGeneration uint32 `json:"last_fib_generation,omitempty"` + LastSnapshotAt time.Time `json:"last_snapshot_at,omitempty"` + InterfaceAddresses int `json:"interface_addresses,omitempty"` + NeighborEntries int `json:"neighbor_entries,omitempty"` + NeighborGeneration uint64 `json:"neighbor_generation,omitempty"` + RouteEntries int `json:"route_entries,omitempty"` + WorkerHeartbeats []time.Time `json:"worker_heartbeats,omitempty"` // #869: per-worker busy/idle runtime telemetry. WorkerRuntime []WorkerRuntimeStatus `json:"worker_runtime,omitempty"` HAGroups []HAGroupStatus `json:"ha_groups,omitempty"` diff --git a/pkg/dataplane/userspace/snapshot.go b/pkg/dataplane/userspace/snapshot.go index 635ed2799..84db97eb2 100644 --- a/pkg/dataplane/userspace/snapshot.go +++ b/pkg/dataplane/userspace/snapshot.go @@ -21,6 +21,10 @@ import ( ) func buildSnapshot(cfg *config.Config, ucfg config.UserspaceConfig, generation uint64, fibGeneration uint32) *ConfigSnapshot { + return buildSnapshotWithSchedulerState(cfg, ucfg, generation, fibGeneration, nil) +} + +func buildSnapshotWithSchedulerState(cfg *config.Config, ucfg config.UserspaceConfig, generation uint64, fibGeneration uint32, activeState map[string]bool) *ConfigSnapshot { if cfg == nil { return &ConfigSnapshot{ Version: ProtocolVersion, @@ -50,7 +54,7 @@ func buildSnapshot(cfg *config.Config, ucfg config.UserspaceConfig, generation u Routes: buildRouteSnapshots(cfg, interfaces), Flow: buildFlowSnapshot(cfg), DefaultPolicy: policyActionString(cfg.Security.DefaultPolicy), - Policies: buildPolicySnapshots(cfg), + Policies: buildPolicySnapshotsWithSchedulerState(cfg, activeState), SourceNAT: buildSourceNATSnapshots(cfg), StaticNAT: buildStaticNATSnapshots(cfg), DestinationNAT: buildDestinationNATSnapshots(cfg), @@ -1968,6 +1972,10 @@ func buildClassOfServiceSnapshot(cfg *config.Config) *ClassOfServiceSnapshot { } func buildPolicySnapshots(cfg *config.Config) []PolicyRuleSnapshot { + return buildPolicySnapshotsWithSchedulerState(cfg, nil) +} + +func buildPolicySnapshotsWithSchedulerState(cfg *config.Config, activeState map[string]bool) []PolicyRuleSnapshot { if cfg == nil || (len(cfg.Security.Policies) == 0 && len(cfg.Security.GlobalPolicies) == 0) { return nil } @@ -1992,10 +2000,14 @@ func buildPolicySnapshots(cfg *config.Config) []PolicyRuleSnapshot { if !ok { applicationTerms = nil } + schedulerName := pol.SchedulerName out = append(out, PolicyRuleSnapshot{ + RuleID: stablePolicyRuleID(zpp.FromZone, zpp.ToZone, pol.Name), Name: pol.Name, FromZone: zpp.FromZone, ToZone: zpp.ToZone, + SchedulerName: schedulerName, + Inactive: policyRuleInactive(schedulerName, activeState), SourceAddresses: sourceAddresses, DestinationAddresses: destinationAddresses, Applications: append([]string(nil), pol.Match.Applications...), @@ -2021,10 +2033,14 @@ func buildPolicySnapshots(cfg *config.Config) []PolicyRuleSnapshot { if !ok { applicationTerms = nil } + schedulerName := pol.SchedulerName out = append(out, PolicyRuleSnapshot{ + RuleID: stablePolicyRuleID("junos-global", "junos-global", pol.Name), Name: pol.Name, FromZone: "junos-global", ToZone: "junos-global", + SchedulerName: schedulerName, + Inactive: policyRuleInactive(schedulerName, activeState), SourceAddresses: sourceAddresses, DestinationAddresses: destinationAddresses, Applications: append([]string(nil), pol.Match.Applications...), @@ -2035,6 +2051,21 @@ func buildPolicySnapshots(cfg *config.Config) []PolicyRuleSnapshot { return out } +func stablePolicyRuleID(fromZone, toZone, ruleName string) string { + return fmt.Sprintf("%s->%s/%s", fromZone, toZone, ruleName) +} + +func policyRuleInactive(schedulerName string, activeState map[string]bool) bool { + if schedulerName == "" { + return false + } + if activeState == nil { + return true + } + active, ok := activeState[schedulerName] + return !ok || !active +} + func policyActionString(action config.PolicyAction) string { switch action { case config.PolicyPermit: diff --git a/pkg/scheduler/README.md b/pkg/scheduler/README.md index 1358a376b..38522ad59 100644 --- a/pkg/scheduler/README.md +++ b/pkg/scheduler/README.md @@ -12,6 +12,9 @@ during specific windows. - `New(schedulers map[string]*config.SchedulerConfig, updateFn func(map[string]bool)) *Scheduler` — `scheduler.go`. The `updateFn` callback fires only on state change, not every tick. +- `NewPrimed(..., now)` — constructor for daemon apply paths that need the + initial active-state map without firing the callback while an external + apply semaphore is already held. - `Run(ctx context.Context)` — `scheduler.go`. - `IsActive(name string) bool` — `scheduler.go`. - `ActiveState() map[string]bool` — `scheduler.go`. Snapshot of every @@ -33,3 +36,22 @@ during specific windows. precision through this package. - `updateFn` receives the **full** active-state map, not just the changed entries. Callers compute their own diff if they care. +- Daemon callers must publish scheduler changes while holding the daemon + apply semaphore. Runtime scheduler callbacks take that semaphore before + touching dataplane state so commits and time-window flips cannot publish + hybrid policy snapshots. +- The daemon reconciler keeps an existing scheduler instance when the + committed scheduler config is byte-identical. This preserves the + monotonic/wall-clock recovery state and avoids resetting timers on + no-op commits. Runtime publishes use the daemon context when acquiring + the apply semaphore, so shutdown cancels a blocked scheduler publish + instead of leaving a goroutine parked behind a long apply. +- The scheduler uses wall-clock time only in the control plane to evaluate + Junos time windows. Packet workers must consume published active/inactive + booleans from the userspace snapshot and must not evaluate scheduler time in + the hot path. +- Wall-clock discontinuities are fail-closed. Each evaluation compares + wall elapsed time with Go's monotonic elapsed time from the previous + evaluation; backward wall steps or drift beyond the tolerance publish + all schedulers inactive for that evaluation instead of extending an + allow window with a stale wall-clock assumption. diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d81b60793..2a299b56b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -12,23 +12,45 @@ import ( // Scheduler periodically evaluates time windows for named schedulers // and notifies a callback when any scheduler's active state changes. type Scheduler struct { - mu sync.RWMutex - schedulers map[string]*config.SchedulerConfig - active map[string]bool - updateFn func(activeState map[string]bool) + mu sync.RWMutex + schedulers map[string]*config.SchedulerConfig + active map[string]bool + updateFn func(activeState map[string]bool) + lastEval time.Time + lastWallUnixNano int64 + unsafeUntil time.Time } -// New creates a Scheduler with the given scheduler configs and update callback. -// updateFn is called whenever any scheduler's active state changes, receiving -// the current active state of all schedulers. -func New(schedulers map[string]*config.SchedulerConfig, updateFn func(activeState map[string]bool)) *Scheduler { +const ( + wallClockDriftTolerance = 5 * time.Second + wallClockRecoveryHold = 2 * time.Minute +) + +// NewPrimed creates a Scheduler, evaluates the initial active-state map, and +// returns that map without firing updateFn from inside the constructor. Daemon +// apply paths use this when they already hold their own serialization lock and +// must publish the initial state as part of the same apply transaction. +func NewPrimed(schedulers map[string]*config.SchedulerConfig, updateFn func(activeState map[string]bool), now time.Time) (*Scheduler, map[string]bool) { s := &Scheduler{ schedulers: schedulers, active: make(map[string]bool), updateFn: updateFn, } - // Compute initial state. - s.evaluate(time.Now()) + s.evaluate(now, false) + return s, s.ActiveState() +} + +// New creates a Scheduler with the given scheduler configs and update callback. +// updateFn is called whenever any scheduler's active state changes, receiving +// the current active state of all schedulers. +func New(schedulers map[string]*config.SchedulerConfig, updateFn func(activeState map[string]bool)) *Scheduler { + s, _ := NewPrimed(schedulers, updateFn, time.Now()) + // Preserve the historical constructor contract: New notifies on initial + // state. NewPrimed is the no-notify variant for callers that publish the + // initial state under an external lock. + if len(s.active) > 0 { + s.notifyActiveState() + } return s } @@ -45,7 +67,7 @@ func (s *Scheduler) Run(ctx context.Context) { slog.Info("scheduler: stopping evaluation loop") return case t := <-ticker.C: - s.evaluate(t) + s.evaluate(t, true) } } } @@ -73,20 +95,31 @@ func (s *Scheduler) Update(schedulers map[string]*config.SchedulerConfig) { s.mu.Lock() s.schedulers = schedulers s.mu.Unlock() - s.evaluate(time.Now()) + s.evaluate(time.Now(), true) } // evaluate checks each scheduler against the current time and fires the // callback if any state changed. -func (s *Scheduler) evaluate(now time.Time) { +func (s *Scheduler) evaluate(now time.Time, notify bool) { s.mu.Lock() - defer s.mu.Unlock() changed := false newActive := make(map[string]bool, len(s.schedulers)) + wallClockDiscontinuous := s.wallClockDiscontinuousLocked(now) + wallClockUnsafe := wallClockDiscontinuous + if !wallClockUnsafe && !s.unsafeUntil.IsZero() { + if now.Before(s.unsafeUntil) { + wallClockUnsafe = true + } else { + s.unsafeUntil = time.Time{} + } + } for name, sched := range s.schedulers { - cur := isWithinWindow(now, sched) + cur := false + if !wallClockUnsafe { + cur = isWithinWindow(now, sched) + } newActive[name] = cur if prev, ok := s.active[name]; !ok || prev != cur { slog.Info("scheduler: state changed", "name", name, "active", cur) @@ -103,15 +136,68 @@ func (s *Scheduler) evaluate(now time.Time) { } s.active = newActive + if wallClockDiscontinuous { + s.unsafeUntil = now.Add(wallClockRecoveryHold) + } + s.lastEval = now + s.lastWallUnixNano = now.UnixNano() - if changed && s.updateFn != nil { - // Pass a copy so the callback cannot mutate internal state. - cp := make(map[string]bool, len(newActive)) - for k, v := range newActive { - cp[k] = v - } - s.updateFn(cp) + if !changed || !notify || s.updateFn == nil { + s.mu.Unlock() + return } + cp := copyActiveState(newActive) + updateFn := s.updateFn + s.mu.Unlock() + updateFn(cp) +} + +func (s *Scheduler) wallClockDiscontinuousLocked(now time.Time) bool { + if s.lastEval.IsZero() { + return false + } + wallElapsed := time.Duration(now.UnixNano() - s.lastWallUnixNano) + if wallElapsed < 0 { + slog.Warn("scheduler: wall clock moved backward, failing closed during recovery hold", + "previous", s.lastEval, "current", now) + return true + } + monoElapsed := now.Sub(s.lastEval) + if monoElapsed < 0 { + slog.Warn("scheduler: monotonic clock moved backward, failing closed during recovery hold", + "previous", s.lastEval, "current", now) + return true + } + delta := wallElapsed - monoElapsed + if delta < 0 { + delta = -delta + } + if delta > wallClockDriftTolerance { + slog.Warn("scheduler: wall clock drift exceeded tolerance, failing closed during recovery hold", + "wall_elapsed", wallElapsed, "monotonic_elapsed", monoElapsed, "tolerance", wallClockDriftTolerance) + return true + } + return false +} + +func (s *Scheduler) notifyActiveState() { + s.mu.RLock() + if s.updateFn == nil { + s.mu.RUnlock() + return + } + cp := copyActiveState(s.active) + updateFn := s.updateFn + s.mu.RUnlock() + updateFn(cp) +} + +func copyActiveState(in map[string]bool) map[string]bool { + out := make(map[string]bool, len(in)) + for k, v := range in { + out[k] = v + } + return out } // isWithinWindow determines whether now falls within the time window defined diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index f381c19c3..8c4459b7f 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -178,6 +178,127 @@ func TestScheduler_InitialState(t *testing.T) { } } +func TestScheduler_NewPrimedDoesNotNotifyInitialState(t *testing.T) { + var called bool + schedCfg := map[string]*config.SchedulerConfig{ + "always-on": {Name: "always-on"}, + } + + s, state := NewPrimed(schedCfg, func(activeState map[string]bool) { + called = true + }, time.Date(2026, 2, 12, 14, 30, 0, 0, time.UTC)) + + if called { + t.Fatal("NewPrimed fired callback during constructor") + } + if !state["always-on"] { + t.Fatal("initial state missing always-on=true") + } + if !s.IsActive("always-on") { + t.Fatal("IsActive should return true for always-on") + } +} + +func TestScheduler_WallClockBackwardStepFailsClosed(t *testing.T) { + var lastState map[string]bool + schedCfg := map[string]*config.SchedulerConfig{ + "business-hours": { + Name: "business-hours", + StartTime: "08:00:00", + StopTime: "17:00:00", + }, + } + now := time.Date(2026, 2, 12, 12, 0, 0, 0, time.UTC) + s, state := NewPrimed(schedCfg, func(activeState map[string]bool) { + lastState = activeState + }, now) + if !state["business-hours"] { + t.Fatal("initial state should be active") + } + + s.evaluate(now.Add(-1*time.Hour), true) + if lastState == nil { + t.Fatal("expected callback after fail-closed state change") + } + if lastState["business-hours"] { + t.Fatalf("wall-clock backward step should fail closed, got state %+v", lastState) + } + if s.IsActive("business-hours") { + t.Fatal("scheduler should remain inactive after backward wall-clock step") + } +} + +func TestScheduler_WallClockBackwardStepStaysFailClosedUntilClockRecovers(t *testing.T) { + var lastState map[string]bool + schedCfg := map[string]*config.SchedulerConfig{ + "business-hours": { + Name: "business-hours", + StartTime: "08:00:00", + StopTime: "17:00:00", + }, + } + now := time.Date(2026, 2, 12, 12, 0, 0, 0, time.UTC) + s, state := NewPrimed(schedCfg, func(activeState map[string]bool) { + lastState = activeState + }, now) + if !state["business-hours"] { + t.Fatal("initial state should be active") + } + + // Simulate the real NTP rollback shape: monotonic time advances while + // wall time appears to move backward relative to the previous wall sample. + s.mu.Lock() + s.lastEval = now + s.lastWallUnixNano = now.Add(time.Hour).UnixNano() + s.mu.Unlock() + + s.evaluate(now.Add(time.Second), true) + if lastState == nil || lastState["business-hours"] { + t.Fatalf("first backward-step evaluation should fail closed, got state %+v", lastState) + } + lastState = nil + + // The recovery hold keeps the scheduler closed for more than one tick, + // even after the new wall/monotonic samples are internally consistent. + s.evaluate(now.Add(time.Minute), true) + if lastState != nil { + t.Fatalf("second rollback evaluation should not notify without state change, got %+v", lastState) + } + if s.IsActive("business-hours") { + t.Fatal("scheduler should stay inactive during wall-clock recovery hold") + } + + s.evaluate(now.Add(3*time.Minute), true) + if lastState == nil || !lastState["business-hours"] { + t.Fatalf("scheduler should recover after hold window, got state %+v", lastState) + } +} + +func TestScheduler_MonotonicAdvanceDoesNotFailClosed(t *testing.T) { + schedCfg := map[string]*config.SchedulerConfig{ + "business-hours": { + Name: "business-hours", + StartTime: "00:00:00", + StopTime: "23:59:59", + }, + } + start := time.Now() + s, state := NewPrimed(schedCfg, func(map[string]bool) {}, start) + if !state["business-hours"] { + t.Fatal("initial state should be active") + } + + // time.Add preserves Go's monotonic reading. This exercises the real + // monotonic path; time.Date-only tests would silently skip it. + s.evaluate(start.Add(time.Minute), true) + if s.IsActive("business-hours") == false { + t.Fatal("monotonic time advance with matching wall time should stay active") + } + if !s.unsafeUntil.IsZero() { + t.Fatalf("unsafeUntil = %v, want zero for monotonic advance", s.unsafeUntil) + } +} + func TestScheduler_ActiveState(t *testing.T) { schedCfg := map[string]*config.SchedulerConfig{ "always-on": {Name: "always-on"}, diff --git a/userspace-dp/src/afxdp/test_fixtures.rs b/userspace-dp/src/afxdp/test_fixtures.rs index 6267ba57e..093ca1830 100644 --- a/userspace-dp/src/afxdp/test_fixtures.rs +++ b/userspace-dp/src/afxdp/test_fixtures.rs @@ -432,6 +432,7 @@ pub(super) fn nat_snapshot() -> ConfigSnapshot { applications: vec!["any".to_string()], application_terms: Vec::new(), action: "permit".to_string(), + ..Default::default() }], neighbors: vec![ NeighborSnapshot { @@ -529,6 +530,7 @@ pub(super) fn policy_deny_snapshot() -> ConfigSnapshot { applications: vec!["any".to_string()], application_terms: Vec::new(), action: "permit".to_string(), + ..Default::default() }], ..Default::default() } diff --git a/userspace-dp/src/afxdp/tests.rs b/userspace-dp/src/afxdp/tests.rs index ee677eb1a..815921823 100644 --- a/userspace-dp/src/afxdp/tests.rs +++ b/userspace-dp/src/afxdp/tests.rs @@ -939,6 +939,7 @@ fn post_dnat_source_nat_matches_translated_destination() { applications: vec!["any".to_string()], application_terms: Vec::new(), action: "permit".to_string(), + ..Default::default() }); let state = build_forwarding_state(&snapshot); diff --git a/userspace-dp/src/main_tests.rs b/userspace-dp/src/main_tests.rs index 108b8406c..964c975c4 100644 --- a/userspace-dp/src/main_tests.rs +++ b/userspace-dp/src/main_tests.rs @@ -884,6 +884,55 @@ fn binding_counters_snapshot_serializes_with_expected_wire_keys() { assert_eq!(round, snap); } +#[test] +fn apply_snapshot_rejects_unsupported_protocol_version() { + let (mut client, server) = std::os::unix::net::UnixStream::pair().expect("control socket pair"); + let state = Arc::new(Mutex::new(ServerState { + status: ProcessStatus::default(), + snapshot: None, + afxdp: afxdp::Coordinator::new(), + state_writer: Arc::new(StateWriter::new()), + })); + let running = Arc::new(AtomicBool::new(true)); + let state_file = format!( + "{}/xpf-policy-scheduler-version-gate-{}.json", + std::env::temp_dir().display(), + std::process::id() + ); + let handle = { + let state = state.clone(); + let running = running.clone(); + std::thread::spawn(move || handle_stream(server, &state_file, state, running)) + }; + + let request = ControlRequest { + request_type: "apply_snapshot".to_string(), + snapshot: Some(ConfigSnapshot { + version: CONFIG_SNAPSHOT_PROTOCOL_VERSION - 1, + generated_at: Utc::now(), + ..ConfigSnapshot::default() + }), + ..ControlRequest::default() + }; + serde_json::to_writer(&mut client, &request).expect("write request"); + std::io::Write::write_all(&mut client, b"\n").expect("newline"); + + let response: ControlResponse = + serde_json::from_reader(std::io::BufReader::new(client)).expect("read response"); + assert!(!response.ok); + assert!( + response + .error + .contains("unsupported snapshot protocol version"), + "unexpected error: {}", + response.error + ); + handle + .join() + .expect("handler thread") + .expect("handler result"); +} + #[test] fn binding_counters_snapshot_tolerates_pre_split_wire() { // #804: a helper snapshot that pre-dates the diff --git a/userspace-dp/src/policy.rs b/userspace-dp/src/policy.rs index f443341ea..93c7faf86 100644 --- a/userspace-dp/src/policy.rs +++ b/userspace-dp/src/policy.rs @@ -45,8 +45,11 @@ impl Default for PolicyAction { #[derive(Debug)] pub(crate) struct PolicyRule { + pub(crate) rule_id: String, pub(crate) from_zone: String, pub(crate) to_zone: String, + pub(crate) scheduler_name: String, + pub(crate) inactive: bool, /// #923: adaptive prefix set (MatchAny / Linear ≤16 / Trie >16). /// Replaces the legacy `Vec` linear scan in /// `nets_match_v4/v6`. Empty input collapses to `MatchAny`, @@ -65,8 +68,11 @@ pub(crate) struct PolicyRule { impl Default for PolicyRule { fn default() -> Self { Self { + rule_id: String::new(), from_zone: String::new(), to_zone: String::new(), + scheduler_name: String::new(), + inactive: false, source_v4: PrefixSetV4::default(), source_v6: PrefixSetV6::default(), destination_v4: PrefixSetV4::default(), @@ -85,8 +91,11 @@ impl Default for PolicyRule { impl Clone for PolicyRule { fn clone(&self) -> Self { Self { + rule_id: self.rule_id.clone(), from_zone: self.from_zone.clone(), to_zone: self.to_zone.clone(), + scheduler_name: self.scheduler_name.clone(), + inactive: self.inactive, source_v4: self.source_v4.clone(), source_v6: self.source_v6.clone(), destination_v4: self.destination_v4.clone(), @@ -228,8 +237,11 @@ pub(crate) fn parse_policy_state( parse_address(prefix, &mut dst_v4, &mut dst_v6); } let mut rule = PolicyRule { + rule_id: stable_policy_rule_id(snap), from_zone: snap.from_zone.clone(), to_zone: snap.to_zone.clone(), + scheduler_name: snap.scheduler_name.clone(), + inactive: snap.inactive, action: parse_action(&snap.action), source_v4: PrefixSetV4::from_prefixes(src_v4), source_v6: PrefixSetV6::from_prefixes(src_v6), @@ -326,6 +338,9 @@ fn try_match_rule( src_port: u16, dst_port: u16, ) -> Option { + if rule.inactive { + return None; + } if !rule.compiled_apps.matches(protocol, src_port, dst_port) { return None; } @@ -346,6 +361,13 @@ fn try_match_rule( } } +fn stable_policy_rule_id(snap: &PolicyRuleSnapshot) -> String { + if !snap.rule_id.is_empty() { + return snap.rule_id.clone(); + } + format!("{}->{}/{}", snap.from_zone, snap.to_zone, snap.name) +} + fn parse_action(action: &str) -> PolicyAction { match action { "permit" => PolicyAction::Permit, diff --git a/userspace-dp/src/policy_tests.rs b/userspace-dp/src/policy_tests.rs index 45344f2c5..5bd3bfab5 100644 --- a/userspace-dp/src/policy_tests.rs +++ b/userspace-dp/src/policy_tests.rs @@ -29,6 +29,7 @@ fn allow_all_matches_zone_pair() { applications: vec!["any".to_string()], application_terms: Vec::new(), action: "permit".to_string(), + ..Default::default() }], &test_zone_name_to_id(), ); @@ -65,6 +66,113 @@ fn default_deny_applies_without_match() { ); } +#[test] +fn evaluate_policy_skips_inactive_rules() { + let state = parse_policy_state( + "deny", + &[PolicyRuleSnapshot { + rule_id: "security-policy:lan:wan:inactive-allow".to_string(), + name: "inactive-allow".to_string(), + from_zone: "lan".to_string(), + to_zone: "wan".to_string(), + scheduler_name: "workhours".to_string(), + inactive: true, + source_addresses: vec!["any".to_string()], + destination_addresses: vec!["any".to_string()], + applications: vec!["any".to_string()], + action: "permit".to_string(), + ..Default::default() + }], + &test_zone_name_to_id(), + ); + + assert_eq!( + state.rules[0].rule_id, + "security-policy:lan:wan:inactive-allow" + ); + assert_eq!(state.rules[0].scheduler_name, "workhours"); + assert!(state.rules[0].inactive); + assert_eq!( + evaluate_policy( + &state, + TEST_LAN_ZONE_ID, + TEST_WAN_ZONE_ID, + "10.0.61.100".parse().expect("src"), + "172.16.80.200".parse().expect("dst"), + PROTO_TCP, + 12345, + 5201, + ), + PolicyAction::Deny + ); + assert_eq!( + state.rules[0] + .hit_count + .load(std::sync::atomic::Ordering::Relaxed), + 0 + ); +} + +#[test] +fn inactive_rule_falls_through_to_next_match() { + let state = parse_policy_state( + "deny", + &[ + PolicyRuleSnapshot { + name: "inactive-deny".to_string(), + from_zone: "lan".to_string(), + to_zone: "wan".to_string(), + scheduler_name: "offhours".to_string(), + inactive: true, + source_addresses: vec!["any".to_string()], + destination_addresses: vec!["any".to_string()], + applications: vec!["any".to_string()], + action: "deny".to_string(), + ..Default::default() + }, + PolicyRuleSnapshot { + rule_id: "security-policy:lan:wan:active-allow".to_string(), + name: "active-allow".to_string(), + from_zone: "lan".to_string(), + to_zone: "wan".to_string(), + source_addresses: vec!["any".to_string()], + destination_addresses: vec!["any".to_string()], + applications: vec!["any".to_string()], + action: "permit".to_string(), + ..Default::default() + }, + ], + &test_zone_name_to_id(), + ); + + assert_eq!(state.rules[0].rule_id, "lan->wan/inactive-deny"); + assert_eq!( + evaluate_policy( + &state, + TEST_LAN_ZONE_ID, + TEST_WAN_ZONE_ID, + "10.0.61.100".parse().expect("src"), + "172.16.80.200".parse().expect("dst"), + PROTO_TCP, + 12345, + 5201, + ), + PolicyAction::Permit + ); + assert_eq!( + state.rules[0] + .hit_count + .load(std::sync::atomic::Ordering::Relaxed), + 0 + ); + assert_eq!( + state.rules[1] + .hit_count + .load(std::sync::atomic::Ordering::Relaxed), + 1 + ); +} + #[test] fn cidr_matches_ipv6() { let state = parse_policy_state( @@ -78,6 +186,7 @@ fn cidr_matches_ipv6() { applications: vec!["any".to_string()], application_terms: Vec::new(), action: "permit".to_string(), + ..Default::default() }], &test_zone_name_to_id(), ); @@ -114,6 +223,7 @@ fn named_application_matches_protocol_and_port() { destination_port: "80".to_string(), }], action: "permit".to_string(), + ..Default::default() }], &test_zone_name_to_id(), ); @@ -171,6 +281,7 @@ fn application_set_matches_any_expanded_term() { }, ], action: "permit".to_string(), + ..Default::default() }], &test_zone_name_to_id(), ); @@ -202,6 +313,7 @@ fn global_policy_matches_any_zone_pair() { applications: vec!["any".to_string()], application_terms: Vec::new(), action: "permit".to_string(), + ..Default::default() }], &test_zone_name_to_id(), ); @@ -248,6 +360,7 @@ fn global_policy_evaluated_after_zone_specific() { applications: vec!["any".to_string()], application_terms: Vec::new(), action: "deny".to_string(), + ..Default::default() }, PolicyRuleSnapshot { name: "global-allow".to_string(), @@ -258,6 +371,7 @@ fn global_policy_evaluated_after_zone_specific() { applications: vec!["any".to_string()], application_terms: Vec::new(), action: "permit".to_string(), + ..Default::default() }, ], &test_zone_name_to_id(), @@ -310,6 +424,7 @@ fn evaluate_policy_unknown_zone_pair_returns_default_action() { applications: vec!["any".into()], application_terms: Vec::new(), action: "permit".into(), + ..Default::default() }], &zones, ); @@ -353,6 +468,7 @@ fn malformed_only_input_yields_match_all_via_evaluate_policy() { applications: vec!["any".into()], application_terms: Vec::new(), action: "permit".into(), + ..Default::default() }], &zones, ); diff --git a/userspace-dp/src/protocol.rs b/userspace-dp/src/protocol.rs index 0e70fdbc2..a0bfb0642 100644 --- a/userspace-dp/src/protocol.rs +++ b/userspace-dp/src/protocol.rs @@ -7,6 +7,8 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +pub(crate) const CONFIG_SNAPSHOT_PROTOCOL_VERSION: i32 = 2; + // --------------------------------------------------------------------------- // Snapshot schema // --------------------------------------------------------------------------- @@ -593,11 +595,17 @@ pub(crate) struct FlowExportSnapshot { #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub(crate) struct PolicyRuleSnapshot { + #[serde(rename = "rule_id", default)] + pub rule_id: String, pub name: String, #[serde(rename = "from_zone", default)] pub from_zone: String, #[serde(rename = "to_zone", default)] pub to_zone: String, + #[serde(rename = "scheduler_name", default)] + pub scheduler_name: String, + #[serde(default)] + pub inactive: bool, #[serde(rename = "source_addresses", default)] pub source_addresses: Vec, #[serde(rename = "destination_addresses", default)] @@ -698,6 +706,8 @@ pub(crate) struct ControlRequest { #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub(crate) struct ProcessStatus { pub pid: i32, + #[serde(rename = "config_snapshot_protocol_version", default)] + pub config_snapshot_protocol_version: i32, #[serde(rename = "started_at")] pub started_at: DateTime, #[serde(rename = "control_socket")] @@ -2278,6 +2288,54 @@ mod tests { assert_eq!(back.v_min_throttles, status.v_min_throttles); } + #[test] + fn policy_rule_snapshot_scheduler_fields_round_trip() { + let snap = PolicyRuleSnapshot { + rule_id: "security-policy:trust:untrust:allow-web".into(), + name: "allow-web".into(), + from_zone: "trust".into(), + to_zone: "untrust".into(), + scheduler_name: "workhours".into(), + inactive: true, + source_addresses: vec!["any".into()], + destination_addresses: vec!["any".into()], + applications: vec!["junos-http".into()], + action: "permit".into(), + ..Default::default() + }; + + let value: serde_json::Value = + serde_json::to_value(&snap).expect("serialize PolicyRuleSnapshot to Value"); + assert_eq!(value["rule_id"], "security-policy:trust:untrust:allow-web"); + assert_eq!(value["scheduler_name"], "workhours"); + assert_eq!(value["inactive"], true); + + let back: PolicyRuleSnapshot = + serde_json::from_value(value).expect("deserialize PolicyRuleSnapshot"); + assert_eq!(back.rule_id, snap.rule_id); + assert_eq!(back.scheduler_name, snap.scheduler_name); + assert_eq!(back.inactive, snap.inactive); + } + + #[test] + fn policy_rule_snapshot_legacy_scheduler_fields_default() { + let legacy_json = r#"{ + "name": "allow-web", + "from_zone": "trust", + "to_zone": "untrust", + "source_addresses": ["any"], + "destination_addresses": ["any"], + "applications": ["junos-http"], + "action": "permit" + }"#; + + let snap: PolicyRuleSnapshot = + serde_json::from_str(legacy_json).expect("pre-#1378 PolicyRuleSnapshot decodes"); + assert_eq!(snap.rule_id, ""); + assert_eq!(snap.scheduler_name, ""); + assert_eq!(snap.inactive, false); + } + // #915 forward-compat: a pre-#915 CoSSchedulerSnapshot // payload (no `surplus_sharing` field) must decode with // `surplus_sharing == false` so the runtime sees the field diff --git a/userspace-dp/src/server/README.md b/userspace-dp/src/server/README.md index 6a819b7c1..fc89025eb 100644 --- a/userspace-dp/src/server/README.md +++ b/userspace-dp/src/server/README.md @@ -27,6 +27,18 @@ The shapes are mirrored in `pkg/dataplane/userspace/protocol.go` on the Go side; **the JSON tags ARE the contract** — changing one without updating the other breaks the helper. +`ConfigSnapshot.version` is a compatibility gate, not just documentation. +The helper accepts only the current snapshot protocol version; this prevents a +new daemon from publishing fields such as policy-scheduler inactive bits to a +helper that would silently ignore them. +The helper also reports `config_snapshot_protocol_version` in status so a new +daemon can fail closed before sending scheduled-policy snapshots to an older +helper binary that predates the gate. If the daemon detects an incompatible +helper while scheduled policies are configured, it sends +`set_forwarding_state armed=false` before returning the compile/publish error; +the old helper must not keep forwarding a stale snapshot that ignores scheduler +inactive bits. + ## Reconciliation `replan_queues` derives the binding plan from the current diff --git a/userspace-dp/src/server/handlers.rs b/userspace-dp/src/server/handlers.rs index ef5c51e3f..1d2b52e1e 100644 --- a/userspace-dp/src/server/handlers.rs +++ b/userspace-dp/src/server/handlers.rs @@ -52,51 +52,61 @@ pub(crate) fn handle_stream( "ping" | "status" => {} "apply_snapshot" => { if let Some(snapshot) = request.snapshot { - eprintln!( - "CTRL_REQ: apply_snapshot generation={} fib_generation={} forwarding_armed_before={}", - snapshot.generation, snapshot.fib_generation, guard.status.forwarding_armed - ); - guard.status.last_snapshot_generation = snapshot.generation; - guard.status.last_fib_generation = snapshot.fib_generation; - guard.status.last_snapshot_at = Some(snapshot.generated_at); - guard.status.capabilities = snapshot.capabilities.clone(); - let existing_bindings = guard.status.bindings.clone(); - let previous_snapshot = guard.snapshot.as_ref(); - let same_plan = previous_snapshot.is_some_and(|prev| { - let prev_key = snapshot_binding_plan_key(prev); - let next_key = snapshot_binding_plan_key(&snapshot); - let same = prev_key == next_key; - if !same { - eprintln!( - "CTRL_REQ: binding plan changed prev_key={} next_key={}", - prev_key, next_key - ); - } - same - }); - if same_plan { - guard.afxdp.refresh_runtime_snapshot(&snapshot); - guard.snapshot = Some(snapshot); - refresh_status(&mut guard); - persist_state = true; + if snapshot.version != CONFIG_SNAPSHOT_PROTOCOL_VERSION { + response.ok = false; + response.error = format!( + "unsupported snapshot protocol version {} (want {})", + snapshot.version, CONFIG_SNAPSHOT_PROTOCOL_VERSION + ); } else { - let defer_workers = snapshot.defer_workers; - guard.snapshot = Some(snapshot); - let replanned = replan_queues( - guard.snapshot.as_ref(), - guard.status.workers, - &existing_bindings, + eprintln!( + "CTRL_REQ: apply_snapshot generation={} fib_generation={} forwarding_armed_before={}", + snapshot.generation, + snapshot.fib_generation, + guard.status.forwarding_armed ); - guard.status.bindings = replanned; - if defer_workers { - eprintln!( - "CTRL_REQ: apply_snapshot defer_workers=true — skipping worker spawn (RETH MAC pending)" - ); + guard.status.last_snapshot_generation = snapshot.generation; + guard.status.last_fib_generation = snapshot.fib_generation; + guard.status.last_snapshot_at = Some(snapshot.generated_at); + guard.status.capabilities = snapshot.capabilities.clone(); + let existing_bindings = guard.status.bindings.clone(); + let previous_snapshot = guard.snapshot.as_ref(); + let same_plan = previous_snapshot.is_some_and(|prev| { + let prev_key = snapshot_binding_plan_key(prev); + let next_key = snapshot_binding_plan_key(&snapshot); + let same = prev_key == next_key; + if !same { + eprintln!( + "CTRL_REQ: binding plan changed prev_key={} next_key={}", + prev_key, next_key + ); + } + same + }); + if same_plan { + guard.afxdp.refresh_runtime_snapshot(&snapshot); + guard.snapshot = Some(snapshot); + refresh_status(&mut guard); + persist_state = true; } else { - reconcile_status_bindings(&mut guard); + let defer_workers = snapshot.defer_workers; + guard.snapshot = Some(snapshot); + let replanned = replan_queues( + guard.snapshot.as_ref(), + guard.status.workers, + &existing_bindings, + ); + guard.status.bindings = replanned; + if defer_workers { + eprintln!( + "CTRL_REQ: apply_snapshot defer_workers=true — skipping worker spawn (RETH MAC pending)" + ); + } else { + reconcile_status_bindings(&mut guard); + } + refresh_status(&mut guard); + persist_state = true; } - refresh_status(&mut guard); - persist_state = true; } } else { response.ok = false; diff --git a/userspace-dp/src/server/lifecycle.rs b/userspace-dp/src/server/lifecycle.rs index 3ce721721..acf7c19cd 100644 --- a/userspace-dp/src/server/lifecycle.rs +++ b/userspace-dp/src/server/lifecycle.rs @@ -73,6 +73,7 @@ pub(crate) fn run() -> Result<(), String> { let state = Arc::new(Mutex::new(ServerState { status: ProcessStatus { pid: std::process::id() as i32, + config_snapshot_protocol_version: CONFIG_SNAPSHOT_PROTOCOL_VERSION, started_at: Utc::now(), control_socket: args.control_socket.clone(), state_file: args.state_file.clone(),