diff --git a/docs/pr/1381-dataplane-interface-split/plan.md b/docs/pr/1381-dataplane-interface-split/plan.md index 017e63581..b355f32cd 100644 --- a/docs/pr/1381-dataplane-interface-split/plan.md +++ b/docs/pr/1381-dataplane-interface-split/plan.md @@ -101,8 +101,12 @@ type ApplyResult struct { FilterIDs map[string]uint32 FilterSpans map[string]FilterCounterSpan NATCounterIDs map[string]uint32 - Capabilities Capabilities - Generation uint64 + PoolIDs map[string]uint8 + PolicyNames map[uint32]string + AppNames map[uint16]string + PolicyScheduleRuleSlots []PolicyScheduleRuleSlot + Capabilities Capabilities + Generation uint64 } type FilterCounterSpan struct { @@ -135,6 +139,10 @@ through `LastCompileResult()`. It must carry at least: maps `rule-set/rule` to a counter ID via `LastCompileResult().NATCounterIDs`. That mapping is config/apply metadata, not a BPF map-writer method, so it belongs in `ApplyResult`. +- `PoolIDs`, `PolicyNames`, `AppNames`, and `PolicyScheduleRuleSlots` for NAT + display, flow/session attribution, event labels, and policy-scheduler + runtime updates. Scheduler callers must consume the compiled slots directly; + eBPF/DPDK/userspace must not recompute slots from original config indexes. - stable generation numbers for those IDs so mixed old/new metadata cannot be combined with counters from a different apply generation. @@ -266,6 +274,11 @@ type SessionDeltaSource interface { } ``` +`SessionStore.SessionDeltas()` is the bridge for this optional source. Generic +map-backed session stores return nil; the userspace backend returns an adapter +that converts helper-private DTOs into `pkg/dataplane/runtime` DTOs at the +backend boundary. + These DTOs must live in the same package as the abstract runtime interfaces or in a third leaf package imported by both `pkg/dataplane` and `pkg/dataplane/userspace`. The public interface must not reference @@ -282,6 +295,7 @@ backend boundary. 
type Telemetry interface { NewEventSource() (dataplane.EventSource, error) GlobalCounter(uint32) (uint64, error) + ReadFloodCounters(uint16) (dataplane.FloodState, error) InterfaceCounters(int) (dataplane.InterfaceCounterValue, error) ZoneCounters(uint16, int) (dataplane.CounterValue, error) PolicyCounters(uint32) (dataplane.CounterValue, error) @@ -534,3 +548,36 @@ Phase 1 is not complete until all of these are true: semantics as GC and has reverse-key plus DNAT/NAT64 cleanup tests; and - rollback is documented as switching the daemon back to the existing `dataplane.DataPlane` adapter without changing persistent config format. + +## Implementation note: first Phase 1 slice + +The first implementation slice keeps the legacy BPF-shaped `DataPlane` +interface in place and adds the new contract beside it: + +- `pkg/dataplane.RuntimeDataPlane`, `ConfigSink`, `ApplyResult`, + `SessionStore`, `Telemetry`, HA, and link-domain interfaces are now defined + without importing backend packages. +- `ApplyResult` carries `FilterIDs`, `FilterSpans` (`FilterID`, `RuleStart`, + `RuleCount`), `NATCounterIDs` widened to `uint32`, `PoolIDs`, + `PolicyNames`, `AppNames`, compiled `PolicyScheduleRuleSlots`, + capabilities, and a generation. eBPF, DPDK, and userspace compiles now + publish `LastApplyResult()`. +- `pkg/dataplane/runtime` owns the neutral session-delta DTOs and + `SessionDeltaSource`; `SessionStore.SessionDeltas()` exposes that optional + source and userspace adapts its helper-private + `SessionDeltaInfo`/`ProcessStatus` at the package boundary. +- eBPF, DPDK, and userspace managers now satisfy `RuntimeDataPlane` at + compile time. Shared adapters cover eBPF/DPDK HA plus generic telemetry and + session surfaces; userspace keeps backend-specific link and HA controllers so + link-cycle prepare/defer/rebind semantics and fabric-state helper sync stay + intact. 
+- Cluster stale-bulk reconciliation now routes through + `dataplane.SessionStore.ReconcileClusterBulk`, whose companion-delete path + owns forward, reverse, and DNAT/DNATv6 cleanup. A canary fails if + `pkg/cluster/sync.go` reintroduces local `DeleteDNATEntry*` cleanup. + +Remaining Phase 1 work is still explicit: daemon/API/gRPC/CLI callers must move +from `LastCompileResult()` and BPF map reads to `LastApplyResult()`/domain +interfaces, GC must move to `SessionStore`/`Telemetry`, and the legacy +`DataPlane` method-count canary can only flip after those callers no longer need +the BPF-shaped surface. diff --git a/go.mod b/go.mod index e3175fa5f..9af98c17b 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/insomniacslk/dhcp v0.0.0-20251020182700-175e84fbb167 github.com/mdlayher/ndp v1.1.0 github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 github.com/vishvananda/netlink v1.3.1 golang.org/x/net v0.47.0 golang.org/x/sync v0.18.0 @@ -27,7 +28,6 @@ require ( github.com/mdlayher/socket v0.5.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pierrec/lz4/v4 v4.1.14 // indirect - github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/u-root/uio v0.0.0-20230220225925-ffce2a382923 // indirect diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 03d3786aa..22e402c4b 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -551,64 +551,25 @@ func (s *SessionSync) reconcileStaleSessions() { return true } var deleted int - var staleV4 []dataplane.SessionKey - v4IterStart := time.Now() - s.dp.IterateSessions(func(key dataplane.SessionKey, val dataplane.SessionValue) bool { - if val.IsReverse != 0 { - return true - } - if shouldSyncAtBulkStart(val.IngressZone) { - return true - } - if _, ok := recvV4[key]; !ok { - staleV4 = append(staleV4, key) - } - return 
true + store := dataplane.NewDataPlaneSessionStore(s.dp) + result, err := store.ReconcileClusterBulk(dataplane.ClusterBulkReconcileInput{ + ReceivedV4: recvV4, + ReceivedV6: recvV6, + ShouldSyncZone: shouldSyncAtBulkStart, + DeleteReason: dataplane.DeleteReasonClusterStale, }) - slog.Info("cluster sync: reconcile stale sessions iterated v4", "stale", len(staleV4), "elapsed", time.Since(v4IterStart)) - v4DeleteStart := time.Now() - for _, key := range staleV4 { - if val, err := s.dp.GetSessionV4(key); err == nil { - if val.ReverseKey.Protocol != 0 { - s.dp.DeleteSession(val.ReverseKey) - } - if val.Flags&dataplane.SessFlagSNAT != 0 && val.Flags&dataplane.SessFlagStaticNAT == 0 { - s.dp.DeleteDNATEntry(dataplane.DNATKey{Protocol: key.Protocol, DstIP: val.NATSrcIP, DstPort: val.NATSrcPort}) - } - } - s.dp.DeleteSession(key) - deleted++ - } - slog.Info("cluster sync: reconcile stale sessions deleted v4", "deleted", len(staleV4), "elapsed", time.Since(v4DeleteStart)) - var staleV6 []dataplane.SessionKeyV6 - v6IterStart := time.Now() - s.dp.IterateSessionsV6(func(key dataplane.SessionKeyV6, val dataplane.SessionValueV6) bool { - if val.IsReverse != 0 { - return true - } - if shouldSyncAtBulkStart(val.IngressZone) { - return true - } - if _, ok := recvV6[key]; !ok { - staleV6 = append(staleV6, key) - } - return true - }) - slog.Info("cluster sync: reconcile stale sessions iterated v6", "stale", len(staleV6), "elapsed", time.Since(v6IterStart)) - v6DeleteStart := time.Now() - for _, key := range staleV6 { - if val, err := s.dp.GetSessionV6(key); err == nil { - if val.ReverseKey.Protocol != 0 { - s.dp.DeleteSessionV6(val.ReverseKey) - } - if val.Flags&dataplane.SessFlagSNAT != 0 && val.Flags&dataplane.SessFlagStaticNAT == 0 { - s.dp.DeleteDNATEntryV6(dataplane.DNATKeyV6{Protocol: key.Protocol, DstIP: val.NATSrcIP, DstPort: val.NATSrcPort}) - } - } - s.dp.DeleteSessionV6(key) - deleted++ + deleted = result.DeletedV4 + result.DeletedV6 + if err != nil { + slog.Warn("cluster 
sync: reconcile stale sessions failed", "err", err) + s.stats.Errors.Add(1) } - slog.Info("cluster sync: reconcile stale sessions deleted v6", "deleted", len(staleV6), "elapsed", time.Since(v6DeleteStart)) + slog.Info( + "cluster sync: reconcile stale sessions applied", + "stale_v4", result.StaleV4, + "stale_v6", result.StaleV6, + "deleted_v4", result.DeletedV4, + "deleted_v6", result.DeletedV6, + ) if deleted > 0 { slog.Info("cluster sync: reconciled stale sessions", "deleted", deleted) } diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index fe99b48e1..d4c7baf94 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -5,8 +5,12 @@ import ( "encoding/binary" "errors" "fmt" + "go/ast" + "go/parser" + "go/token" "io" "net" + "path/filepath" "strings" "sync" "testing" @@ -537,6 +541,8 @@ type mockSweepDP struct { v4sessions map[dataplane.SessionKey]dataplane.SessionValue v6sessions map[dataplane.SessionKeyV6]dataplane.SessionValueV6 sessionCounter uint64 + deletedDNATV4 []dataplane.DNATKey + deletedDNATV6 []dataplane.DNATKeyV6 } func (m *mockSweepDP) ReadGlobalCounter(index uint32) (uint64, error) { @@ -610,10 +616,12 @@ func (m *mockSweepDP) DeleteSessionV6(key dataplane.SessionKeyV6) error { } func (m *mockSweepDP) DeleteDNATEntry(key dataplane.DNATKey) error { + m.deletedDNATV4 = append(m.deletedDNATV4, key) return nil } func (m *mockSweepDP) DeleteDNATEntryV6(key dataplane.DNATKeyV6) error { + m.deletedDNATV6 = append(m.deletedDNATV6, key) return nil } @@ -1456,6 +1464,125 @@ func TestReconcileStaleSessionsV6(t *testing.T) { } } +func TestReconcileStaleSessionsUsesSessionStoreCompanionDeleteV4(t *testing.T) { + freshKey := dataplane.SessionKey{SrcIP: [4]byte{10, 0, 3, 1}, DstIP: [4]byte{10, 0, 4, 1}, Protocol: 6, SrcPort: 1000, DstPort: 80} + staleKey := dataplane.SessionKey{SrcIP: [4]byte{10, 0, 3, 2}, DstIP: [4]byte{10, 0, 4, 2}, Protocol: 6, SrcPort: 2000, DstPort: 443} + reverseKey := dataplane.SessionKey{SrcIP: [4]byte{10, 0, 
4, 2}, DstIP: [4]byte{10, 0, 3, 2}, Protocol: 6, SrcPort: 443, DstPort: 2000} + dp := &mockSweepDP{ + v4sessions: map[dataplane.SessionKey]dataplane.SessionValue{ + freshKey: {IsReverse: 0, IngressZone: 2}, + staleKey: { + IsReverse: 0, + IngressZone: 2, + ReverseKey: reverseKey, + Flags: dataplane.SessFlagSNAT, + NATSrcIP: 0x2c0200c0, + NATSrcPort: 40443, + }, + reverseKey: {IsReverse: 1, IngressZone: 2}, + }, + } + + ss := NewSessionSync(":0", "10.0.0.2:4785", dp) + ss.IsPrimaryFn = func() bool { return false } + ss.IsPrimaryForRGFn = func(rgID int) bool { return rgID == 1 } + ss.SetZoneRGMap(map[uint16]int{1: 1, 2: 2}) + + ss.handleMessage(nil, syncMsgBulkStart, nil) + ss.handleMessage(nil, syncMsgSessionV4, encodeSessionV4Payload(freshKey, dataplane.SessionValue{IsReverse: 0, IngressZone: 2})) + ss.handleMessage(nil, syncMsgBulkEnd, nil) + + if _, ok := dp.v4sessions[staleKey]; ok { + t.Fatal("stale forward session should be deleted") + } + if _, ok := dp.v4sessions[reverseKey]; ok { + t.Fatal("stale reverse session should be deleted") + } + wantDNAT := dataplane.DNATKey{Protocol: 6, DstIP: 0x2c0200c0, DstPort: 40443} + if len(dp.deletedDNATV4) != 1 || dp.deletedDNATV4[0] != wantDNAT { + t.Fatalf("deleted DNAT = %+v, want [%+v]", dp.deletedDNATV4, wantDNAT) + } +} + +func TestReconcileStaleSessionsUsesSessionStoreCompanionDeleteV6(t *testing.T) { + freshKey := dataplane.SessionKeyV6{Protocol: 6, SrcPort: 1000, DstPort: 80} + freshKey.SrcIP[15] = 1 + freshKey.DstIP[15] = 2 + staleKey := dataplane.SessionKeyV6{Protocol: 17, SrcPort: 2000, DstPort: 53} + staleKey.SrcIP[15] = 3 + staleKey.DstIP[15] = 4 + reverseKey := dataplane.SessionKeyV6{Protocol: 17, SrcPort: 53, DstPort: 2000} + reverseKey.SrcIP[15] = 4 + reverseKey.DstIP[15] = 3 + natIP := [16]byte{0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 99} + dp := &mockSweepDP{ + v6sessions: map[dataplane.SessionKeyV6]dataplane.SessionValueV6{ + freshKey: {IsReverse: 0, IngressZone: 2}, + staleKey: { + 
IsReverse: 0, + IngressZone: 2, + ReverseKey: reverseKey, + Flags: dataplane.SessFlagSNAT, + NATSrcIP: natIP, + NATSrcPort: 53000, + }, + reverseKey: {IsReverse: 1, IngressZone: 2}, + }, + } + + ss := NewSessionSync(":0", "10.0.0.2:4785", dp) + ss.IsPrimaryFn = func() bool { return false } + ss.IsPrimaryForRGFn = func(rgID int) bool { return rgID == 1 } + ss.SetZoneRGMap(map[uint16]int{1: 1, 2: 2}) + + ss.handleMessage(nil, syncMsgBulkStart, nil) + ss.handleMessage(nil, syncMsgSessionV6, encodeSessionV6Payload(freshKey, dataplane.SessionValueV6{IsReverse: 0, IngressZone: 2})) + ss.handleMessage(nil, syncMsgBulkEnd, nil) + + if _, ok := dp.v6sessions[staleKey]; ok { + t.Fatal("stale forward session should be deleted") + } + if _, ok := dp.v6sessions[reverseKey]; ok { + t.Fatal("stale reverse session should be deleted") + } + wantDNAT := dataplane.DNATKeyV6{Protocol: 17, DstIP: natIP, DstPort: 53000} + if len(dp.deletedDNATV6) != 1 || dp.deletedDNATV6[0] != wantDNAT { + t.Fatalf("deleted DNATv6 = %+v, want [%+v]", dp.deletedDNATV6, wantDNAT) + } +} + +func TestReconcileStaleSessionsHasNoLocalDNATCleanup(t *testing.T) { + t.Parallel() + + fset := token.NewFileSet() + file, err := parser.ParseFile(fset, filepath.Join(".", "sync.go"), nil, 0) + if err != nil { + t.Fatalf("parse sync.go: %v", err) + } + var reconcile ast.Node + for _, decl := range file.Decls { + fn, ok := decl.(*ast.FuncDecl) + if ok && fn.Name.Name == "reconcileStaleSessions" { + reconcile = fn.Body + break + } + } + if reconcile == nil { + t.Fatal("reconcileStaleSessions not found") + } + ast.Inspect(reconcile, func(n ast.Node) bool { + sel, ok := n.(*ast.SelectorExpr) + if !ok { + return true + } + switch sel.Sel.Name { + case "DeleteDNATEntry", "DeleteDNATEntryV6": + t.Fatalf("reconcileStaleSessions still owns local %s cleanup; use SessionStore companion delete", sel.Sel.Name) + } + return true + }) +} + func TestReconcileNoBulkInProgress(t *testing.T) { // If no bulk was in progress, reconciliation 
should be a no-op. key := dataplane.SessionKey{SrcIP: [4]byte{10, 0, 1, 1}, Protocol: 6} diff --git a/pkg/dataplane/apply.go b/pkg/dataplane/apply.go new file mode 100644 index 000000000..cb6b77fd1 --- /dev/null +++ b/pkg/dataplane/apply.go @@ -0,0 +1,359 @@ +package dataplane + +import ( + "context" + "errors" + "maps" + "slices" + + "github.com/psaab/xpf/pkg/config" + dpruntime "github.com/psaab/xpf/pkg/dataplane/runtime" + "github.com/psaab/xpf/pkg/networkd" +) + +// RuntimeDataPlane is the target daemon-facing dataplane shape for #1381. +// It is introduced beside the legacy BPF-shaped DataPlane while callers move +// one domain at a time. The eBPF, DPDK, and userspace managers have +// compile-time assertions against this shape. +type RuntimeDataPlane interface { + Start(context.Context) error + ConfigSink + Close() error + Teardown() error + + Link() LinkController + HA() HAController + Sessions() SessionStore + Telemetry() Telemetry + + // SessionDeltas returns the backend-neutral session-delta source used for + // HA session sync. Backends that do not support delta streaming return a + // nil source; callers must nil-check before use. + // TODO(#1381): migrate daemon HA session sync from direct userspace type + // assertions to this backend-neutral source. + SessionDeltas() dpruntime.SessionDeltaSource +} + +type ConfigSink interface { + ApplyConfig(context.Context, *config.Config) (*ApplyResult, error) + LastApplyResult() *ApplyResult +} + +type ApplyResult struct { + ZoneIDs map[string]uint16 + ManagedInterfaces []networkd.InterfaceConfig + FilterIDs map[string]uint32 + FilterSpans map[string]FilterCounterSpan + NATCounterIDs map[string]uint32 + + // Display metadata carried from CompileResult so callers can migrate from + // LastCompileResult() to LastApplyResult() without losing runtime lookups. 
+ PoolIDs map[string]uint8 // NAT pool name -> pool ID (0-based) + PolicyNames map[uint32]string // rule_id -> policy path (zone/policy or global/policy) + AppNames map[uint16]string // app_id -> application name (structured logging) + + // PolicyScheduleRuleSlots records the compiled slots used by runtime + // scheduler updates. Callers must not recompute these slots from config + // policy positions because app-term expansion can make them diverge. + PolicyScheduleRuleSlots []PolicyScheduleRuleSlot + + Capabilities Capabilities + Generation uint64 +} + +type FilterCounterSpan struct { + FilterID uint32 + RuleStart uint32 + RuleCount uint32 +} + +type Capabilities struct { + ForwardingSupported bool + UnsupportedReasons []string +} + +type LinkController interface { + SetDeferWorkers(bool) + PrepareLinkCycle() + NotifyLinkCycle() +} + +type FabricID uint8 + +type HAController interface { + SetRGActive(context.Context, int, bool) error + SetHAWatchdog(context.Context, int, uint64) error + SetFabricForwarding(context.Context, FabricID, FabricFwdInfo) error + SyncFabricState(context.Context) error +} + +type Telemetry interface { + NewEventSource() (EventSource, error) + GlobalCounter(uint32) (uint64, error) + InterfaceCounters(int) (InterfaceCounterValue, error) + ZoneCounters(uint16, int) (CounterValue, error) + PolicyCounters(uint32) (CounterValue, error) + FilterCounters(uint32) (CounterValue, error) + NATRuleCounter(uint32) (CounterValue, error) + NATPortCounter(uint32) (uint64, error) + MapStats() []MapStats + // ReadFloodCounters returns the per-CPU aggregated flood/screen state for + // the given zone. Backends without BPF flood maps return a zero FloodState. 
+ ReadFloodCounters(zoneID uint16) (FloodState, error) +} + +func ApplyResultFromCompileResult(result *CompileResult) *ApplyResult { + if result == nil { + return nil + } + out := &ApplyResult{ + ZoneIDs: maps.Clone(result.ZoneIDs), + ManagedInterfaces: cloneManagedInterfaces(result.ManagedInterfaces), + FilterIDs: maps.Clone(result.FilterIDs), + FilterSpans: maps.Clone(result.FilterSpans), + NATCounterIDs: make(map[string]uint32, len(result.NATCounterIDs)), + Capabilities: Capabilities{ForwardingSupported: true}, + PoolIDs: maps.Clone(result.PoolIDs), + PolicyNames: maps.Clone(result.PolicyNames), + AppNames: maps.Clone(result.AppNames), + PolicyScheduleRuleSlots: slices.Clone(result.PolicyScheduleRuleSlots), + } + for key, id := range result.NATCounterIDs { + out.NATCounterIDs[key] = uint32(id) + } + return out +} + +func (r *ApplyResult) Clone() *ApplyResult { + if r == nil { + return nil + } + out := *r + out.ZoneIDs = maps.Clone(r.ZoneIDs) + out.ManagedInterfaces = cloneManagedInterfaces(r.ManagedInterfaces) + out.FilterIDs = maps.Clone(r.FilterIDs) + out.FilterSpans = maps.Clone(r.FilterSpans) + out.NATCounterIDs = maps.Clone(r.NATCounterIDs) + out.Capabilities.UnsupportedReasons = slices.Clone(r.Capabilities.UnsupportedReasons) + out.PoolIDs = maps.Clone(r.PoolIDs) + out.PolicyNames = maps.Clone(r.PolicyNames) + out.AppNames = maps.Clone(r.AppNames) + out.PolicyScheduleRuleSlots = slices.Clone(r.PolicyScheduleRuleSlots) + return &out +} + +func cloneManagedInterfaces(in []networkd.InterfaceConfig) []networkd.InterfaceConfig { + out := slices.Clone(in) + for i := range out { + out[i].Addresses = slices.Clone(out[i].Addresses) + } + return out +} + +func (m *Manager) Start(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return m.Load() +} + +func (m *Manager) Link() LinkController { + return NewDataPlaneLinkController(m) +} + +func (m *Manager) HA() HAController { + return NewDataPlaneHAController(m) +} + +func 
(m *Manager) Sessions() SessionStore { + return NewDataPlaneSessionStore(m) +} + +func (m *Manager) SessionDeltas() dpruntime.SessionDeltaSource { + return nil +} + +func (m *Manager) Telemetry() Telemetry { + return NewDataPlaneTelemetry(m) +} + +func (m *Manager) ApplyConfig(ctx context.Context, cfg *config.Config) (*ApplyResult, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if _, err := m.Compile(cfg); err != nil { + return nil, err + } + return m.LastApplyResult(), nil +} + +func (m *Manager) LastApplyResult() *ApplyResult { + m.applyMu.Lock() + defer m.applyMu.Unlock() + return m.lastApply.Clone() +} + +func (m *Manager) recordApplyResult(result *ApplyResult) *ApplyResult { + if result == nil { + return nil + } + m.applyMu.Lock() + defer m.applyMu.Unlock() + m.applyGeneration++ + next := result.Clone() + next.Generation = m.applyGeneration + m.lastApply = next + return next.Clone() +} + +func NewDataPlaneLinkController(dp DataPlane) LinkController { + return dataPlaneLinkController{dp: dp} +} + +type dataPlaneLinkController struct { + dp DataPlane +} + +func (c dataPlaneLinkController) SetDeferWorkers(bool) {} + +func (c dataPlaneLinkController) PrepareLinkCycle() {} + +func (c dataPlaneLinkController) NotifyLinkCycle() { + if c.dp != nil { + c.dp.NotifyLinkCycle() + } +} + +func NewDataPlaneHAController(dp DataPlane) HAController { + return dataPlaneHAController{dp: dp} +} + +type dataPlaneHAController struct { + dp DataPlane +} + +func (c dataPlaneHAController) SetRGActive(ctx context.Context, rgID int, active bool) error { + if err := ctx.Err(); err != nil { + return err + } + if c.dp == nil { + return errors.New("nil dataplane") + } + return c.dp.UpdateRGActive(rgID, active) +} + +func (c dataPlaneHAController) SetHAWatchdog(ctx context.Context, rgID int, timestamp uint64) error { + if err := ctx.Err(); err != nil { + return err + } + if c.dp == nil { + return errors.New("nil dataplane") + } + return 
c.dp.UpdateHAWatchdog(rgID, timestamp) +} + +func (c dataPlaneHAController) SetFabricForwarding(ctx context.Context, id FabricID, info FabricFwdInfo) error { + if err := ctx.Err(); err != nil { + return err + } + if c.dp == nil { + return errors.New("nil dataplane") + } + if id == 1 { + return c.dp.UpdateFabricFwd1(info) + } + return c.dp.UpdateFabricFwd(info) +} + +func (c dataPlaneHAController) SyncFabricState(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + if c.dp == nil { + return errors.New("nil dataplane") + } + c.dp.SyncFabricState() + return nil +} + +func NewDataPlaneTelemetry(dp DataPlane) Telemetry { + return dataPlaneTelemetry{dp: dp} +} + +type dataPlaneTelemetry struct { + dp DataPlane +} + +func (t dataPlaneTelemetry) NewEventSource() (EventSource, error) { + if t.dp == nil { + return nil, errors.New("nil dataplane") + } + return t.dp.NewEventSource() +} + +func (t dataPlaneTelemetry) GlobalCounter(index uint32) (uint64, error) { + if t.dp == nil { + return 0, errors.New("nil dataplane") + } + return t.dp.ReadGlobalCounter(index) +} + +func (t dataPlaneTelemetry) ReadFloodCounters(zoneID uint16) (FloodState, error) { + if t.dp == nil { + return FloodState{}, errors.New("nil dataplane") + } + return t.dp.ReadFloodCounters(zoneID) +} + +func (t dataPlaneTelemetry) InterfaceCounters(ifindex int) (InterfaceCounterValue, error) { + if t.dp == nil { + return InterfaceCounterValue{}, errors.New("nil dataplane") + } + return t.dp.ReadInterfaceCounters(ifindex) +} + +func (t dataPlaneTelemetry) ZoneCounters(zoneID uint16, direction int) (CounterValue, error) { + if t.dp == nil { + return CounterValue{}, errors.New("nil dataplane") + } + return t.dp.ReadZoneCounters(zoneID, direction) +} + +func (t dataPlaneTelemetry) PolicyCounters(policyID uint32) (CounterValue, error) { + if t.dp == nil { + return CounterValue{}, errors.New("nil dataplane") + } + return t.dp.ReadPolicyCounters(policyID) +} + +func (t dataPlaneTelemetry) 
FilterCounters(ruleIdx uint32) (CounterValue, error) { + if t.dp == nil { + return CounterValue{}, errors.New("nil dataplane") + } + return t.dp.ReadFilterCounters(ruleIdx) +} + +func (t dataPlaneTelemetry) NATRuleCounter(counterID uint32) (CounterValue, error) { + if t.dp == nil { + return CounterValue{}, errors.New("nil dataplane") + } + return t.dp.ReadNATRuleCounter(counterID) +} + +func (t dataPlaneTelemetry) NATPortCounter(poolID uint32) (uint64, error) { + if t.dp == nil { + return 0, errors.New("nil dataplane") + } + return t.dp.ReadNATPortCounter(poolID) +} + +func (t dataPlaneTelemetry) MapStats() []MapStats { + if t.dp == nil { + return nil + } + return t.dp.GetMapStats() +} diff --git a/pkg/dataplane/apply_test.go b/pkg/dataplane/apply_test.go new file mode 100644 index 000000000..2a2a428b6 --- /dev/null +++ b/pkg/dataplane/apply_test.go @@ -0,0 +1,160 @@ +package dataplane + +import ( + "context" + "reflect" + "testing" + + "github.com/psaab/xpf/pkg/networkd" +) + +func TestApplyResultFromCompileResultCarriesDisplayMetadata(t *testing.T) { + compileResult := &CompileResult{ + ZoneIDs: map[string]uint16{ + "trust": 1, + }, + ManagedInterfaces: []networkd.InterfaceConfig{ + {Name: "xe-0/0/0", Addresses: []string{"192.0.2.1/24"}}, + }, + FilterIDs: map[string]uint32{ + "inet:edge-in": 3, + }, + FilterSpans: map[string]FilterCounterSpan{ + "inet:edge-in": {FilterID: 3, RuleStart: 42, RuleCount: 7}, + }, + NATCounterIDs: map[string]uint16{ + "srcnat/rule-a": 9, + }, + PoolIDs: map[string]uint8{ + "snat-pool": 2, + }, + PolicyNames: map[uint32]string{ + 100: "trust/untrust/allow-all", + }, + AppNames: map[uint16]string{ + 5: "junos-http", + }, + PolicyScheduleRuleSlots: []PolicyScheduleRuleSlot{ + {PolicySetID: 1, RuleIndex: 0, RuleID: 100, PolicyName: "allow-all", SchedulerName: "biz-hours"}, + }, + } + + result := ApplyResultFromCompileResult(compileResult) + if result == nil { + t.Fatal("ApplyResultFromCompileResult returned nil") + } + if got := 
result.FilterIDs["inet:edge-in"]; got != 3 { + t.Fatalf("FilterIDs[inet:edge-in] = %d, want 3", got) + } + if got := result.ManagedInterfaces[0].Addresses[0]; got != "192.0.2.1/24" { + t.Fatalf("ManagedInterfaces[0].Addresses[0] = %q, want 192.0.2.1/24", got) + } + if got := result.FilterSpans["inet:edge-in"]; got != (FilterCounterSpan{FilterID: 3, RuleStart: 42, RuleCount: 7}) { + t.Fatalf("FilterSpans[inet:edge-in] = %+v", got) + } + if got := result.NATCounterIDs["srcnat/rule-a"]; got != 9 { + t.Fatalf("NATCounterIDs[srcnat/rule-a] = %d, want 9", got) + } + if !result.Capabilities.ForwardingSupported { + t.Fatal("Capabilities.ForwardingSupported = false, want true") + } + if got := result.PoolIDs["snat-pool"]; got != 2 { + t.Fatalf("PoolIDs[snat-pool] = %d, want 2", got) + } + if got := result.PolicyNames[100]; got != "trust/untrust/allow-all" { + t.Fatalf("PolicyNames[100] = %q, want trust/untrust/allow-all", got) + } + if got := result.AppNames[5]; got != "junos-http" { + t.Fatalf("AppNames[5] = %q, want junos-http", got) + } + if n := len(result.PolicyScheduleRuleSlots); n != 1 { + t.Fatalf("PolicyScheduleRuleSlots len = %d, want 1", n) + } + if got := result.PolicyScheduleRuleSlots[0].PolicyName; got != "allow-all" { + t.Fatalf("PolicyScheduleRuleSlots[0].PolicyName = %q, want allow-all", got) + } + + // Mutate source — verify ApplyResult has independent copies. 
+ compileResult.ManagedInterfaces[0].Addresses[0] = "198.51.100.1/24" + compileResult.FilterIDs["inet:edge-in"] = 99 + compileResult.FilterSpans["inet:edge-in"] = FilterCounterSpan{} + compileResult.NATCounterIDs["srcnat/rule-a"] = 99 + compileResult.PoolIDs["snat-pool"] = 99 + compileResult.PolicyNames[100] = "mutated" + compileResult.AppNames[5] = "mutated" + compileResult.PolicyScheduleRuleSlots[0].PolicyName = "mutated" + if got := result.FilterIDs["inet:edge-in"]; got != 3 { + t.Fatalf("FilterIDs was not copied, got %d", got) + } + if got := result.ManagedInterfaces[0].Addresses[0]; got != "192.0.2.1/24" { + t.Fatalf("ManagedInterfaces nested Addresses was not copied, got %q", got) + } + if got := result.FilterSpans["inet:edge-in"].RuleStart; got != 42 { + t.Fatalf("FilterSpans was not copied, RuleStart = %d", got) + } + if got := result.NATCounterIDs["srcnat/rule-a"]; got != 9 { + t.Fatalf("NATCounterIDs was not copied, got %d", got) + } + if got := result.PoolIDs["snat-pool"]; got != 2 { + t.Fatalf("PoolIDs was not copied, got %d", got) + } + if got := result.PolicyNames[100]; got != "trust/untrust/allow-all" { + t.Fatalf("PolicyNames was not copied, got %q", got) + } + if got := result.AppNames[5]; got != "junos-http" { + t.Fatalf("AppNames was not copied, got %q", got) + } + if got := result.PolicyScheduleRuleSlots[0].PolicyName; got != "allow-all" { + t.Fatalf("PolicyScheduleRuleSlots was not copied, got %q", got) + } + + clone := result.Clone() + clone.ManagedInterfaces[0].Addresses[0] = "203.0.113.1/24" + clone.PolicyScheduleRuleSlots[0].PolicyName = "clone-mutated" + if got := result.ManagedInterfaces[0].Addresses[0]; got != "192.0.2.1/24" { + t.Fatalf("Clone shared ManagedInterfaces nested Addresses, original = %q", got) + } + if got := result.PolicyScheduleRuleSlots[0].PolicyName; got != "allow-all" { + t.Fatalf("Clone shared PolicyScheduleRuleSlots backing array, original PolicyName = %q", got) + } +} + +func 
TestRuntimeDataPlaneContractStaysSmallAndBackendNeutral(t *testing.T) { + typ := reflect.TypeOf((*RuntimeDataPlane)(nil)).Elem() + if got := typ.NumMethod(); got > 15 { + t.Fatalf("RuntimeDataPlane has %d methods, want <= 15", got) + } + + forbidden := map[string]bool{ + "AttachXDP": true, + "AttachTC": true, + "Map": true, + "SetZone": true, + "SetPolicyRule": true, + "SetSNATRule": true, + "SetDNATEntry": true, + "ClearDNATStatic": true, + "DeleteStaleIfaceZone": true, + "ReadFilterConfig": true, + "LastCompileResult": true, + "UpdatePolicyScheduleState": true, + } + for name := range forbidden { + if _, ok := typ.MethodByName(name); ok { + t.Fatalf("RuntimeDataPlane exposes BPF-shaped method %s", name) + } + } +} + +func TestApplyConfigHonorsCanceledContextBeforeCompile(t *testing.T) { + m := New() + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + if _, err := m.ApplyConfig(ctx, nil); err != context.Canceled { + t.Fatalf("ApplyConfig canceled error = %v, want context.Canceled", err) + } + if got := m.LastApplyResult(); got != nil { + t.Fatalf("LastApplyResult after canceled apply = %+v, want nil", got) + } +} diff --git a/pkg/dataplane/compiler.go b/pkg/dataplane/compiler.go index dce17ca60..f1ef463cb 100644 --- a/pkg/dataplane/compiler.go +++ b/pkg/dataplane/compiler.go @@ -32,6 +32,7 @@ type CompileResult struct { AppNames map[uint16]string // app_id -> application name (for structured logging) PolicySets int // number of policy sets created FilterIDs map[string]uint32 // "inet:name" or "inet6:name" -> filter_id + FilterSpans map[string]FilterCounterSpan PolicyScheduleRuleSlots []PolicyScheduleRuleSlot @@ -148,6 +149,7 @@ func CompileConfig(dp DataPlane, cfg *config.Config, isRecompile bool) (*Compile implicitSets: make(map[string]uint32), nextNATCounterID: 1, // 0 = no counter NATCounterIDs: make(map[string]uint16), + FilterSpans: make(map[string]FilterCounterSpan), Lo0FilterV4: 0xFFFFFFFF, // sentinel: no lo0 filter Lo0FilterV6: 
0xFFFFFFFF, ifCache: make(map[string]*net.Interface), @@ -401,6 +403,7 @@ func (m *Manager) Compile(cfg *config.Config) (*CompileResult, error) { } } m.lastCompile = result + m.recordApplyResult(ApplyResultFromCompileResult(result)) return result, nil } diff --git a/pkg/dataplane/compiler_filter.go b/pkg/dataplane/compiler_filter.go index f40e9f9d9..86573fab9 100644 --- a/pkg/dataplane/compiler_filter.go +++ b/pkg/dataplane/compiler_filter.go @@ -95,6 +95,7 @@ func compileFirewallFilters(dp DataPlane, cfg *config.Config, result *CompileRes filterID := uint32(0) ruleIdx := uint32(0) filterIDs := make(map[string]uint32) // "inet:name" or "inet6:name" -> filter_id + filterSpans := make(map[string]FilterCounterSpan) // Compile inet filters (sorted for deterministic IDs) inetNames := make([]string, 0, len(cfg.Firewall.FiltersInet)) @@ -133,7 +134,13 @@ func compileFirewallFilters(dp DataPlane, cfg *config.Config, result *CompileRes if err := dp.SetFilterConfig(filterID, fcfg); err != nil { return fmt.Errorf("set filter config %s: %w", name, err) } - filterIDs["inet:"+name] = filterID + filterKey := "inet:" + name + filterIDs[filterKey] = filterID + filterSpans[filterKey] = FilterCounterSpan{ + FilterID: filterID, + RuleStart: startIdx, + RuleCount: numRules, + } slog.Info("compiled firewall filter", "name", name, "family", "inet", "terms", len(filter.Terms), "rules", numRules, "filter_id", filterID) @@ -177,7 +184,13 @@ func compileFirewallFilters(dp DataPlane, cfg *config.Config, result *CompileRes if err := dp.SetFilterConfig(filterID, fcfg); err != nil { return fmt.Errorf("set filter config %s: %w", name, err) } - filterIDs["inet6:"+name] = filterID + filterKey := "inet6:" + name + filterIDs[filterKey] = filterID + filterSpans[filterKey] = FilterCounterSpan{ + FilterID: filterID, + RuleStart: startIdx, + RuleCount: numRules, + } slog.Info("compiled firewall filter", "name", name, "family", "inet6", "terms", len(filter.Terms), "rules", numRules, "filter_id", filterID) 
@@ -323,6 +336,7 @@ func compileFirewallFilters(dp DataPlane, cfg *config.Config, result *CompileRes dp.ZeroStaleFilterConfigs(filterID) result.FilterIDs = filterIDs + result.FilterSpans = filterSpans // Resolve lo0 filter IDs for host-bound traffic filtering if cfg.System.Lo0FilterInputV4 != "" { diff --git a/pkg/dataplane/dataplane.go b/pkg/dataplane/dataplane.go index afab384c4..cc9137b61 100644 --- a/pkg/dataplane/dataplane.go +++ b/pkg/dataplane/dataplane.go @@ -11,6 +11,8 @@ import ( // Compile-time assertion that Manager implements DataPlane. var _ DataPlane = (*Manager)(nil) +var _ ConfigSink = (*Manager)(nil) +var _ RuntimeDataPlane = (*Manager)(nil) // Dataplane type constants used in system { dataplane-type ; }. const ( diff --git a/pkg/dataplane/dpdk/dpdk_cgo.go b/pkg/dataplane/dpdk/dpdk_cgo.go index 0aa740466..997b32a3c 100644 --- a/pkg/dataplane/dpdk/dpdk_cgo.go +++ b/pkg/dataplane/dpdk/dpdk_cgo.go @@ -210,6 +210,7 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) return nil, err } m.lastCompile = result + m.recordApplyResult(dataplane.ApplyResultFromCompileResult(result)) return result, nil } diff --git a/pkg/dataplane/dpdk/dpdk_stub.go b/pkg/dataplane/dpdk/dpdk_stub.go index 3d406affd..2833773ad 100644 --- a/pkg/dataplane/dpdk/dpdk_stub.go +++ b/pkg/dataplane/dpdk/dpdk_stub.go @@ -64,6 +64,7 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) return nil, err } m.lastCompile = result + m.recordApplyResult(dataplane.ApplyResultFromCompileResult(result)) return result, nil } @@ -309,13 +310,13 @@ func (m *Manager) SeedNATPortCounters() {} func (m *Manager) SeedSessionIDCounter(_ int) {} func (m *Manager) IncrementGlobalCounter(_ uint32, _ uint64) error { return nil } -func (m *Manager) ClearGlobalCounters() error { return nil } -func (m *Manager) ClearInterfaceCounters() error { return nil } -func (m *Manager) ClearZoneCounters() error { return nil } -func (m *Manager) 
ClearPolicyCounters() error { return nil } -func (m *Manager) ClearFilterCounters() error { return nil } -func (m *Manager) ClearAllCounters() error { return nil } -func (m *Manager) ClearNATRuleCounters() error { return nil } +func (m *Manager) ClearGlobalCounters() error { return nil } +func (m *Manager) ClearInterfaceCounters() error { return nil } +func (m *Manager) ClearZoneCounters() error { return nil } +func (m *Manager) ClearPolicyCounters() error { return nil } +func (m *Manager) ClearFilterCounters() error { return nil } +func (m *Manager) ClearAllCounters() error { return nil } +func (m *Manager) ClearNATRuleCounters() error { return nil } // --- Events --- @@ -324,7 +325,7 @@ func (m *Manager) NewEventSource() (dataplane.EventSource, error) { return nil, // --- FIB --- func (m *Manager) StartFIBSync(_ context.Context) {} -func (m *Manager) BumpFIBGeneration() uint32 { return 0 } +func (m *Manager) BumpFIBGeneration() uint32 { return 0 } func (m *Manager) NotifyLinkCycle() {} func (m *Manager) SyncFabricState() {} diff --git a/pkg/dataplane/dpdk/manager.go b/pkg/dataplane/dpdk/manager.go index fbc74ac97..2dbf4dc30 100644 --- a/pkg/dataplane/dpdk/manager.go +++ b/pkg/dataplane/dpdk/manager.go @@ -1,12 +1,19 @@ package dpdk import ( + "context" + "sync" + "github.com/cilium/ebpf" + "github.com/psaab/xpf/pkg/config" "github.com/psaab/xpf/pkg/dataplane" + dpruntime "github.com/psaab/xpf/pkg/dataplane/runtime" ) // Compile-time assertion. var _ dataplane.DataPlane = (*Manager)(nil) +var _ dataplane.ConfigSink = (*Manager)(nil) +var _ dataplane.RuntimeDataPlane = (*Manager)(nil) func init() { dataplane.RegisterBackend(dataplane.TypeDPDK, func() dataplane.DataPlane { @@ -16,10 +23,13 @@ func init() { // Manager is the DPDK dataplane backend (stub implementation). 
type Manager struct { - loaded bool - lastCompile *dataplane.CompileResult - persistentNAT *dataplane.PersistentNATTable - platform platformState + loaded bool + lastCompile *dataplane.CompileResult + applyMu sync.Mutex + applyGeneration uint64 + lastApply *dataplane.ApplyResult + persistentNAT *dataplane.PersistentNATTable + platform platformState } // New creates a new DPDK Manager. @@ -31,7 +41,66 @@ func New() *Manager { // --- Common methods (build-tag independent) --- -func (m *Manager) IsLoaded() bool { return m.loaded } -func (m *Manager) LastCompileResult() *dataplane.CompileResult { return m.lastCompile } +func (m *Manager) IsLoaded() bool { return m.loaded } +func (m *Manager) LastCompileResult() *dataplane.CompileResult { return m.lastCompile } func (m *Manager) GetPersistentNAT() *dataplane.PersistentNATTable { return m.persistentNAT } -func (m *Manager) Map(_ string) *ebpf.Map { return nil } +func (m *Manager) Map(_ string) *ebpf.Map { return nil } + +func (m *Manager) ApplyConfig(ctx context.Context, cfg *config.Config) (*dataplane.ApplyResult, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if _, err := m.Compile(cfg); err != nil { + return nil, err + } + return m.LastApplyResult(), nil +} + +func (m *Manager) Start(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return m.Load() +} + +func (m *Manager) Link() dataplane.LinkController { + return dataplane.NewDataPlaneLinkController(m) +} + +func (m *Manager) HA() dataplane.HAController { + return dataplane.NewDataPlaneHAController(m) +} + +func (m *Manager) Sessions() dataplane.SessionStore { + return dataplane.NewDataPlaneSessionStore(m) +} + +func (m *Manager) SessionDeltas() dpruntime.SessionDeltaSource { + return nil +} + +func (m *Manager) Telemetry() dataplane.Telemetry { + return dataplane.NewDataPlaneTelemetry(m) +} + +func (m *Manager) LastApplyResult() *dataplane.ApplyResult { + m.applyMu.Lock() + defer 
m.applyMu.Unlock() + return m.lastApply.Clone() +} + +func (m *Manager) recordApplyResult(result *dataplane.ApplyResult) { + if result == nil { + return + } + m.applyMu.Lock() + defer m.applyMu.Unlock() + m.applyGeneration++ + next := result.Clone() + next.Generation = m.applyGeneration + m.lastApply = next +} diff --git a/pkg/dataplane/loader.go b/pkg/dataplane/loader.go index d983cd4dc..ebca53425 100644 --- a/pkg/dataplane/loader.go +++ b/pkg/dataplane/loader.go @@ -37,18 +37,21 @@ const linkPinPath = "/sys/fs/bpf/xpf/links" // Manager manages the eBPF dataplane: programs, maps, and attachments. type Manager struct { - loaded bool - programs map[string]*ebpf.Program - maps map[string]*ebpf.Map - xdpLinks map[int]link.Link - tcLinks map[int]link.Link - lastCompile *CompileResult - PersistentNAT *PersistentNATTable - EnableCPUMap bool // Enable cpumap multi-CPU distribution (adds startup overhead) - XDPEntryProg string - VlanSubInterfaces map[int]bool // VLAN sub-interface ifindexes (skip XDP swap for these) - mu sync.Mutex // protects userspaceCounterOffsets - userspaceCounterOffsets map[uint32]uint64 // userspace counter deltas merged in ReadGlobalCounter + loaded bool + programs map[string]*ebpf.Program + maps map[string]*ebpf.Map + xdpLinks map[int]link.Link + tcLinks map[int]link.Link + lastCompile *CompileResult + applyMu sync.Mutex + applyGeneration uint64 + lastApply *ApplyResult + PersistentNAT *PersistentNATTable + EnableCPUMap bool // Enable cpumap multi-CPU distribution (adds startup overhead) + XDPEntryProg string + VlanSubInterfaces map[int]bool // VLAN sub-interface ifindexes (skip XDP swap for these) + mu sync.Mutex // protects userspaceCounterOffsets + userspaceCounterOffsets map[uint32]uint64 // userspace counter deltas merged in ReadGlobalCounter // #863: refcount of XDP-attached ifindexes that "claim" the // IFACE_FLAG_XDP_ATTACHED bit on each iface_zone_map entry. @@ -65,10 +68,10 @@ type Manager struct { // New creates a new dataplane Manager. 
func New() *Manager { return &Manager{ - programs: make(map[string]*ebpf.Program), - maps: make(map[string]*ebpf.Map), - xdpLinks: make(map[int]link.Link), - tcLinks: make(map[int]link.Link), + programs: make(map[string]*ebpf.Program), + maps: make(map[string]*ebpf.Map), + xdpLinks: make(map[int]link.Link), + tcLinks: make(map[int]link.Link), PersistentNAT: NewPersistentNATTable(), XDPEntryProg: "xdp_main_prog", VlanSubInterfaces: make(map[int]bool), @@ -103,10 +106,11 @@ func (m *Manager) IsLoaded() bool { // than punishing transient netlink hiccups. // // Kernel XDP attach modes (nl/link_linux.go): -// XDP_ATTACHED_NONE = 0 — no prog attached -// XDP_ATTACHED_DRV = 1 — driver (native) mode -// XDP_ATTACHED_SKB = 2 — generic (skb) mode -// XDP_ATTACHED_HW = 3 — hw offload +// +// XDP_ATTACHED_NONE = 0 — no prog attached +// XDP_ATTACHED_DRV = 1 — driver (native) mode +// XDP_ATTACHED_SKB = 2 — generic (skb) mode +// XDP_ATTACHED_HW = 3 — hw offload func xdpAttachModeMatches(ifindex int, wantGeneric bool) bool { l, err := netlink.LinkByIndex(ifindex) if err != nil || l == nil { diff --git a/pkg/dataplane/runtime/import_canary_test.go b/pkg/dataplane/runtime/import_canary_test.go new file mode 100644 index 000000000..504c02931 --- /dev/null +++ b/pkg/dataplane/runtime/import_canary_test.go @@ -0,0 +1,67 @@ +package runtime + +import ( + "go/parser" + "go/token" + "os" + "path/filepath" + "strings" + "testing" +) + +// TestRuntimePackageDoesNotImportBackendPackages enforces that the runtime +// package is backend-neutral. It must not reference any concrete dataplane +// backend (userspace, dpdk), the root pkg/dataplane package (which contains +// BPF-shaped types and would cause an import cycle once pkg/dataplane imports +// pkg/dataplane/runtime), or the cilium/ebpf library. 
+func TestRuntimePackageDoesNotImportBackendPackages(t *testing.T) { + t.Parallel() + + entries, err := os.ReadDir(".") + if err != nil { + t.Fatalf("read runtime package: %v", err) + } + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".go") { + continue + } + fset := token.NewFileSet() + file, err := parser.ParseFile(fset, filepath.Join(".", entry.Name()), nil, parser.ImportsOnly) + if err != nil { + t.Fatalf("parse %s: %v", entry.Name(), err) + } + for _, imp := range file.Imports { + path := strings.Trim(imp.Path.Value, `"`) + checkForbiddenImport(t, entry.Name(), path) + } + } +} + +// checkForbiddenImport fails the test if path is a forbidden backend import. +func checkForbiddenImport(t *testing.T, filename, path string) { + t.Helper() + + // Backend sub-packages — never allowed. + backendSubPkgs := []string{ + "github.com/psaab/xpf/pkg/dataplane/userspace", + "github.com/psaab/xpf/pkg/dataplane/dpdk", + } + for _, forbidden := range backendSubPkgs { + if path == forbidden || strings.HasPrefix(path, forbidden+"/") { + t.Fatalf("runtime package imports forbidden backend sub-package in %s: %s", filename, path) + } + } + + // Root pkg/dataplane — importing it would cause a cycle once pkg/dataplane + // imports pkg/dataplane/runtime, and it carries BPF-shaped types. + const rootDP = "github.com/psaab/xpf/pkg/dataplane" + if path == rootDP { + t.Fatalf("runtime package imports root dataplane package in %s (would create import cycle)", filename) + } + + // cilium/ebpf — BPF library; its presence in the runtime package would + // break non-eBPF backends that import runtime. 
+	if path == "github.com/cilium/ebpf" || strings.HasPrefix(path, "github.com/cilium/ebpf/") {
+		t.Fatalf("runtime package imports cilium/ebpf in %s: %s", filename, path)
+	}
+}
diff --git a/pkg/dataplane/runtime/session_delta.go b/pkg/dataplane/runtime/session_delta.go
new file mode 100644
index 000000000..4df2e17b0
--- /dev/null
+++ b/pkg/dataplane/runtime/session_delta.go
@@ -0,0 +1,85 @@
+package runtime
+
+import "time"
+
+// SessionFamily names the address family of a session delta.
+type SessionFamily string
+
+const (
+	SessionFamilyInet  SessionFamily = "inet"
+	SessionFamilyInet6 SessionFamily = "inet6"
+)
+
+// SessionDeltaReason names the kind of change a session delta reports.
+type SessionDeltaReason string
+
+const (
+	SessionDeltaReasonOpen   SessionDeltaReason = "open"
+	SessionDeltaReasonClose  SessionDeltaReason = "close"
+	SessionDeltaReasonUpdate SessionDeltaReason = "update"
+)
+
+// SessionIdentity identifies a session by protocol, addresses, ports and
+// ingress/egress zone (both by name and by numeric ID). It is the Key of a
+// SessionDelta. Addresses are carried as strings.
+type SessionIdentity struct {
+	Protocol      uint8
+	SrcIP         string
+	DstIP         string
+	SrcPort       uint16
+	DstPort       uint16
+	IngressZone   string
+	EgressZone    string
+	IngressZoneID uint16
+	EgressZoneID  uint16
+}
+
+// SessionState holds the forwarding, next-hop, NAT and fabric fields reported
+// for a session. It is the Value of a SessionDelta.
+type SessionState struct {
+	Disposition      string
+	Origin           string
+	EgressIfindex    int
+	TXIfindex        int
+	TunnelEndpointID uint16
+	TXVLANID         uint16
+	NextHop          string
+	NeighborMAC      string
+	SrcMAC           string
+	NATSrcIP         string
+	NATDstIP         string
+	NATSrcPort       uint16
+	NATDstPort       uint16
+	FabricRedirect   bool
+	FabricIngress    bool
+}
+
+// RuntimeStatus is the backend status carried in every SessionDeltaSnapshot.
+type RuntimeStatus struct {
+	Enabled                bool
+	ForwardingArmed        bool
+	ForwardingSupported    bool
+	UnsupportedReasons     []string
+	LastSnapshotGeneration uint64
+	LastFIBGeneration      uint32
+}
+
+// SessionDelta is one backend-neutral session change record: where it was
+// observed (slot, queue, worker, interface), which session it concerns (Family,
+// Key), the state reported for it (Value), the owning redundancy group ID and
+// the reason for the change.
+type SessionDelta struct {
+	Timestamp  time.Time
+	Slot       uint32
+	QueueID    uint32
+	WorkerID   uint32
+	Interface  string
+	Ifindex    int
+	Family     SessionFamily
+	Key        SessionIdentity
+	Value      SessionState
+	OwnerRGID  int
+	Reason     SessionDeltaReason
+	Generation uint64
+}
+
+// SessionDeltaSnapshot is the result of one drain or export call: the deltas
+// together with the backend status observed alongside them.
+//
+// NOTE(review): BackendEpoch and Truncated are filled in by the backend; their
+// exact meaning is backend-defined — confirm against each implementation.
+type SessionDeltaSnapshot struct {
+	Deltas       []SessionDelta
+	Status       RuntimeStatus
+	BackendEpoch uint64
+	Truncated    bool
+}
+
+// SessionDeltaSource is the optional, backend-neutral source of session
+// deltas. Backends that cannot produce deltas do not implement it.
+//
+// NOTE(review): max presumably bounds the number of deltas returned per call,
+// with 0 meaning "no limit" — confirm against the implementing backend.
+type SessionDeltaSource interface {
+	DrainSessionDeltas(max uint32) (SessionDeltaSnapshot, error)
+	ExportOwnerRGSessions(rgIDs []int, max uint32) (SessionDeltaSnapshot, error)
+	SessionSyncSweepProfile() (enabled bool, activeInterval, idleInterval time.Duration)
+}
diff --git a/pkg/dataplane/session_store.go b/pkg/dataplane/session_store.go
new file mode 100644
index 000000000..707fffc49
--- /dev/null
+++ b/pkg/dataplane/session_store.go
@@ -0,0 +1,244 @@
+package dataplane
+
+import (
+	"errors"
+
+	dpruntime "github.com/psaab/xpf/pkg/dataplane/runtime"
+)
+
+// DeleteReason labels why a session is being deleted.
+type DeleteReason string
+
+const (
+	DeleteReasonClusterStale DeleteReason = "cluster-stale"
+	DeleteReasonGCExpired    DeleteReason = "gc-expired"
+)
+
+// SessionStore is the session-table surface of a dataplane: iteration, lookup,
+// cluster-synced installs, deletes (with or without companion entries), bulk
+// cluster reconciliation, counting and clearing. SessionDeltas returns the
+// optional runtime delta source and may be nil.
+type SessionStore interface {
+	ForEachV4(func(SessionKey, SessionValue) bool) error
+	ForEachV6(func(SessionKeyV6, SessionValueV6) bool) error
+	GetV4(SessionKey) (SessionValue, error)
+	GetV6(SessionKeyV6) (SessionValueV6, error)
+	PutClusterSyncedV4(SessionKey, SessionValue) error
+	PutClusterSyncedV6(SessionKeyV6, SessionValueV6) error
+	DeleteV4(SessionKey) error
+	DeleteV6(SessionKeyV6) error
+	DeleteWithCompanionsV4(SessionKey, DeleteReason) error
+	DeleteWithCompanionsV6(SessionKeyV6, DeleteReason) error
+	ReconcileClusterBulk(ClusterBulkReconcileInput) (ClusterBulkReconcileResult, error)
+	SessionDeltas() dpruntime.SessionDeltaSource
+	Count() (v4, v6 int)
+	Clear() (v4, v6 int, err error)
+}
+
+// ClusterBulkReconcileInput describes one bulk reconcile pass. In
+// dataPlaneSessionStore, forward sessions whose key is missing from
+// ReceivedV4/ReceivedV6 are treated as stale unless ShouldSyncZone returns
+// true for their ingress zone; a nil ShouldSyncZone turns the pass into a
+// no-op, and an empty DeleteReason defaults to DeleteReasonClusterStale.
+type ClusterBulkReconcileInput struct {
+	ReceivedV4     map[SessionKey]struct{}
+	ReceivedV6     map[SessionKeyV6]struct{}
+	ShouldSyncZone func(uint16) bool
+	DeleteReason   DeleteReason
+}
+
+// ClusterBulkReconcileResult reports, per address family, how many stale
+// sessions a bulk reconcile found and how many deletions it counted.
+type ClusterBulkReconcileResult struct {
+	StaleV4   int
+	StaleV6   int
+	DeletedV4 int
+	DeletedV6 int
+}
+
+// clusterSyncedSessionInstaller is an optional interface a DataPlane may
+// implement; when present, PutClusterSyncedV4/V6 use it instead of the plain
+// SetSessionV4/V6 calls.
+type clusterSyncedSessionInstaller interface {
+	SetClusterSyncedSessionV4(SessionKey, SessionValue) error
+	SetClusterSyncedSessionV6(SessionKeyV6, SessionValueV6) error
+}
+
+// dataPlaneSessionStore implements SessionStore on top of a DataPlane.
+type dataPlaneSessionStore struct {
+	dp DataPlane
+}
+
+// NewDataPlaneSessionStore wraps dp in a SessionStore. A nil dp is accepted;
+// the resulting store's methods then report "nil dataplane" (Count returns
+// zeros).
+func NewDataPlaneSessionStore(dp DataPlane) SessionStore {
+	return dataPlaneSessionStore{dp: dp}
+}
+
+// SessionDeltas always returns nil: this generic store has no delta source.
+func (s dataPlaneSessionStore)
SessionDeltas() dpruntime.SessionDeltaSource {
+	// The generic DataPlane-backed store has no delta source to offer.
+	return nil
+}
+
+// ForEachV4 hands fn to the wrapped dataplane's IterateSessions. It returns an
+// error when the store has no dataplane.
+func (s dataPlaneSessionStore) ForEachV4(fn func(SessionKey, SessionValue) bool) error {
+	if s.dp == nil {
+		return errors.New("nil dataplane")
+	}
+	return s.dp.IterateSessions(fn)
+}
+
+// ForEachV6 hands fn to the wrapped dataplane's IterateSessionsV6. It returns
+// an error when the store has no dataplane.
+func (s dataPlaneSessionStore) ForEachV6(fn func(SessionKeyV6, SessionValueV6) bool) error {
+	if s.dp == nil {
+		return errors.New("nil dataplane")
+	}
+	return s.dp.IterateSessionsV6(fn)
+}
+
+// GetV4 looks key up through the wrapped dataplane's GetSessionV4.
+func (s dataPlaneSessionStore) GetV4(key SessionKey) (SessionValue, error) {
+	if s.dp == nil {
+		return SessionValue{}, errors.New("nil dataplane")
+	}
+	return s.dp.GetSessionV4(key)
+}
+
+// GetV6 looks key up through the wrapped dataplane's GetSessionV6.
+func (s dataPlaneSessionStore) GetV6(key SessionKeyV6) (SessionValueV6, error) {
+	if s.dp == nil {
+		return SessionValueV6{}, errors.New("nil dataplane")
+	}
+	return s.dp.GetSessionV6(key)
+}
+
+// PutClusterSyncedV4 installs a session via SetClusterSyncedSessionV4 when the
+// wrapped dataplane implements clusterSyncedSessionInstaller, and falls back
+// to SetSessionV4 otherwise.
+func (s dataPlaneSessionStore) PutClusterSyncedV4(key SessionKey, val SessionValue) error {
+	if s.dp == nil {
+		return errors.New("nil dataplane")
+	}
+	if installer, ok := s.dp.(clusterSyncedSessionInstaller); ok {
+		return installer.SetClusterSyncedSessionV4(key, val)
+	}
+	return s.dp.SetSessionV4(key, val)
+}
+
+// PutClusterSyncedV6 installs a session via SetClusterSyncedSessionV6 when the
+// wrapped dataplane implements clusterSyncedSessionInstaller, and falls back
+// to SetSessionV6 otherwise.
+func (s dataPlaneSessionStore) PutClusterSyncedV6(key SessionKeyV6, val SessionValueV6) error {
+	if s.dp == nil {
+		return errors.New("nil dataplane")
+	}
+	if installer, ok := s.dp.(clusterSyncedSessionInstaller); ok {
+		return installer.SetClusterSyncedSessionV6(key, val)
+	}
+	return s.dp.SetSessionV6(key, val)
+}
+
+// DeleteV4 deletes only the entry for key; companion entries are not touched.
+func (s dataPlaneSessionStore) DeleteV4(key SessionKey) error {
+	if s.dp == nil {
+		return errors.New("nil dataplane")
+	}
+	return s.dp.DeleteSession(key)
+}
+
+// DeleteV6 deletes only the entry for key; companion entries are not touched.
+func (s dataPlaneSessionStore) DeleteV6(key SessionKeyV6) error {
+	if s.dp == nil {
+		return errors.New("nil dataplane")
+	}
+	return s.dp.DeleteSessionV6(key)
+}
+
+// DeleteWithCompanionsV4 deletes key together with its companion entries: the
+// reverse session named by ReverseKey (when its Protocol is non-zero) and, for
+// sessions flagged SNAT but not static NAT, the DNAT entry keyed by the
+// session protocol and its NAT source IP/port. When the lookup of key fails,
+// only key itself is deleted. The DeleteReason argument is ignored by this
+// store. Errors from the individual deletes are joined.
+func (s dataPlaneSessionStore) DeleteWithCompanionsV4(key SessionKey, _ DeleteReason) error {
+	if s.dp == nil {
+		return errors.New("nil dataplane")
+	}
+	var errs []error
+	if val, err := s.dp.GetSessionV4(key); err == nil {
+		if val.ReverseKey.Protocol != 0 {
+			errs = append(errs, s.dp.DeleteSession(val.ReverseKey))
+		}
+		if val.Flags&SessFlagSNAT != 0 && val.Flags&SessFlagStaticNAT == 0 {
+			errs = append(errs, s.dp.DeleteDNATEntry(DNATKey{
+				Protocol: key.Protocol,
+				DstIP:    val.NATSrcIP,
+				DstPort:  val.NATSrcPort,
+			}))
+		}
+	}
+	// The primary entry is removed last, after its companions.
+	errs = append(errs, s.dp.DeleteSession(key))
+	return errors.Join(errs...)
+}
+
+// DeleteWithCompanionsV6 is the IPv6 counterpart of DeleteWithCompanionsV4: it
+// deletes the reverse session, the DNAT entry of a non-static SNAT session,
+// and finally key itself, joining any delete errors.
+func (s dataPlaneSessionStore) DeleteWithCompanionsV6(key SessionKeyV6, _ DeleteReason) error {
+	if s.dp == nil {
+		return errors.New("nil dataplane")
+	}
+	var errs []error
+	if val, err := s.dp.GetSessionV6(key); err == nil {
+		if val.ReverseKey.Protocol != 0 {
+			errs = append(errs, s.dp.DeleteSessionV6(val.ReverseKey))
+		}
+		if val.Flags&SessFlagSNAT != 0 && val.Flags&SessFlagStaticNAT == 0 {
+			errs = append(errs, s.dp.DeleteDNATEntryV6(DNATKeyV6{
+				Protocol: key.Protocol,
+				DstIP:    val.NATSrcIP,
+				DstPort:  val.NATSrcPort,
+			}))
+		}
+	}
+	// The primary entry is removed last, after its companions.
+	errs = append(errs, s.dp.DeleteSessionV6(key))
+	return errors.Join(errs...)
+}
+
+// ReconcileClusterBulk removes sessions that a cluster bulk transfer did not
+// confirm.
+//
+// Reverse entries are skipped, as are sessions whose ingress zone makes
+// input.ShouldSyncZone return true. Every remaining session whose key is
+// absent from input.ReceivedV4 / input.ReceivedV6 is stale and is removed with
+// DeleteWithCompanionsV4 / DeleteWithCompanionsV6. Stale keys are collected
+// during iteration and deleted afterwards.
+//
+// A nil input.ShouldSyncZone makes the call a no-op. An empty
+// input.DeleteReason defaults to DeleteReasonClusterStale.
+//
+// In the result, StaleV4/StaleV6 count the stale keys found, and
+// DeletedV4/DeletedV6 count only the keys whose delete call returned no error.
+// Delete failures do not stop the pass; they are joined into the returned
+// error. A failed IPv4 iteration aborts immediately; a failed IPv6 iteration
+// returns its error joined with any IPv4 delete errors.
+func (s dataPlaneSessionStore) ReconcileClusterBulk(input ClusterBulkReconcileInput) (ClusterBulkReconcileResult, error) {
+	var result ClusterBulkReconcileResult
+	if s.dp == nil {
+		return result, errors.New("nil dataplane")
+	}
+	if input.ShouldSyncZone == nil {
+		return result, nil
+	}
+	reason := input.DeleteReason
+	if reason == "" {
+		reason = DeleteReasonClusterStale
+	}
+
+	var staleV4 []SessionKey
+	if err := s.ForEachV4(func(key SessionKey, val SessionValue) bool {
+		if val.IsReverse != 0 {
+			return true
+		}
+		if input.ShouldSyncZone(val.IngressZone) {
+			return true
+		}
+		if _, ok := input.ReceivedV4[key]; !ok {
+			staleV4 = append(staleV4, key)
+		}
+		return true
+	}); err != nil {
+		return result, err
+	}
+	result.StaleV4 = len(staleV4)
+
+	var errs []error
+	for _, key := range staleV4 {
+		if err := s.DeleteWithCompanionsV4(key, reason); err != nil {
+			// A failed delete must not be reported as deleted.
+			errs = append(errs, err)
+			continue
+		}
+		result.DeletedV4++
+	}
+
+	var staleV6 []SessionKeyV6
+	if err := s.ForEachV6(func(key SessionKeyV6, val SessionValueV6) bool {
+		if val.IsReverse != 0 {
+			return true
+		}
+		if input.ShouldSyncZone(val.IngressZone) {
+			return true
+		}
+		if _, ok := input.ReceivedV6[key]; !ok {
+			staleV6 = append(staleV6, key)
+		}
+		return true
+	}); err != nil {
+		return result, errors.Join(append(errs, err)...)
+	}
+	result.StaleV6 = len(staleV6)
+
+	for _, key := range staleV6 {
+		if err := s.DeleteWithCompanionsV6(key, reason); err != nil {
+			// A failed delete must not be reported as deleted.
+			errs = append(errs, err)
+			continue
+		}
+		result.DeletedV6++
+	}
+	return result, errors.Join(errs...)
+} + +func (s dataPlaneSessionStore) Count() (int, int) { + if s.dp == nil { + return 0, 0 + } + return s.dp.SessionCount() +} + +func (s dataPlaneSessionStore) Clear() (int, int, error) { + if s.dp == nil { + return 0, 0, errors.New("nil dataplane") + } + return s.dp.ClearAllSessions() +} diff --git a/pkg/dataplane/session_store_test.go b/pkg/dataplane/session_store_test.go new file mode 100644 index 000000000..711bde952 --- /dev/null +++ b/pkg/dataplane/session_store_test.go @@ -0,0 +1,140 @@ +package dataplane + +import ( + "fmt" + "testing" +) + +type sessionStoreTestDP struct { + DataPlane + v4 map[SessionKey]SessionValue + v6 map[SessionKeyV6]SessionValueV6 + deletedDNAT []DNATKey + deletedDNAT6 []DNATKeyV6 +} + +func (m *sessionStoreTestDP) IterateSessions(fn func(SessionKey, SessionValue) bool) error { + for key, val := range m.v4 { + if !fn(key, val) { + break + } + } + return nil +} + +func (m *sessionStoreTestDP) IterateSessionsV6(fn func(SessionKeyV6, SessionValueV6) bool) error { + for key, val := range m.v6 { + if !fn(key, val) { + break + } + } + return nil +} + +func (m *sessionStoreTestDP) GetSessionV4(key SessionKey) (SessionValue, error) { + if val, ok := m.v4[key]; ok { + return val, nil + } + return SessionValue{}, fmt.Errorf("not found") +} + +func (m *sessionStoreTestDP) GetSessionV6(key SessionKeyV6) (SessionValueV6, error) { + if val, ok := m.v6[key]; ok { + return val, nil + } + return SessionValueV6{}, fmt.Errorf("not found") +} + +func (m *sessionStoreTestDP) DeleteSession(key SessionKey) error { + delete(m.v4, key) + return nil +} + +func (m *sessionStoreTestDP) DeleteSessionV6(key SessionKeyV6) error { + delete(m.v6, key) + return nil +} + +func (m *sessionStoreTestDP) DeleteDNATEntry(key DNATKey) error { + m.deletedDNAT = append(m.deletedDNAT, key) + return nil +} + +func (m *sessionStoreTestDP) DeleteDNATEntryV6(key DNATKeyV6) error { + m.deletedDNAT6 = append(m.deletedDNAT6, key) + return nil +} + +func 
TestDeleteWithCompanionsV4RemovesReverseAndDNAT(t *testing.T) { + forward := SessionKey{Protocol: 6, SrcIP: [4]byte{10, 0, 0, 1}, DstIP: [4]byte{10, 0, 0, 2}, SrcPort: 1234, DstPort: 80} + reverse := SessionKey{Protocol: 6, SrcIP: [4]byte{10, 0, 0, 2}, DstIP: [4]byte{10, 0, 0, 1}, SrcPort: 80, DstPort: 1234} + dp := &sessionStoreTestDP{ + v4: map[SessionKey]SessionValue{ + forward: { + ReverseKey: reverse, + Flags: SessFlagSNAT, + NATSrcIP: 0x0a0200c0, + NATSrcPort: 40000, + }, + reverse: {IsReverse: 1}, + }, + } + store := NewDataPlaneSessionStore(dp) + + if err := store.DeleteWithCompanionsV4(forward, DeleteReasonClusterStale); err != nil { + t.Fatalf("DeleteWithCompanionsV4: %v", err) + } + if _, ok := dp.v4[forward]; ok { + t.Fatal("forward session still present") + } + if _, ok := dp.v4[reverse]; ok { + t.Fatal("reverse session still present") + } + wantDNAT := DNATKey{Protocol: 6, DstIP: 0x0a0200c0, DstPort: 40000} + if len(dp.deletedDNAT) != 1 || dp.deletedDNAT[0] != wantDNAT { + t.Fatalf("deleted DNAT = %+v, want [%+v]", dp.deletedDNAT, wantDNAT) + } +} + +func TestDataPlaneSessionStoreReportsNoRuntimeDeltaSource(t *testing.T) { + store := NewDataPlaneSessionStore(&sessionStoreTestDP{}) + if got := store.SessionDeltas(); got != nil { + t.Fatalf("SessionDeltas() = %T, want nil for generic dataplane store", got) + } +} + +func TestDeleteWithCompanionsV6RemovesReverseAndDNAT(t *testing.T) { + forward := SessionKeyV6{Protocol: 17, SrcPort: 1234, DstPort: 53} + forward.SrcIP[15] = 1 + forward.DstIP[15] = 2 + reverse := SessionKeyV6{Protocol: 17, SrcPort: 53, DstPort: 1234} + reverse.SrcIP[15] = 2 + reverse.DstIP[15] = 1 + natIP := [16]byte{0x20, 0x01, 0x0d, 0xb8} + dp := &sessionStoreTestDP{ + v6: map[SessionKeyV6]SessionValueV6{ + forward: { + ReverseKey: reverse, + Flags: SessFlagSNAT, + NATSrcIP: natIP, + NATSrcPort: 53000, + }, + reverse: {IsReverse: 1}, + }, + } + store := NewDataPlaneSessionStore(dp) + + if err := store.DeleteWithCompanionsV6(forward, 
DeleteReasonClusterStale); err != nil { + t.Fatalf("DeleteWithCompanionsV6: %v", err) + } + if _, ok := dp.v6[forward]; ok { + t.Fatal("forward session still present") + } + if _, ok := dp.v6[reverse]; ok { + t.Fatal("reverse session still present") + } + wantDNAT := DNATKeyV6{Protocol: 17, DstIP: natIP, DstPort: 53000} + if len(dp.deletedDNAT6) != 1 || dp.deletedDNAT6[0] != wantDNAT { + t.Fatalf("deleted DNATv6 = %+v, want [%+v]", dp.deletedDNAT6, wantDNAT) + } +} diff --git a/pkg/dataplane/userspace/manager.go b/pkg/dataplane/userspace/manager.go index a532a5870..eded17abd 100644 --- a/pkg/dataplane/userspace/manager.go +++ b/pkg/dataplane/userspace/manager.go @@ -20,11 +20,14 @@ import ( "github.com/psaab/xpf/pkg/config" "github.com/psaab/xpf/pkg/dataplane" + dpruntime "github.com/psaab/xpf/pkg/dataplane/runtime" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" ) var _ dataplane.DataPlane = (*Manager)(nil) +var _ dataplane.ConfigSink = (*Manager)(nil) +var _ dataplane.RuntimeDataPlane = (*Manager)(nil) var ErrPolicySchedulerProtocolIncompatible = errors.New("userspace policy scheduler snapshot protocol incompatible") @@ -69,6 +72,7 @@ type Manager struct { syncCancel context.CancelFunc lastStatus ProcessStatus lastSnapshot *ConfigSnapshot + lastApply *dataplane.ApplyResult policySchedulerActive map[string]bool haGroups map[int]HAGroupStatus lastIngressIfaces []uint32 @@ -159,6 +163,169 @@ func New() *Manager { } } +func (m *Manager) ApplyConfig(ctx context.Context, cfg *config.Config) (*dataplane.ApplyResult, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if _, err := m.Compile(cfg); err != nil { + return nil, err + } + return m.LastApplyResult(), nil +} + +func (m *Manager) LastApplyResult() *dataplane.ApplyResult { + m.mu.Lock() + defer m.mu.Unlock() + return m.lastApply.Clone() +} + +func (m *Manager) RuntimeSessionDeltaSource() dpruntime.SessionDeltaSource { + return runtimeSessionDeltaSource{manager: m} +} + +func 
(m *Manager) Start(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return m.Load() +} + +func (m *Manager) Link() dataplane.LinkController { + return userspaceLinkController{manager: m} +} + +func (m *Manager) HA() dataplane.HAController { + return userspaceHAController{manager: m} +} + +func (m *Manager) Sessions() dataplane.SessionStore { + return userspaceSessionStore{ + SessionStore: dataplane.NewDataPlaneSessionStore(m), + source: m.RuntimeSessionDeltaSource(), + } +} + +func (m *Manager) SessionDeltas() dpruntime.SessionDeltaSource { + return m.RuntimeSessionDeltaSource() +} + +func (m *Manager) Telemetry() dataplane.Telemetry { + return dataplane.NewDataPlaneTelemetry(m) +} + +type userspaceLinkController struct { + manager *Manager +} + +func (c userspaceLinkController) SetDeferWorkers(v bool) { + if c.manager != nil { + c.manager.SetDeferWorkers(v) + } +} + +func (c userspaceLinkController) PrepareLinkCycle() { + if c.manager != nil { + c.manager.PrepareLinkCycle() + } +} + +func (c userspaceLinkController) NotifyLinkCycle() { + if c.manager != nil { + c.manager.NotifyLinkCycle() + } +} + +type userspaceHAOps interface { + UpdateRGActive(int, bool) error + UpdateHAWatchdog(int, uint64) error + UpdateFabricFwd(dataplane.FabricFwdInfo) error + UpdateFabricFwd1(dataplane.FabricFwdInfo) error + SyncFabricState() +} + +type userspaceHAController struct { + manager userspaceHAOps +} + +func (c userspaceHAController) SetRGActive(ctx context.Context, rgID int, active bool) error { + if err := ctx.Err(); err != nil { + return err + } + if c.manager == nil { + return errors.New("nil userspace dataplane") + } + return c.manager.UpdateRGActive(rgID, active) +} + +func (c userspaceHAController) SetHAWatchdog(ctx context.Context, rgID int, timestamp uint64) error { + if err := ctx.Err(); err != nil { + return err + } + if c.manager == nil { + return errors.New("nil userspace dataplane") + } + return 
c.manager.UpdateHAWatchdog(rgID, timestamp) +} + +func (c userspaceHAController) SetFabricForwarding(ctx context.Context, id dataplane.FabricID, info dataplane.FabricFwdInfo) error { + if err := ctx.Err(); err != nil { + return err + } + if c.manager == nil { + return errors.New("nil userspace dataplane") + } + var err error + if id == 1 { + err = c.manager.UpdateFabricFwd1(info) + } else { + err = c.manager.UpdateFabricFwd(info) + } + if err != nil { + return err + } + // The map update is committed at this point. Always push helper fabric + // state after a successful fabric0 or fabric1 update so RuntimeDataPlane.HA + // preserves the same "fresh helper view" contract for every fabric slot. + c.manager.SyncFabricState() + return nil +} + +func (c userspaceHAController) SyncFabricState(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + if c.manager == nil { + return errors.New("nil userspace dataplane") + } + c.manager.SyncFabricState() + return nil +} + +type userspaceSessionStore struct { + dataplane.SessionStore + source dpruntime.SessionDeltaSource +} + +func (s userspaceSessionStore) SessionDeltas() dpruntime.SessionDeltaSource { + return s.source +} + +func (m *Manager) recordApplyResultLocked(result *dataplane.ApplyResult, caps UserspaceCapabilities, generation uint64) { + if result == nil { + return + } + result.Capabilities = dataplane.Capabilities{ + ForwardingSupported: caps.ForwardingSupported, + UnsupportedReasons: append([]string(nil), caps.UnsupportedReasons...), + } + result.Generation = generation + m.lastApply = result.Clone() +} + func copyPolicySchedulerActiveState(activeState map[string]bool) map[string]bool { if activeState == nil { return nil @@ -349,6 +516,7 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) } m.lastSnapshot = snap m.cfg = ucfg + m.recordApplyResultLocked(dataplane.ApplyResultFromCompileResult(result), caps, snap.Generation) slog.Info( "userspace: deferring 
snapshot publish during XSK startup", "generation", snap.Generation, @@ -421,6 +589,7 @@ func (m *Manager) Compile(cfg *config.Config) (*dataplane.CompileResult, error) } m.ensureStatusLoopLocked() m.cfg = ucfg + m.recordApplyResultLocked(dataplane.ApplyResultFromCompileResult(result), caps, snap.Generation) return result, nil }
diff --git a/pkg/dataplane/userspace/runtime_delta.go b/pkg/dataplane/userspace/runtime_delta.go
new file mode 100644
index 000000000..3f125ec7c
--- /dev/null
+++ b/pkg/dataplane/userspace/runtime_delta.go
@@ -0,0 +1,128 @@
+package userspace
+
+import (
+	"strings"
+	"time"
+
+	dpruntime "github.com/psaab/xpf/pkg/dataplane/runtime"
+)
+
+var _ dpruntime.SessionDeltaSource = runtimeSessionDeltaSource{}
+
+// runtimeSessionDeltaSource adapts a userspace Manager to the backend-neutral
+// dpruntime.SessionDeltaSource by converting SessionDeltaInfo/ProcessStatus
+// into dpruntime DTOs.
+type runtimeSessionDeltaSource struct {
+	manager *Manager
+}
+
+// DrainSessionDeltas forwards to Manager.DrainSessionDeltas and converts the
+// result. The converted snapshot is returned alongside the manager's error,
+// whether or not that error is nil.
+func (s runtimeSessionDeltaSource) DrainSessionDeltas(max uint32) (dpruntime.SessionDeltaSnapshot, error) {
+	deltas, status, err := s.manager.DrainSessionDeltas(max)
+	return runtimeSessionDeltaSnapshot(deltas, status, max), err
+}
+
+// ExportOwnerRGSessions forwards to Manager.ExportOwnerRGSessions and converts
+// the result. The converted snapshot is returned alongside the manager's
+// error, whether or not that error is nil.
+func (s runtimeSessionDeltaSource) ExportOwnerRGSessions(rgIDs []int, max uint32) (dpruntime.SessionDeltaSnapshot, error) {
+	deltas, status, err := s.manager.ExportOwnerRGSessions(rgIDs, max)
+	return runtimeSessionDeltaSnapshot(deltas, status, max), err
+}
+
+// SessionSyncSweepProfile forwards to Manager.SessionSyncSweepProfile.
+func (s runtimeSessionDeltaSource) SessionSyncSweepProfile() (bool, time.Duration, time.Duration) {
+	return s.manager.SessionSyncSweepProfile()
+}
+
+// runtimeSessionDeltaSnapshot converts one batch of userspace deltas plus the
+// accompanying process status into a dpruntime.SessionDeltaSnapshot. Every
+// delta is stamped with status.LastSnapshotGeneration. Truncated is set when
+// max is non-zero and the batch holds at least max deltas.
+func runtimeSessionDeltaSnapshot(deltas []SessionDeltaInfo, status ProcessStatus, max uint32) dpruntime.SessionDeltaSnapshot {
+	converted := make([]dpruntime.SessionDelta, len(deltas))
+	for i := range deltas {
+		converted[i] = runtimeSessionDelta(deltas[i], status.LastSnapshotGeneration)
+	}
+	reachedMax := max > 0 && uint32(len(deltas)) >= max
+	return dpruntime.SessionDeltaSnapshot{
+		Deltas:       converted,
+		Status:       runtimeStatus(status),
+		BackendEpoch: runtimeBackendEpoch(status),
+		Truncated:    reachedMax,
+	}
+}
+
+// runtimeStatus copies the status fields the neutral RuntimeStatus exposes.
+// UnsupportedReasons is copied so the result does not alias status.
+func runtimeStatus(status ProcessStatus) dpruntime.RuntimeStatus {
+	caps := status.Capabilities
+	return dpruntime.RuntimeStatus{
+		Enabled:                status.Enabled,
+		ForwardingArmed:        status.ForwardingArmed,
+		ForwardingSupported:    caps.ForwardingSupported,
+		UnsupportedReasons:     append([]string(nil), caps.UnsupportedReasons...),
+		LastSnapshotGeneration: status.LastSnapshotGeneration,
+		LastFIBGeneration:      status.LastFIBGeneration,
+	}
+}
+
+// runtimeBackendEpoch derives the backend epoch from status.StartedAt as Unix
+// nanoseconds; a zero StartedAt yields 0.
+func runtimeBackendEpoch(status ProcessStatus) uint64 {
+	if started := status.StartedAt; !started.IsZero() {
+		return uint64(started.UnixNano())
+	}
+	return 0
+}
+
+// runtimeSessionDelta converts a single SessionDeltaInfo into the neutral
+// SessionDelta, tagging it with the given generation.
+func runtimeSessionDelta(delta SessionDeltaInfo, generation uint64) dpruntime.SessionDelta {
+	identity := dpruntime.SessionIdentity{
+		Protocol:      delta.Protocol,
+		SrcIP:         delta.SrcIP,
+		DstIP:         delta.DstIP,
+		SrcPort:       delta.SrcPort,
+		DstPort:       delta.DstPort,
+		IngressZone:   delta.IngressZone,
+		EgressZone:    delta.EgressZone,
+		IngressZoneID: delta.IngressZoneID,
+		EgressZoneID:  delta.EgressZoneID,
+	}
+	state := dpruntime.SessionState{
+		Disposition:      delta.Disposition,
+		Origin:           delta.Origin,
+		EgressIfindex:    delta.EgressIfindex,
+		TXIfindex:        delta.TXIfindex,
+		TunnelEndpointID: delta.TunnelEndpointID,
+		TXVLANID:         delta.TXVLANID,
+		NextHop:          delta.NextHop,
+		NeighborMAC:      delta.NeighborMAC,
+		SrcMAC:           delta.SrcMAC,
+		NATSrcIP:         delta.NATSrcIP,
+		NATDstIP:         delta.NATDstIP,
+		NATSrcPort:       delta.NATSrcPort,
+		NATDstPort:       delta.NATDstPort,
+		FabricRedirect:   delta.FabricRedirect,
+		FabricIngress:    delta.FabricIngress,
+	}
+	return dpruntime.SessionDelta{
+		Timestamp:  delta.Timestamp,
+		Slot:       delta.Slot,
+		QueueID:    delta.QueueID,
+		WorkerID:   delta.WorkerID,
+		Interface:  delta.Interface,
+		Ifindex:    delta.Ifindex,
+		Family:     runtimeSessionFamily(delta.AddrFamily),
+		Key:        identity,
+		Value:      state,
+		OwnerRGID:  delta.OwnerRGID,
+		Reason:     runtimeSessionDeltaReason(delta.Event),
+		Generation: generation,
+	}
+}
+
+func runtimeSessionFamily(family uint8) dpruntime.SessionFamily { + switch family { + case 6, 10: + return dpruntime.SessionFamilyInet6 + default: + return dpruntime.SessionFamilyInet + } +} + +func runtimeSessionDeltaReason(event string) dpruntime.SessionDeltaReason { + switch strings.ToLower(event) { + case "close", "closed", "delete", "deleted": + return dpruntime.SessionDeltaReasonClose + case "update", "updated": + return dpruntime.SessionDeltaReasonUpdate + default: + return dpruntime.SessionDeltaReasonOpen + } +} diff --git a/pkg/dataplane/userspace/runtime_delta_test.go b/pkg/dataplane/userspace/runtime_delta_test.go new file mode 100644 index 000000000..de75cc52e --- /dev/null +++ b/pkg/dataplane/userspace/runtime_delta_test.go @@ -0,0 +1,210 @@ +package userspace + +import ( + "context" + "errors" + "reflect" + "testing" + "time" + + "github.com/psaab/xpf/pkg/dataplane" + dpruntime "github.com/psaab/xpf/pkg/dataplane/runtime" +) + +func TestRuntimeSessionDeltaSnapshotAdaptsUserspaceDTOs(t *testing.T) { + startedAt := time.Unix(123, 456) + status := ProcessStatus{ + StartedAt: startedAt, + Enabled: true, + ForwardingArmed: true, + LastSnapshotGeneration: 77, + LastFIBGeneration: 9, + Capabilities: UserspaceCapabilities{ + ForwardingSupported: true, + UnsupportedReasons: []string{"example"}, + }, + } + deltas := []SessionDeltaInfo{{ + Timestamp: time.Unix(200, 0), + Slot: 1, + QueueID: 2, + WorkerID: 3, + Interface: "xe-0/0/0", + Ifindex: 10, + Event: "close", + AddrFamily: 10, + Protocol: 17, + SrcIP: "2001:db8::1", + DstIP: "2001:db8::2", + SrcPort: 1234, + DstPort: 53, + IngressZone: "trust", + EgressZone: "untrust", + IngressZoneID: 1, + EgressZoneID: 2, + OwnerRGID: 4, + Disposition: "expired", + Origin: "helper", + EgressIfindex: 11, + TXIfindex: 12, + TunnelEndpointID: 13, + TXVLANID: 14, + NextHop: "2001:db8::ff", + NeighborMAC: "00:11:22:33:44:55", + SrcMAC: "00:11:22:33:44:66", + NATSrcIP: "2001:db8::10", + NATDstIP: "2001:db8::20", + NATSrcPort: 
40000, + NATDstPort: 53000, + FabricRedirect: true, + FabricIngress: true, + }} + + snapshot := runtimeSessionDeltaSnapshot(deltas, status, 1) + if !snapshot.Truncated { + t.Fatal("Truncated = false, want true when max equals returned deltas") + } + if snapshot.BackendEpoch != uint64(startedAt.UnixNano()) { + t.Fatalf("BackendEpoch = %d, want %d", snapshot.BackendEpoch, startedAt.UnixNano()) + } + if snapshot.Status.LastSnapshotGeneration != 77 { + t.Fatalf("LastSnapshotGeneration = %d, want 77", snapshot.Status.LastSnapshotGeneration) + } + if len(snapshot.Status.UnsupportedReasons) != 1 || snapshot.Status.UnsupportedReasons[0] != "example" { + t.Fatalf("UnsupportedReasons = %+v", snapshot.Status.UnsupportedReasons) + } + if got := len(snapshot.Deltas); got != 1 { + t.Fatalf("len(Deltas) = %d, want 1", got) + } + delta := snapshot.Deltas[0] + if delta.Family != dpruntime.SessionFamilyInet6 { + t.Fatalf("Family = %q, want inet6", delta.Family) + } + if delta.Reason != dpruntime.SessionDeltaReasonClose { + t.Fatalf("Reason = %q, want close", delta.Reason) + } + if delta.Generation != 77 { + t.Fatalf("Generation = %d, want 77", delta.Generation) + } + if delta.Key.IngressZoneID != 1 || delta.Key.EgressZoneID != 2 { + t.Fatalf("zone IDs = %d/%d, want 1/2", delta.Key.IngressZoneID, delta.Key.EgressZoneID) + } + if !delta.Value.FabricRedirect || !delta.Value.FabricIngress { + t.Fatalf("fabric flags = redirect:%t ingress:%t, want true/true", delta.Value.FabricRedirect, delta.Value.FabricIngress) + } +} + +func TestRuntimeSessionDeltaSourceAdapterSatisfiesNeutralInterface(t *testing.T) { + var _ dpruntime.SessionDeltaSource = (&Manager{}).RuntimeSessionDeltaSource() +} + +func TestRuntimeSessionsExposeUserspaceDeltaSource(t *testing.T) { + store := New().Sessions() + if store.SessionDeltas() == nil { + t.Fatal("Sessions().SessionDeltas() = nil, want userspace runtime delta source") + } +} + +type fakeUserspaceHAOps struct { + fabric0Updates int + fabric1Updates int + 
syncs int + updateErr error + events []string +} + +func (f *fakeUserspaceHAOps) UpdateRGActive(int, bool) error { + return nil +} + +func (f *fakeUserspaceHAOps) UpdateHAWatchdog(int, uint64) error { + return nil +} + +func (f *fakeUserspaceHAOps) UpdateFabricFwd(dataplane.FabricFwdInfo) error { + f.fabric0Updates++ + f.events = append(f.events, "fabric0") + return f.updateErr +} + +func (f *fakeUserspaceHAOps) UpdateFabricFwd1(dataplane.FabricFwdInfo) error { + f.fabric1Updates++ + f.events = append(f.events, "fabric1") + return f.updateErr +} + +func (f *fakeUserspaceHAOps) SyncFabricState() { + f.syncs++ + f.events = append(f.events, "sync") +} + +func TestRuntimeManagerHAUsesUserspaceController(t *testing.T) { + controller, ok := New().HA().(userspaceHAController) + if !ok { + t.Fatalf("Manager.HA() = %T, want userspaceHAController", New().HA()) + } + if _, ok := controller.manager.(*Manager); !ok { + t.Fatalf("Manager.HA() controller manager = %T, want *Manager", controller.manager) + } +} + +func TestRuntimeUserspaceHAControllerSyncsFabricStateAfterForwardingUpdate(t *testing.T) { + fake := &fakeUserspaceHAOps{} + controller := userspaceHAController{manager: fake} + + if err := controller.SetFabricForwarding(context.Background(), 0, dataplane.FabricFwdInfo{}); err != nil { + t.Fatalf("SetFabricForwarding fabric0: %v", err) + } + if fake.fabric0Updates != 1 || fake.fabric1Updates != 0 || fake.syncs != 1 { + t.Fatalf("fabric0 path updates/syncs = %d/%d/%d, want 1/0/1", + fake.fabric0Updates, fake.fabric1Updates, fake.syncs) + } + if want := []string{"fabric0", "sync"}; !reflect.DeepEqual(fake.events, want) { + t.Fatalf("fabric0 event order = %#v, want %#v", fake.events, want) + } + + if err := controller.SetFabricForwarding(context.Background(), 1, dataplane.FabricFwdInfo{}); err != nil { + t.Fatalf("SetFabricForwarding fabric1: %v", err) + } + if fake.fabric0Updates != 1 || fake.fabric1Updates != 1 || fake.syncs != 2 { + t.Fatalf("fabric1 path updates/syncs = 
%d/%d/%d, want 1/1/2", + fake.fabric0Updates, fake.fabric1Updates, fake.syncs) + } + if want := []string{"fabric0", "sync", "fabric1", "sync"}; !reflect.DeepEqual(fake.events, want) { + t.Fatalf("fabric1 event order = %#v, want %#v", fake.events, want) + } +} + +func TestRuntimeUserspaceHAControllerDoesNotSyncFabricStateAfterUpdateError(t *testing.T) { + fake := &fakeUserspaceHAOps{updateErr: errors.New("update failed")} + controller := userspaceHAController{manager: fake} + + if err := controller.SetFabricForwarding(context.Background(), 0, dataplane.FabricFwdInfo{}); err == nil { + t.Fatal("SetFabricForwarding succeeded, want update error") + } + if fake.syncs != 0 { + t.Fatalf("SyncFabricState calls = %d, want 0 after update error", fake.syncs) + } +} + +func TestRuntimeUserspaceHAControllerSyncsAfterSuccessfulUpdateDespiteCanceledContext(t *testing.T) { + fake := &fakeUserspaceHAOps{} + controller := userspaceHAController{manager: fake} + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + if err := controller.SetFabricForwarding(ctx, 0, dataplane.FabricFwdInfo{}); err == nil { + t.Fatal("SetFabricForwarding with initially canceled context succeeded, want context error") + } + if len(fake.events) != 0 { + t.Fatalf("events after initially canceled context = %#v, want none", fake.events) + } + + ctx = context.Background() + if err := controller.SetFabricForwarding(ctx, 0, dataplane.FabricFwdInfo{}); err != nil { + t.Fatalf("SetFabricForwarding after successful update: %v", err) + } + if want := []string{"fabric0", "sync"}; !reflect.DeepEqual(fake.events, want) { + t.Fatalf("event order after successful update = %#v, want %#v", fake.events, want) + } +}