Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions docs/pr/1381-dataplane-interface-split/plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ type ApplyResult struct {
FilterIDs map[string]uint32
FilterSpans map[string]FilterCounterSpan
NATCounterIDs map[string]uint32
Capabilities Capabilities
Generation uint64
PoolIDs map[string]uint8
PolicyNames map[uint32]string
AppNames map[uint16]string
PolicyScheduleRuleSlots []PolicyScheduleRuleSlot
Capabilities Capabilities
Generation uint64
}

type FilterCounterSpan struct {
Expand Down Expand Up @@ -135,6 +139,10 @@ through `LastCompileResult()`. It must carry at least:
maps `rule-set/rule` to a counter ID via `LastCompileResult().NATCounterIDs`.
That mapping is config/apply metadata, not a BPF map-writer method, so it
belongs in `ApplyResult`.
- `PoolIDs`, `PolicyNames`, `AppNames`, and `PolicyScheduleRuleSlots` for NAT
display, flow/session attribution, event labels, and policy-scheduler
runtime updates. Scheduler callers must consume the compiled slots directly;
eBPF/DPDK/userspace must not recompute slots from original config indexes.
- stable generation numbers for those IDs so mixed old/new metadata cannot be
combined with counters from a different apply generation.

Expand Down Expand Up @@ -266,6 +274,11 @@ type SessionDeltaSource interface {
}
```

`SessionStore.SessionDeltas()` is the bridge for this optional source. Generic
map-backed session stores return nil; the userspace backend returns an adapter
that converts helper-private DTOs into `pkg/dataplane/runtime` DTOs at the
backend boundary.

These DTOs must live in the same package as the abstract runtime interfaces or
in a third leaf package imported by both `pkg/dataplane` and
`pkg/dataplane/userspace`. The public interface must not reference
Expand All @@ -282,6 +295,7 @@ backend boundary.
type Telemetry interface {
NewEventSource() (dataplane.EventSource, error)
GlobalCounter(uint32) (uint64, error)
ReadFloodCounters(uint16) (dataplane.FloodState, error)
InterfaceCounters(int) (dataplane.InterfaceCounterValue, error)
ZoneCounters(uint16, int) (dataplane.CounterValue, error)
PolicyCounters(uint32) (dataplane.CounterValue, error)
Expand Down Expand Up @@ -534,3 +548,36 @@ Phase 1 is not complete until all of these are true:
semantics as GC and has reverse-key plus DNAT/NAT64 cleanup tests; and
- rollback is documented as switching the daemon back to the existing
`dataplane.DataPlane` adapter without changing persistent config format.

## Implementation note: first Phase 1 slice

The first implementation slice keeps the legacy BPF-shaped `DataPlane`
interface in place and adds the new contract beside it:

- `pkg/dataplane.RuntimeDataPlane`, `ConfigSink`, `ApplyResult`,
`SessionStore`, `Telemetry`, HA, and link-domain interfaces are now defined
without importing backend packages.
- `ApplyResult` carries `FilterIDs`, `FilterSpans` (`FilterID`, `RuleStart`,
`RuleCount`), `NATCounterIDs` widened to `uint32`, `PoolIDs`,
`PolicyNames`, `AppNames`, compiled `PolicyScheduleRuleSlots`,
capabilities, and a generation. The eBPF, DPDK, and userspace compile paths
now publish their results through `LastApplyResult()`.
- `pkg/dataplane/runtime` owns the neutral session-delta DTOs and
`SessionDeltaSource`; `SessionStore.SessionDeltas()` exposes that optional
source and userspace adapts its helper-private
`SessionDeltaInfo`/`ProcessStatus` at the package boundary.
- eBPF, DPDK, and userspace managers now satisfy `RuntimeDataPlane` at
compile time. Shared adapters cover eBPF/DPDK HA plus generic telemetry and
session surfaces; userspace keeps backend-specific link and HA controllers so
link-cycle prepare/defer/rebind semantics and fabric-state helper sync stay
intact.
- Cluster stale-bulk reconciliation now routes through
`dataplane.SessionStore.ReconcileClusterBulk`, whose companion-delete path
owns forward, reverse, and DNAT/DNATv6 cleanup. A canary fails if
`pkg/cluster/sync.go` reintroduces local `DeleteDNATEntry*` cleanup.

Remaining Phase 1 work is still explicit: daemon/API/gRPC/CLI callers must move
from `LastCompileResult()` and BPF map reads to `LastApplyResult()`/domain
interfaces, GC must move to `SessionStore`/`Telemetry`, and the legacy
`DataPlane` method-count canary can only flip after those callers no longer need
the BPF-shaped surface.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/insomniacslk/dhcp v0.0.0-20251020182700-175e84fbb167
github.com/mdlayher/ndp v1.1.0
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/vishvananda/netlink v1.3.1
golang.org/x/net v0.47.0
golang.org/x/sync v0.18.0
Expand All @@ -27,7 +28,6 @@ require (
github.com/mdlayher/socket v0.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/u-root/uio v0.0.0-20230220225925-ffce2a382923 // indirect
Expand Down
73 changes: 17 additions & 56 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,64 +551,25 @@ func (s *SessionSync) reconcileStaleSessions() {
return true
}
var deleted int
var staleV4 []dataplane.SessionKey
v4IterStart := time.Now()
s.dp.IterateSessions(func(key dataplane.SessionKey, val dataplane.SessionValue) bool {
if val.IsReverse != 0 {
return true
}
if shouldSyncAtBulkStart(val.IngressZone) {
return true
}
if _, ok := recvV4[key]; !ok {
staleV4 = append(staleV4, key)
}
return true
store := dataplane.NewDataPlaneSessionStore(s.dp)
result, err := store.ReconcileClusterBulk(dataplane.ClusterBulkReconcileInput{
ReceivedV4: recvV4,
ReceivedV6: recvV6,
ShouldSyncZone: shouldSyncAtBulkStart,
DeleteReason: dataplane.DeleteReasonClusterStale,
})
slog.Info("cluster sync: reconcile stale sessions iterated v4", "stale", len(staleV4), "elapsed", time.Since(v4IterStart))
v4DeleteStart := time.Now()
for _, key := range staleV4 {
if val, err := s.dp.GetSessionV4(key); err == nil {
if val.ReverseKey.Protocol != 0 {
s.dp.DeleteSession(val.ReverseKey)
}
if val.Flags&dataplane.SessFlagSNAT != 0 && val.Flags&dataplane.SessFlagStaticNAT == 0 {
s.dp.DeleteDNATEntry(dataplane.DNATKey{Protocol: key.Protocol, DstIP: val.NATSrcIP, DstPort: val.NATSrcPort})
}
}
s.dp.DeleteSession(key)
deleted++
}
slog.Info("cluster sync: reconcile stale sessions deleted v4", "deleted", len(staleV4), "elapsed", time.Since(v4DeleteStart))
var staleV6 []dataplane.SessionKeyV6
v6IterStart := time.Now()
s.dp.IterateSessionsV6(func(key dataplane.SessionKeyV6, val dataplane.SessionValueV6) bool {
if val.IsReverse != 0 {
return true
}
if shouldSyncAtBulkStart(val.IngressZone) {
return true
}
if _, ok := recvV6[key]; !ok {
staleV6 = append(staleV6, key)
}
return true
})
slog.Info("cluster sync: reconcile stale sessions iterated v6", "stale", len(staleV6), "elapsed", time.Since(v6IterStart))
v6DeleteStart := time.Now()
for _, key := range staleV6 {
if val, err := s.dp.GetSessionV6(key); err == nil {
if val.ReverseKey.Protocol != 0 {
s.dp.DeleteSessionV6(val.ReverseKey)
}
if val.Flags&dataplane.SessFlagSNAT != 0 && val.Flags&dataplane.SessFlagStaticNAT == 0 {
s.dp.DeleteDNATEntryV6(dataplane.DNATKeyV6{Protocol: key.Protocol, DstIP: val.NATSrcIP, DstPort: val.NATSrcPort})
}
}
s.dp.DeleteSessionV6(key)
deleted++
deleted = result.DeletedV4 + result.DeletedV6
if err != nil {
slog.Warn("cluster sync: reconcile stale sessions failed", "err", err)
s.stats.Errors.Add(1)
}
slog.Info("cluster sync: reconcile stale sessions deleted v6", "deleted", len(staleV6), "elapsed", time.Since(v6DeleteStart))
slog.Info(
"cluster sync: reconcile stale sessions applied",
"stale_v4", result.StaleV4,
"stale_v6", result.StaleV6,
"deleted_v4", result.DeletedV4,
"deleted_v6", result.DeletedV6,
)
if deleted > 0 {
slog.Info("cluster sync: reconciled stale sessions", "deleted", deleted)
}
Expand Down
127 changes: 127 additions & 0 deletions pkg/cluster/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"encoding/binary"
"errors"
"fmt"
"go/ast"
"go/parser"
"go/token"
"io"
"net"
"path/filepath"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -537,6 +541,8 @@ type mockSweepDP struct {
v4sessions map[dataplane.SessionKey]dataplane.SessionValue
v6sessions map[dataplane.SessionKeyV6]dataplane.SessionValueV6
sessionCounter uint64
deletedDNATV4 []dataplane.DNATKey
deletedDNATV6 []dataplane.DNATKeyV6
}

func (m *mockSweepDP) ReadGlobalCounter(index uint32) (uint64, error) {
Expand Down Expand Up @@ -610,10 +616,12 @@ func (m *mockSweepDP) DeleteSessionV6(key dataplane.SessionKeyV6) error {
}

// DeleteDNATEntry records the requested DNAT key in deletedDNATV4 so tests can
// assert which companion DNAT entries were cleaned up. It always returns nil.
func (m *mockSweepDP) DeleteDNATEntry(key dataplane.DNATKey) error {
	// Append rather than mutate any session state: the mock only tracks the call.
	m.deletedDNATV4 = append(m.deletedDNATV4, key)
	return nil
}

// DeleteDNATEntryV6 records the requested DNATv6 key in deletedDNATV6 so tests
// can assert which companion DNAT entries were cleaned up. It always returns
// nil.
func (m *mockSweepDP) DeleteDNATEntryV6(key dataplane.DNATKeyV6) error {
	// Append rather than mutate any session state: the mock only tracks the call.
	m.deletedDNATV6 = append(m.deletedDNATV6, key)
	return nil
}

Expand Down Expand Up @@ -1456,6 +1464,125 @@ func TestReconcileStaleSessionsV6(t *testing.T) {
}
}

// TestReconcileStaleSessionsUsesSessionStoreCompanionDeleteV4 seeds the mock
// dataplane with one fresh and one stale IPv4 forward session (the stale one
// carrying a reverse key and the SNAT flag), replays a bulk transfer that only
// re-announces the fresh session, and then verifies that the stale forward
// entry, its reverse entry, and exactly one DNAT entry keyed by the stale
// session's NAT source address/port were deleted.
func TestReconcileStaleSessionsUsesSessionStoreCompanionDeleteV4(t *testing.T) {
	// Untyped constants so the same values fit both the session value fields
	// and the expected DNAT key fields.
	const (
		natAddr = 0x2c0200c0
		natPort = 40443
	)

	var (
		keptKey = dataplane.SessionKey{SrcIP: [4]byte{10, 0, 3, 1}, DstIP: [4]byte{10, 0, 4, 1}, Protocol: 6, SrcPort: 1000, DstPort: 80}
		goneKey = dataplane.SessionKey{SrcIP: [4]byte{10, 0, 3, 2}, DstIP: [4]byte{10, 0, 4, 2}, Protocol: 6, SrcPort: 2000, DstPort: 443}
		backKey = dataplane.SessionKey{SrcIP: [4]byte{10, 0, 4, 2}, DstIP: [4]byte{10, 0, 3, 2}, Protocol: 6, SrcPort: 443, DstPort: 2000}
	)

	// The fresh session value is both stored locally and replayed by the peer.
	keptVal := dataplane.SessionValue{IngressZone: 2}

	seeded := make(map[dataplane.SessionKey]dataplane.SessionValue, 3)
	seeded[keptKey] = keptVal
	seeded[goneKey] = dataplane.SessionValue{
		IngressZone: 2,
		ReverseKey:  backKey,
		Flags:       dataplane.SessFlagSNAT,
		NATSrcIP:    natAddr,
		NATSrcPort:  natPort,
	}
	seeded[backKey] = dataplane.SessionValue{IsReverse: 1, IngressZone: 2}
	dp := &mockSweepDP{v4sessions: seeded}

	// Zone 2 maps to RG 2, and this node reports primary only for RG 1.
	syncer := NewSessionSync(":0", "10.0.0.2:4785", dp)
	syncer.IsPrimaryFn = func() bool { return false }
	syncer.IsPrimaryForRGFn = func(rgID int) bool { return rgID == 1 }
	syncer.SetZoneRGMap(map[uint16]int{1: 1, 2: 2})

	// Bulk transfer that only carries the fresh session.
	syncer.handleMessage(nil, syncMsgBulkStart, nil)
	syncer.handleMessage(nil, syncMsgSessionV4, encodeSessionV4Payload(keptKey, keptVal))
	syncer.handleMessage(nil, syncMsgBulkEnd, nil)

	for _, gone := range []struct {
		key dataplane.SessionKey
		msg string
	}{
		{goneKey, "stale forward session should be deleted"},
		{backKey, "stale reverse session should be deleted"},
	} {
		if _, present := dp.v4sessions[gone.key]; present {
			t.Fatal(gone.msg)
		}
	}

	wantDNAT := dataplane.DNATKey{Protocol: 6, DstIP: natAddr, DstPort: natPort}
	if got := dp.deletedDNATV4; len(got) != 1 || got[0] != wantDNAT {
		t.Fatalf("deleted DNAT = %+v, want [%+v]", got, wantDNAT)
	}
}

// TestReconcileStaleSessionsUsesSessionStoreCompanionDeleteV6 is the IPv6
// counterpart of the V4 companion-delete test: it seeds one fresh and one
// stale forward session (the stale one carrying a reverse key and the SNAT
// flag), replays a bulk transfer that only re-announces the fresh session, and
// verifies that the stale forward entry, its reverse entry, and exactly one
// DNATv6 entry keyed by the stale session's NAT source address/port were
// deleted.
func TestReconcileStaleSessionsUsesSessionStoreCompanionDeleteV6(t *testing.T) {
	// Untyped so it fits both the session value and the expected DNAT key.
	const natPort = 53000

	var keptKey, goneKey, backKey dataplane.SessionKeyV6

	keptKey.Protocol, keptKey.SrcPort, keptKey.DstPort = 6, 1000, 80
	keptKey.SrcIP[15], keptKey.DstIP[15] = 1, 2

	goneKey.Protocol, goneKey.SrcPort, goneKey.DstPort = 17, 2000, 53
	goneKey.SrcIP[15], goneKey.DstIP[15] = 3, 4

	// Mirror image of goneKey: addresses and ports swapped.
	backKey.Protocol, backKey.SrcPort, backKey.DstPort = 17, 53, 2000
	backKey.SrcIP[15], backKey.DstIP[15] = 4, 3

	// 2001:db8::63 spelled out with indexed elements; the rest stay zero.
	natAddr := [16]byte{0: 0x20, 1: 0x01, 2: 0x0d, 3: 0xb8, 15: 99}

	// The fresh session value is both stored locally and replayed by the peer.
	keptVal := dataplane.SessionValueV6{IngressZone: 2}

	seeded := make(map[dataplane.SessionKeyV6]dataplane.SessionValueV6, 3)
	seeded[keptKey] = keptVal
	seeded[goneKey] = dataplane.SessionValueV6{
		IngressZone: 2,
		ReverseKey:  backKey,
		Flags:       dataplane.SessFlagSNAT,
		NATSrcIP:    natAddr,
		NATSrcPort:  natPort,
	}
	seeded[backKey] = dataplane.SessionValueV6{IsReverse: 1, IngressZone: 2}
	dp := &mockSweepDP{v6sessions: seeded}

	// Zone 2 maps to RG 2, and this node reports primary only for RG 1.
	syncer := NewSessionSync(":0", "10.0.0.2:4785", dp)
	syncer.IsPrimaryFn = func() bool { return false }
	syncer.IsPrimaryForRGFn = func(rgID int) bool { return rgID == 1 }
	syncer.SetZoneRGMap(map[uint16]int{1: 1, 2: 2})

	// Bulk transfer that only carries the fresh session.
	syncer.handleMessage(nil, syncMsgBulkStart, nil)
	syncer.handleMessage(nil, syncMsgSessionV6, encodeSessionV6Payload(keptKey, keptVal))
	syncer.handleMessage(nil, syncMsgBulkEnd, nil)

	for _, gone := range []struct {
		key dataplane.SessionKeyV6
		msg string
	}{
		{goneKey, "stale forward session should be deleted"},
		{backKey, "stale reverse session should be deleted"},
	} {
		if _, present := dp.v6sessions[gone.key]; present {
			t.Fatal(gone.msg)
		}
	}

	wantDNAT := dataplane.DNATKeyV6{Protocol: 17, DstIP: natAddr, DstPort: natPort}
	if got := dp.deletedDNATV6; len(got) != 1 || got[0] != wantDNAT {
		t.Fatalf("deleted DNATv6 = %+v, want [%+v]", got, wantDNAT)
	}
}

// TestReconcileStaleSessionsHasNoLocalDNATCleanup is a source-level canary: it
// parses sync.go from the package directory, locates the first function
// declaration named reconcileStaleSessions, and fails if its body contains any
// selector expression named DeleteDNATEntry or DeleteDNATEntryV6.
func TestReconcileStaleSessionsHasNoLocalDNATCleanup(t *testing.T) {
	t.Parallel()

	parsed, err := parser.ParseFile(token.NewFileSet(), filepath.Join(".", "sync.go"), nil, 0)
	if err != nil {
		t.Fatalf("parse sync.go: %v", err)
	}

	// Kept as an ast.Node interface value so the nil check below only trips
	// when no matching declaration was found.
	var body ast.Node
	for _, d := range parsed.Decls {
		if fn, isFunc := d.(*ast.FuncDecl); isFunc && fn.Name.Name == "reconcileStaleSessions" {
			body = fn.Body
			break
		}
	}
	if body == nil {
		t.Fatal("reconcileStaleSessions not found")
	}

	ast.Inspect(body, func(n ast.Node) bool {
		sel, isSelector := n.(*ast.SelectorExpr)
		if isSelector && (sel.Sel.Name == "DeleteDNATEntry" || sel.Sel.Name == "DeleteDNATEntryV6") {
			t.Fatalf("reconcileStaleSessions still owns local %s cleanup; use SessionStore companion delete", sel.Sel.Name)
		}
		// Always descend; a selector's operand may itself contain selectors.
		return true
	})
}

func TestReconcileNoBulkInProgress(t *testing.T) {
// If no bulk was in progress, reconciliation should be a no-op.
key := dataplane.SessionKey{SrcIP: [4]byte{10, 0, 1, 1}, Protocol: 6}
Expand Down
Loading