diff --git a/_Log.md b/_Log.md index 649c0645c..1980d3714 100644 --- a/_Log.md +++ b/_Log.md @@ -2,6 +2,11 @@ ## 2026-05-17 +- **Timestamp**: 2026-05-17T08:30:20Z + - **Action**: PR #1394 round-10 follow-up — fixed standalone userspace event-stream callback wiring by always registering session/full-resync callbacks, and added a regression test that verifies standalone SessionOpen and FullResync frames are ACKed instead of stalling behind an unwired callback queue. + - **File(s)**: `pkg/daemon/daemon_ha_userspace.go`, `pkg/daemon/userspace_sync_test.go`, `_Log.md` + - **Validation**: `gofmt -w pkg/daemon/daemon_ha_userspace.go pkg/daemon/userspace_sync_test.go`; `go test ./pkg/daemon ./pkg/dataplane/userspace ./pkg/logging`; `git diff --check` + - **Timestamp**: 2026-05-17T05:06:00Z - **Action**: PR #1395 cleanup — reverted an unintended `go.mod` direct/indirect dependency classification change introduced by automated tooling so the round-4 fix stays scoped to three-color policer compiler logic/tests/docs. - **File(s)**: `go.mod`, `_Log.md` @@ -14,6 +19,21 @@ - **Action**: PR #1395 round-4 follow-up — fixed three-color policer compiler handling for duplicate same-mode sibling blocks by iterating all `single-rate`/`two-rate` children, added hierarchical ambiguity regression coverage in parser/configstore tests, and updated filter module docs to reflect same-mode sibling merge semantics. - **File(s)**: `pkg/config/compiler_firewall.go`, `pkg/config/parser_ast_test.go`, `pkg/configstore/store_test.go`, `userspace-dp/src/filter/README.md`, `_Log.md` +- **Timestamp**: 2026-05-17T04:58:00Z + - **Action**: PR #1379 round-4 blocker fix follow-up — removed HA startup ordering race by starting cluster comms only after event fanout wiring, made userspace event-stream callback registration concurrency-safe for post-start wiring, and exposed helper producer sent/dropped counters through status text and Prometheus. + - **File(s)**: `pkg/daemon/daemon_run.go`, `pkg/dataplane/userspace/eventstream.go`, `pkg/dataplane/userspace/eventstream_test.go`, `pkg/dataplane/userspace/protocol.go`, `pkg/dataplane/userspace/statusfmt.go`, `pkg/api/metrics.go`, `pkg/api/metrics_test.go`, `_Log.md` + - **Validation**: `gofmt -w pkg/daemon/daemon_run.go pkg/dataplane/userspace/eventstream.go pkg/dataplane/userspace/eventstream_test.go pkg/dataplane/userspace/protocol.go pkg/dataplane/userspace/statusfmt.go pkg/api/metrics.go pkg/api/metrics_test.go`; `go test ./pkg/dataplane/userspace ./pkg/logging ./pkg/api ./pkg/daemon`; `git diff --check` + +- **Timestamp**: 2026-05-17T05:06:00Z + - **Action**: Addressed automated review nit on the new event-stream lifecycle regression test by replacing fixed sleep + wall-clock polling with timeout contexts and retry loops for listener readiness/connection checks. + - **File(s)**: `pkg/dataplane/userspace/eventstream_test.go`, `_Log.md` + - **Validation**: `gofmt -w pkg/dataplane/userspace/eventstream_test.go`; `go test ./pkg/dataplane/userspace ./pkg/logging ./pkg/api ./pkg/daemon`; `git diff --check` + +- **Timestamp**: 2026-05-17T05:13:00Z + - **Action**: Completed the same timeout-context synchronization pattern in `TestEventStreamDataplaneEventCallbackCanBeSetAfterStart` so the new lifecycle regression test no longer relies on fixed sleeps or wall-clock deadline polling. + - **File(s)**: `pkg/dataplane/userspace/eventstream_test.go`, `_Log.md` + - **Validation**: `gofmt -w pkg/dataplane/userspace/eventstream_test.go`; `go test ./pkg/dataplane/userspace ./pkg/logging ./pkg/api ./pkg/daemon`; `git diff --check` + - **Timestamp**: 2026-05-17T01:12:00Z - **Action**: PR #1391 post-smoke follow-up — live q10(24G)+q0(best-effort) contention on `7e7eb07e` showed serviceable-only exact suppression still let q0 drain ~15.6 GB of surplus while exact was backlogged. Reworked the gate from binary serviceability to residual-rate budgeting: non-exact surplus can consume only `root_rate - backlogged_exact_guarantee_rates`, shared exact queues publish queue masks so one queue's reservation is counted once across workers, and shared interfaces use an interface-global residual token bucket rather than per-worker residual buckets. - **File(s)**: `userspace-dp/src/afxdp/cos/queue_service/mod.rs`, `userspace-dp/src/afxdp/cos/queue_service/tests.rs`, `userspace-dp/src/afxdp/cos/tx_completion.rs`, `userspace-dp/src/afxdp/types/shared_cos_lease.rs`, `userspace-dp/src/afxdp/types/cos.rs`, `userspace-dp/src/afxdp/cos/builders.rs`, `userspace-dp/src/afxdp/worker/cos_tests.rs`, `userspace-dp/src/afxdp/cos/README.md`, `userspace-dp/src/afxdp/types/README.md`, `_Log.md` diff --git a/docs/session-sync-design.md b/docs/session-sync-design.md index bba2568d4..d3dbb6c9b 100644 --- a/docs/session-sync-design.md +++ b/docs/session-sync-design.md @@ -560,7 +560,11 @@ On disconnect, the helper retains its replay buffer (bounded, ~4096 events per binding). On reconnect, it replays from the last acked sequence. If the buffer has been trimmed past the last acked sequence (long disconnect), it sends a special `FullResync` frame (type 9) that tells the daemon to treat this as a -fresh start and request a bulk export. +fresh start and request a bulk export. The helper retains the stale replay +window until the daemon ACKs the `FullResync`; otherwise an unacked resync could +be lost across a second reconnect. HA backup nodes ACK and ignore session +events because they are permanent non-owners, while transient primary readiness +gaps withhold ACK for replay. ### Integration with Existing Code diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go index 0a345ef51..c1d4d4c31 100644 --- a/pkg/api/metrics.go +++ b/pkg/api/metrics.go @@ -116,6 +116,14 @@ type xpfCollector struct { workerIdleLoops *prometheus.Desc workerCoSQueueLeaseAcquireV8Calls *prometheus.Desc workerCoSQueueLeaseAcquireV8GrantedBytes *prometheus.Desc + // #1379: daemon-side userspace event-stream transport counters. + userspaceEventStreamFramesTotal *prometheus.Desc + userspaceEventStreamProducerFramesTotal *prometheus.Desc + userspaceEventStreamDecodeErrorsTotal *prometheus.Desc + userspaceEventStreamSequenceGapsTotal *prometheus.Desc + userspaceEventStreamDataplaneEventsTotal *prometheus.Desc + userspaceEventStreamDataplaneDropsTotal *prometheus.Desc + userspaceEventStreamUnknownDropsTotal *prometheus.Desc // #925 Phase 2: liveness gauge for the supervisor's catch_unwind // state. 1 = worker has panicked and the supervisor has caught it; // 0 = healthy. Set-only in Phase 1 (cleared by daemon restart). @@ -484,6 +492,41 @@ func newCollector(srv *Server) *xpfCollector { "Bytes granted by v8 CoS queue-lease acquire calls for this worker (#1240).", []string{"worker_id"}, nil, ), + userspaceEventStreamFramesTotal: prometheus.NewDesc( + "xpf_userspace_event_stream_frames_total", + "Daemon-side userspace event-stream frames by direction.", + []string{"direction"}, nil, + ), + userspaceEventStreamProducerFramesTotal: prometheus.NewDesc( + "xpf_userspace_event_stream_producer_frames_total", + "Userspace helper event-stream producer counters by outcome.", + []string{"outcome"}, nil, + ), + userspaceEventStreamDecodeErrorsTotal: prometheus.NewDesc( + "xpf_userspace_event_stream_decode_errors_total", + "Daemon-side userspace event-stream decode errors.", + nil, nil, + ), + userspaceEventStreamSequenceGapsTotal: prometheus.NewDesc( + "xpf_userspace_event_stream_sequence_gaps_total", + "Daemon-side userspace event-stream sequence gaps.", + nil, nil, + ), + userspaceEventStreamDataplaneEventsTotal: prometheus.NewDesc( + "xpf_userspace_event_stream_dataplane_events_total", + "Decoded RT_FLOW dataplane events received over the userspace event stream.", + []string{"type"}, nil, + ), + userspaceEventStreamDataplaneDropsTotal: prometheus.NewDesc( + "xpf_userspace_event_stream_dataplane_event_drops_total", + "RT_FLOW dataplane events dropped by the userspace event-stream decoder.", + []string{"type"}, nil, + ), + userspaceEventStreamUnknownDropsTotal: prometheus.NewDesc( + "xpf_userspace_event_stream_unknown_frame_drops_total", + "Userspace event-stream frames dropped because their frame type is unknown.", + nil, nil, + ), workerDead: prometheus.NewDesc( "xpf_userspace_worker_dead", "1 if the userspace-dp worker thread has panicked and been "+ @@ -703,6 +746,13 @@ func (c *xpfCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.workerIdleLoops ch <- c.workerCoSQueueLeaseAcquireV8Calls ch <- c.workerCoSQueueLeaseAcquireV8GrantedBytes + ch <- c.userspaceEventStreamFramesTotal + ch <- c.userspaceEventStreamProducerFramesTotal + ch <- c.userspaceEventStreamDecodeErrorsTotal + ch <- c.userspaceEventStreamSequenceGapsTotal + ch <- c.userspaceEventStreamDataplaneEventsTotal + ch <- c.userspaceEventStreamDataplaneDropsTotal + ch <- c.userspaceEventStreamUnknownDropsTotal ch <- c.workerDead ch <- c.bindingActiveFlowCount ch <- c.bindingTXCompletions @@ -771,6 +821,7 @@ func (c *xpfCollector) collectUserspaceStatus(ch chan<- prometheus.Metric, dp da c.emitCoSDrainPhaseTelemetry(ch, status) c.emitCoSEqualFlowEnforcement(ch, status) c.emitWorkerRuntime(ch, status) + c.emitUserspaceEventStream(ch, status) c.emitBindingActiveFlowCount(ch, status) c.emitBindingTXCompletionTelemetry(ch, status) c.emitCoSActiveFlowCount(ch, status) @@ -1131,6 +1182,50 @@ func (c *xpfCollector) emitWorkerRuntime(ch chan<- prometheus.Metric, status dpu } } +func (c *xpfCollector) emitUserspaceEventStream(ch chan<- prometheus.Metric, status dpuserspace.ProcessStatus) { + if status.EventStream == nil { + return + } + es := status.EventStream + ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamFramesTotal, + prometheus.CounterValue, float64(es.FramesRead), "read") + ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamFramesTotal, + prometheus.CounterValue, float64(es.FramesWritten), "written") + ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamProducerFramesTotal, + prometheus.CounterValue, float64(status.EventStreamSent), "sent") + ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamProducerFramesTotal, + prometheus.CounterValue, float64(status.EventStreamDropped), "dropped") + ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamDecodeErrorsTotal, + prometheus.CounterValue, float64(es.DecodeErrors)) + ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamSequenceGapsTotal, + prometheus.CounterValue, float64(es.SeqGaps)) + + for _, item := range []struct { + label string + count uint64 + }{ + {"policy_deny", es.PolicyDenyEvents}, + {"screen_drop", es.ScreenDropEvents}, + {"filter_log", es.FilterLogEvents}, + } { + ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamDataplaneEventsTotal, + prometheus.CounterValue, float64(item.count), item.label) + } + for _, item := range []struct { + label string + count uint64 + }{ + {"policy_deny", es.PolicyDenyDrops}, + {"screen_drop", es.ScreenDropDrops}, + {"filter_log", es.FilterLogDrops}, + } { + ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamDataplaneDropsTotal, + prometheus.CounterValue, float64(item.count), item.label) + } + ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamUnknownDropsTotal, + prometheus.CounterValue, float64(es.UnknownFrameDrops)) +} + // #709: export owner-profile telemetry when the dataplane is the // userspace-dp helper. The eBPF-only build path doesn't have this // shape (it has no CoS scheduler), so we type-assert on the optional diff --git a/pkg/api/metrics_test.go b/pkg/api/metrics_test.go index 7fc3c4127..d8f94e5e1 100644 --- a/pkg/api/metrics_test.go +++ b/pkg/api/metrics_test.go @@ -217,6 +217,63 @@ func collectFromEmitWorkerRuntime( return got } +func TestEmitUserspaceEventStreamMetrics(t *testing.T) { + mkNoLabel := func(name string) *prometheus.Desc { + return prometheus.NewDesc(name, name, nil, nil) + } + mkOneLabel := func(name, label string) *prometheus.Desc { + return prometheus.NewDesc(name, name, []string{label}, nil) + } + c := &xpfCollector{ + userspaceEventStreamFramesTotal: mkOneLabel("xpf_userspace_event_stream_frames_total", "direction"), + userspaceEventStreamProducerFramesTotal: mkOneLabel("xpf_userspace_event_stream_producer_frames_total", "outcome"), + userspaceEventStreamDecodeErrorsTotal: mkNoLabel("xpf_userspace_event_stream_decode_errors_total"), + userspaceEventStreamSequenceGapsTotal: mkNoLabel("xpf_userspace_event_stream_sequence_gaps_total"), + userspaceEventStreamDataplaneEventsTotal: mkOneLabel("xpf_userspace_event_stream_dataplane_events_total", "type"), + userspaceEventStreamDataplaneDropsTotal: mkOneLabel("xpf_userspace_event_stream_dataplane_event_drops_total", "type"), + userspaceEventStreamUnknownDropsTotal: mkNoLabel("xpf_userspace_event_stream_unknown_frame_drops_total"), + } + status := dpuserspace.ProcessStatus{ + EventStreamSent: 101, + EventStreamDropped: 7, + EventStream: &dpuserspace.EventStreamStatus{ + FramesRead: 11, + FramesWritten: 7, + DecodeErrors: 2, + SeqGaps: 3, + PolicyDenyEvents: 5, + ScreenDropEvents: 6, + FilterLogEvents: 8, + PolicyDenyDrops: 1, + ScreenDropDrops: 4, + FilterLogDrops: 9, + UnknownFrameDrops: 10, + }, + } + ch := make(chan prometheus.Metric) + go func() { + c.emitUserspaceEventStream(ch, status) + close(ch) + }() + var got []prometheus.Metric + for m := range ch { + got = append(got, m) + } + assertCounterClose(t, got, c.userspaceEventStreamFramesTotal, map[string]string{"direction": "read"}, 11) + assertCounterClose(t, got, c.userspaceEventStreamFramesTotal, map[string]string{"direction": "written"}, 7) + assertCounterClose(t, got, c.userspaceEventStreamProducerFramesTotal, map[string]string{"outcome": "sent"}, 101) + assertCounterClose(t, got, c.userspaceEventStreamProducerFramesTotal, map[string]string{"outcome": "dropped"}, 7) + assertCounterClose(t, got, c.userspaceEventStreamDecodeErrorsTotal, nil, 2) + assertCounterClose(t, got, c.userspaceEventStreamSequenceGapsTotal, nil, 3) + assertCounterClose(t, got, c.userspaceEventStreamDataplaneEventsTotal, map[string]string{"type": "policy_deny"}, 5) + assertCounterClose(t, got, c.userspaceEventStreamDataplaneEventsTotal, map[string]string{"type": "screen_drop"}, 6) + assertCounterClose(t, got, c.userspaceEventStreamDataplaneEventsTotal, map[string]string{"type": "filter_log"}, 8) + assertCounterClose(t, got, c.userspaceEventStreamDataplaneDropsTotal, map[string]string{"type": "policy_deny"}, 1) + assertCounterClose(t, got, c.userspaceEventStreamDataplaneDropsTotal, map[string]string{"type": "screen_drop"}, 4) + assertCounterClose(t, got, c.userspaceEventStreamDataplaneDropsTotal, map[string]string{"type": "filter_log"}, 9) + assertCounterClose(t, got, c.userspaceEventStreamUnknownDropsTotal, nil, 10) +} + func metricValuesByWorker( t *testing.T, metrics []prometheus.Metric, diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 95795b75a..174194e13 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -94,6 +94,7 @@ type Daemon struct { syncReadyTimeout time.Duration slogHandler *logging.SyslogSlogHandler traceWriter *logging.TraceWriter + eventBuf *logging.EventBuffer eventReader *logging.EventReader eventEngine *eventengine.Engine aggregator *logging.SessionAggregator diff --git a/pkg/daemon/daemon_ha_sync.go b/pkg/daemon/daemon_ha_sync.go index 879fc49fe..ff4cca213 100644 --- a/pkg/daemon/daemon_ha_sync.go +++ b/pkg/daemon/daemon_ha_sync.go @@ -676,6 +676,20 @@ func (d *Daemon) startClusterComms(ctx context.Context) { } d.sessionSync.SetVRFDevice(vrfDevice) + var streamProvider userspaceEventStreamProvider + streamCallbacksWired := false + if d.dp != nil { + if provider, ok := d.dp.(userspaceEventStreamProvider); ok { + streamProvider = provider + wireCtx, cancel := context.WithTimeout(commsCtx, 5*time.Second) + streamCallbacksWired = d.wireUserspaceEventStreamCallbacks(wireCtx, provider) + cancel() + if !streamCallbacksWired { + slog.Warn("userspace: event stream callbacks not ready before session sync start; falling back to polling until stream wires") + } + } + } + // Retry sync start: the VRF device and address binding may not // be ready during daemon startup (networkd race). for i := 0; i < 30; i++ { @@ -709,7 +723,11 @@ func (d *Daemon) startClusterComms(ctx context.Context) { return d.cluster != nil && d.cluster.IsLocalPrimary(rgID) } d.sessionSync.StartSyncSweep(commsCtx) - go d.runUserspaceEventStream(commsCtx) + if streamCallbacksWired { + go d.eventStreamFallbackLoop(commsCtx, streamProvider) + } else { + go d.runUserspaceEventStream(commsCtx) + } } break diff --git a/pkg/daemon/daemon_ha_userspace.go b/pkg/daemon/daemon_ha_userspace.go index 45afac0ed..823ec8c41 100644 --- a/pkg/daemon/daemon_ha_userspace.go +++ b/pkg/daemon/daemon_ha_userspace.go @@ -16,6 +16,7 @@ import ( "github.com/psaab/xpf/pkg/config" "github.com/psaab/xpf/pkg/dataplane" dpuserspace "github.com/psaab/xpf/pkg/dataplane/userspace" + "github.com/psaab/xpf/pkg/logging" ) // buildZoneIDs replicates the deterministic zone ID assignment from the @@ -482,12 +483,27 @@ func (d *Daemon) syncUserspaceSessionDeltas(ctx context.Context) { // is unavailable or disconnected. func (d *Daemon) runUserspaceEventStream(ctx context.Context) { provider, ok := d.dp.(userspaceEventStreamProvider) - if !ok || d.cluster == nil || d.sessionSync == nil { + if !ok { // Manager doesn't support event stream — fall back to polling. d.syncUserspaceSessionDeltas(ctx) return } + if !d.wireUserspaceEventStreamCallbacks(ctx, provider) { + return + } + if d.cluster == nil || d.sessionSync == nil { + return + } + slog.Info("userspace: event stream consumer started, polling is primary until stream connects") + + // Monitor connection. When the stream is connected, events arrive via + // callback and polling drops to 5s reconciliation. When disconnected, + // polling resumes at 100ms. + d.eventStreamFallbackLoop(ctx, provider) +} + +func (d *Daemon) wireUserspaceEventStreamCallbacks(ctx context.Context, provider userspaceEventStreamProvider) bool { // Wait for the event stream to become available (helper may not have started yet). var es *dpuserspace.EventStream for { @@ -497,44 +513,54 @@ func (d *Daemon) runUserspaceEventStream(ctx context.Context) { } select { case <-ctx.Done(): - return + return false case <-time.After(500 * time.Millisecond): } } // Wire callbacks. - es.SetOnEvent(func(eventType uint8, seq uint64, delta dpuserspace.SessionDeltaInfo) { - d.handleEventStreamDelta(eventType, delta) + es.SetOnEvent(func(eventType uint8, seq uint64, delta dpuserspace.SessionDeltaInfo) bool { + return d.handleEventStreamDelta(eventType, delta) }) - es.SetOnFullResync(func() { - d.handleEventStreamFullResync() + es.SetOnFullResync(func() bool { + return d.handleEventStreamFullResync() }) - - slog.Info("userspace: event stream consumer started, polling is primary until stream connects") - - // Monitor connection. When the stream is connected, events arrive via - // callback and polling drops to 5s reconciliation. When disconnected, - // polling resumes at 100ms. - d.eventStreamFallbackLoop(ctx, provider) + if d.eventReader != nil { + es.SetOnRawDataplaneEvent(func(seq uint64, payload []byte) { + if !d.eventReader.ProcessRawEvent(payload) { + slog.Debug("userspace event stream: dropped undecodable dataplane event", "seq", seq) + } + }) + } else { + es.SetOnDataplaneEvent(func(seq uint64, rec logging.EventRecord) { + if d.eventBuf != nil { + d.eventBuf.Add(rec) + } + }) + } + return true } -// handleEventStreamDelta processes a single session event from the event stream. -func (d *Daemon) handleEventStreamDelta(eventType uint8, delta dpuserspace.SessionDeltaInfo) { +// handleEventStreamDelta processes a single session event from the event +// stream. It returns true when the delta has been handled, including permanent +// non-owner no-op handling on HA backups. It returns false only for transient +// readiness gaps where EventStream should withhold ACK so the helper can replay. +func (d *Daemon) handleEventStreamDelta(eventType uint8, delta dpuserspace.SessionDeltaInfo) bool { if d.cluster == nil || d.sessionSync == nil { - slog.Debug("userspace delta: dropped (no cluster/sync)", "type", eventType) - return + slog.Debug("userspace delta: ignored (no cluster/sync)", "type", eventType) + return true } if !d.cluster.IsLocalPrimaryAny() { - slog.Debug("userspace delta: dropped (not primary for any RG)", "type", eventType) - return + slog.Debug("userspace delta: ignored (not primary for any RG)", "type", eventType) + return true } if !d.sessionSync.IsConnected() { slog.Debug("userspace delta: dropped (sync not connected)", "type", eventType) - return + return false } cfg := d.store.ActiveConfig() if cfg == nil { - return + return false } zoneIDs := buildZoneIDs(cfg) @@ -547,36 +573,48 @@ func (d *Daemon) handleEventStreamDelta(eventType uint8, delta dpuserspace.Sessi } d.queueUserspaceSessionDeltas(zoneIDs, []dpuserspace.SessionDeltaInfo{delta}) + return true } // handleEventStreamFullResync handles a FullResync frame from the helper. // This means the helper's replay buffer was trimmed past our last ack; we need // a one-shot bulk export to catch up. -func (d *Daemon) handleEventStreamFullResync() { +func (d *Daemon) handleEventStreamFullResync() bool { slog.Warn("userspace event stream: full resync requested, triggering bulk export") + if d.cluster == nil || d.sessionSync == nil { + slog.Debug("userspace event stream: full resync ignored (no cluster/sync)") + return true + } + if !d.cluster.IsLocalPrimaryAny() { + slog.Debug("userspace event stream: full resync ignored (not primary for any RG)") + return true + } + if !d.sessionSync.IsConnected() { + slog.Debug("userspace event stream: full resync deferred (sync not connected)") + return false + } exporter, ok := d.dp.(userspaceSessionExporter) if !ok { - return + return false } cfg := d.store.ActiveConfig() if cfg == nil { - return + return false } - // Export sessions for all RGs we're primary for. var rgIDs []int - if d.cluster != nil { - for rgID := 0; rgID < 16; rgID++ { - if d.cluster.IsLocalPrimary(rgID) { - rgIDs = append(rgIDs, rgID) - } + for rgID := 0; rgID < 16; rgID++ { + if d.cluster.IsLocalPrimary(rgID) { + rgIDs = append(rgIDs, rgID) } } if len(rgIDs) == 0 { - return + return false } if _, err := d.exportUserspaceOwnerRGSessionsWithConfig(exporter, cfg, rgIDs); err != nil { slog.Warn("userspace event stream: full resync export failed", "err", err) + return false } + return true } // eventStreamFallbackLoop monitors the event stream connection and falls back @@ -585,6 +623,9 @@ func (d *Daemon) handleEventStreamFullResync() { // when disconnected, it runs at 100ms to compensate for the lost stream. func (d *Daemon) eventStreamFallbackLoop(ctx context.Context, provider userspaceEventStreamProvider) { drainer, hasDrainer := d.dp.(userspaceSessionDeltaDrainer) + if d.cluster == nil || d.sessionSync == nil { + return + } const ( fastInterval = 100 * time.Millisecond // event stream disconnected diff --git a/pkg/daemon/daemon_run.go b/pkg/daemon/daemon_run.go index 5d65ce3dd..0bec2451f 100644 --- a/pkg/daemon/daemon_run.go +++ b/pkg/daemon/daemon_run.go @@ -274,11 +274,6 @@ func (d *Daemon) Run(ctx context.Context) error { d.reconcileBlackholeRoutes() } - // Start cluster heartbeat + sync after applyConfig (needs VRF to exist). - if d.cluster != nil { - d.startClusterComms(ctx) - } - // Handle signals for clean shutdown. // In interactive mode, only SIGTERM triggers shutdown — SIGINT is handled // by the CLI for command cancellation (Ctrl-C). @@ -293,6 +288,7 @@ func (d *Daemon) Run(ctx context.Context) error { // Create event buffer (shared between event reader and CLI) eventBuf := logging.NewEventBuffer(1000) + d.eventBuf = eventBuf // WaitGroup for coordinated shutdown of background goroutines var wg sync.WaitGroup @@ -427,6 +423,33 @@ func (d *Daemon) Run(ctx context.Context) error { d.applyFlowTrace(cfg, er) } } + if er == nil { + if _, ok := d.dp.(userspaceEventStreamProvider); ok { + er = logging.NewEventReader(nil, eventBuf) + d.eventReader = er + if cfg := d.store.ActiveConfig(); cfg != nil { + d.applySyslogConfig(er, cfg) + d.startFlowExporter(ctx, cfg, er) + d.startIPFIXExporter(ctx, cfg, er) + d.applyFlowTrace(cfg, er) + } + } + } + + if _, ok := d.dp.(userspaceEventStreamProvider); ok && d.cluster == nil { + wg.Add(1) + go func() { + defer wg.Done() + d.runUserspaceEventStream(ctx) + }() + } + } + + // Start cluster heartbeat + sync after event fanout is initialized. + // This avoids an HA startup race where runUserspaceEventStream wires a + // decode-only fallback callback before d.eventReader exists. + if d.cluster != nil { + d.startClusterComms(ctx) } // Start DHCP clients for interfaces configured with dhcp/dhcpv6. diff --git a/pkg/daemon/userspace_sync_test.go b/pkg/daemon/userspace_sync_test.go index 48f18bd08..deb2a5a3b 100644 --- a/pkg/daemon/userspace_sync_test.go +++ b/pkg/daemon/userspace_sync_test.go @@ -1,8 +1,12 @@ package daemon import ( + "context" "encoding/binary" "errors" + "io" + "net" + "path/filepath" "testing" "time" @@ -12,6 +16,79 @@ import ( dpuserspace "github.com/psaab/xpf/pkg/dataplane/userspace" ) +type fixedEventStreamProvider struct { + es *dpuserspace.EventStream +} + +const maxEventFramePayloadForWiringTest = 1 << 20 + +func (p fixedEventStreamProvider) EventStream() *dpuserspace.EventStream { return p.es } + +func buildSessionOpenFrameV4PayloadForWiringTest() []byte { + buf := make([]byte, 56) + buf[0] = 4 + buf[1] = 6 + binary.LittleEndian.PutUint16(buf[2:4], 12345) + binary.LittleEndian.PutUint16(buf[4:6], 443) + copy(buf[24:28], []byte{10, 0, 1, 2}) + copy(buf[28:32], []byte{172, 16, 0, 1}) + return buf +} + +func writeEventFrameForWiringTest(t *testing.T, conn net.Conn, typ uint8, seq uint64, payload []byte) { + t.Helper() + var hdr [dpuserspace.EventFrameHeaderSize]byte + binary.LittleEndian.PutUint32(hdr[0:4], uint32(len(payload))) + hdr[4] = typ + binary.LittleEndian.PutUint64(hdr[8:16], seq) + if _, err := conn.Write(hdr[:]); err != nil { + t.Fatalf("write frame header: %v", err) + } + if len(payload) > 0 { + if _, err := conn.Write(payload); err != nil { + t.Fatalf("write frame payload: %v", err) + } + } +} + +func waitForAckSeqForWiringTest(t *testing.T, conn net.Conn, want uint64) { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for { + if err := conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)); err != nil { + t.Fatalf("set read deadline: %v", err) + } + var hdr [dpuserspace.EventFrameHeaderSize]byte + if _, err := io.ReadFull(conn, hdr[:]); err != nil { + var ne net.Error + if errors.As(err, &ne) && ne.Timeout() && time.Now().Before(deadline) { + continue + } + t.Fatalf("read ack frame: %v", err) + } + payloadLen := binary.LittleEndian.Uint32(hdr[0:4]) + typ := hdr[4] + seq := binary.LittleEndian.Uint64(hdr[8:16]) + if payloadLen > maxEventFramePayloadForWiringTest { + t.Fatalf("unexpected frame payload length: %d", payloadLen) + } + if payloadLen > 0 { + // Ack/control frames in this helper are header-only; drain any payload + // to keep stream framing aligned for subsequent reads. + payload := make([]byte, payloadLen) + if _, err := io.ReadFull(conn, payload); err != nil { + t.Fatalf("read frame payload: %v", err) + } + } + if typ == dpuserspace.EventTypeAck && seq >= want { + return + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for ack seq >= %d", want) + } + } +} + type fakeUserspaceDeltaDrainer struct { batches [][]dpuserspace.SessionDeltaInfo calls int @@ -720,8 +797,9 @@ func TestUserspaceRGDemotionPrepLeaseCanBeReleasedAfterFailure(t *testing.T) { } } -// TestHandleEventStreamDeltaSkipsWhenNoCluster verifies that events are -// silently dropped when cluster is nil. +// TestHandleEventStreamDeltaSkipsWhenNoCluster verifies that permanent +// non-owner/no-cluster paths ACK and ignore events instead of asking the helper +// to replay forever. func TestHandleEventStreamDeltaSkipsWhenNoCluster(t *testing.T) { d := &Daemon{} delta := dpuserspace.SessionDeltaInfo{ @@ -732,8 +810,16 @@ func TestHandleEventStreamDeltaSkipsWhenNoCluster(t *testing.T) { SrcPort: 12345, DstPort: 443, } - // Should not panic when cluster and sessionSync are nil. - d.handleEventStreamDelta(dpuserspace.EventTypeSessionOpen, delta) + if !d.handleEventStreamDelta(dpuserspace.EventTypeSessionOpen, delta) { + t.Fatal("delta without cluster/sessionSync should be permanently ignored and ACKed") + } + backup := &Daemon{ + cluster: newClusterManager(false), + sessionSync: &cluster.SessionSync{}, + } + if !backup.handleEventStreamDelta(dpuserspace.EventTypeSessionOpen, delta) { + t.Fatal("delta on a backup should be permanently ignored and ACKed") + } } // TestHandleEventStreamDeltaMapsEventTypes verifies event type to string mapping @@ -749,6 +835,61 @@ func TestHandleEventStreamDeltaMapsEventTypes(t *testing.T) { d.handleEventStreamDelta(dpuserspace.EventTypeSessionUpdate, delta) } +func TestHandleEventStreamFullResyncRequiresHAReady(t *testing.T) { + if !(&Daemon{}).handleEventStreamFullResync() { + t.Fatal("full resync without cluster/sessionSync should be permanently ignored and ACKed") + } + backup := &Daemon{ + cluster: newClusterManager(false), + sessionSync: &cluster.SessionSync{}, + } + if !backup.handleEventStreamFullResync() { + t.Fatal("full resync on a backup should be permanently ignored and ACKed") + } + d := &Daemon{ + cluster: newClusterManager(true), + sessionSync: &cluster.SessionSync{}, + } + if d.handleEventStreamFullResync() { + t.Fatal("full resync with disconnected sessionSync should withhold ACK") + } +} + +func TestWireUserspaceEventStreamCallbacksStandaloneWiresSessionAndFullResync(t *testing.T) { + socketPath := filepath.Join(t.TempDir(), "events.sock") + es := dpuserspace.NewEventStream(socketPath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + es.Start(ctx) + defer es.Close() + + d := &Daemon{} + if !d.wireUserspaceEventStreamCallbacks(ctx, fixedEventStreamProvider{es: es}) { + t.Fatal("expected callback wiring to succeed") + } + + var conn net.Conn + var err error + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + conn, err = net.Dial("unix", socketPath) + if err == nil { + break + } + time.Sleep(10 * time.Millisecond) + } + if err != nil { + t.Fatalf("dial event stream socket: %v", err) + } + defer conn.Close() + + writeEventFrameForWiringTest(t, conn, dpuserspace.EventTypeSessionOpen, 1, buildSessionOpenFrameV4PayloadForWiringTest()) + waitForAckSeqForWiringTest(t, conn, 1) + + writeEventFrameForWiringTest(t, conn, dpuserspace.EventTypeFullResync, 2, nil) + waitForAckSeqForWiringTest(t, conn, 2) +} + // TestUserspaceManagerImplementsEventStreamExporter verifies that the userspace // Manager satisfies the userspaceEventStreamExporter interface used by // bulkSyncViaEventStreamOrFallback. diff --git a/pkg/dataplane/userspace/eventstream.go b/pkg/dataplane/userspace/eventstream.go index 5325ba7b1..43c43053b 100644 --- a/pkg/dataplane/userspace/eventstream.go +++ b/pkg/dataplane/userspace/eventstream.go @@ -13,8 +13,20 @@ import ( "time" "github.com/psaab/xpf/pkg/dataplane" + "github.com/psaab/xpf/pkg/logging" ) +const pendingCallbackFramesLimit = 4096 +const callbackNotReadyBackoff = 100 * time.Millisecond + +type pendingCallbackFrame struct { + typ uint8 + seq uint64 + sessionDelta SessionDeltaInfo + dataplanePayload []byte + dataplaneRecord logging.EventRecord +} + // EventStream manages the daemon-side event socket for receiving session events // from the Rust helper over a persistent binary-framed Unix stream. // @@ -37,38 +49,88 @@ type EventStream struct { lastAckSeq atomic.Uint64 ackBatch atomic.Uint64 // events since last ack - // Callbacks — set before Start(), called on the reader goroutine. - onEvent func(eventType uint8, seq uint64, delta SessionDeltaInfo) - onFullResync func() + // Callbacks are invoked on the reader goroutine and may be updated + // dynamically by control-plane code. + callbackMu sync.RWMutex + onEvent func(eventType uint8, seq uint64, delta SessionDeltaInfo) bool + onDataplaneEvent func(seq uint64, rec logging.EventRecord) + onRawDataplaneEvent func(seq uint64, payload []byte) + onFullResync func() bool + + pendingFlushMu sync.Mutex + pendingMu sync.Mutex + pendingCallbackFrames []pendingCallbackFrame // DrainComplete signaling for demotion prep. drainCompleteMu sync.Mutex drainCompleteCh chan uint64 // Stats. - FramesRead atomic.Uint64 - FramesWritten atomic.Uint64 - DecodeErrors atomic.Uint64 - SeqGaps atomic.Uint64 + FramesRead atomic.Uint64 + FramesWritten atomic.Uint64 + DecodeErrors atomic.Uint64 + SeqGaps atomic.Uint64 + PolicyDenyEvents atomic.Uint64 + ScreenDropEvents atomic.Uint64 + FilterLogEvents atomic.Uint64 + PolicyDenyDrops atomic.Uint64 + ScreenDropDrops atomic.Uint64 + FilterLogDrops atomic.Uint64 + UnknownFrameDrops atomic.Uint64 } // NewEventStream creates an EventStream for the given Unix socket path. // Call Start() to begin listening. func NewEventStream(socketPath string) *EventStream { return &EventStream{ - socketPath: socketPath, + socketPath: socketPath, drainCompleteCh: make(chan uint64, 1), } } -// SetOnEvent sets the callback for session events. Must be called before Start(). -func (es *EventStream) SetOnEvent(fn func(eventType uint8, seq uint64, delta SessionDeltaInfo)) { +// SetOnEvent sets the callback for session events. The callback returns true +// only after the delta is durably handled; false withholds ACK so the helper can +// replay instead of losing an event during readiness transitions. +func (es *EventStream) SetOnEvent(fn func(eventType uint8, seq uint64, delta SessionDeltaInfo) bool) { + es.callbackMu.Lock() es.onEvent = fn + es.callbackMu.Unlock() + es.flushPendingCallbackFrames() } -// SetOnFullResync sets the callback for full resync requests. Must be called before Start(). -func (es *EventStream) SetOnFullResync(fn func()) { +// SetOnDataplaneEvent sets the callback for RT_FLOW-style dataplane events. +func (es *EventStream) SetOnDataplaneEvent(fn func(seq uint64, rec logging.EventRecord)) { + es.callbackMu.Lock() + es.onDataplaneEvent = fn + es.callbackMu.Unlock() + es.flushPendingCallbackFrames() +} + +// SetOnRawDataplaneEvent sets the callback for raw RT_FLOW dataplane events. +// It is preferred when the receiver can process the canonical dataplane.Event +// payload itself, because it preserves name resolution and syslog fanout. +func (es *EventStream) SetOnRawDataplaneEvent(fn func(seq uint64, payload []byte)) { + es.callbackMu.Lock() + es.onRawDataplaneEvent = fn + es.callbackMu.Unlock() + es.flushPendingCallbackFrames() +} + +// SetOnFullResync sets the callback for full resync requests. The callback +// returns true only after the resync request has been acted on. +func (es *EventStream) SetOnFullResync(fn func() bool) { + es.callbackMu.Lock() es.onFullResync = fn + es.callbackMu.Unlock() + es.flushPendingCallbackFrames() +} + +func (es *EventStream) dataplaneCallbacks() (func(uint64, []byte), func(uint64, logging.EventRecord)) { + es.callbackMu.RLock() + defer es.callbackMu.RUnlock() + raw := es.onRawDataplaneEvent + decoded := es.onDataplaneEvent + return raw, decoded } // Start creates the Unix socket listener and launches the accept loop. @@ -147,6 +209,22 @@ func (es *EventStream) LastAckedSequence() uint64 { return es.lastAckSeq.Load() } +func (es *EventStream) Status() EventStreamStatus { + return EventStreamStatus{ + FramesRead: es.FramesRead.Load(), + FramesWritten: es.FramesWritten.Load(), + DecodeErrors: es.DecodeErrors.Load(), + SeqGaps: es.SeqGaps.Load(), + PolicyDenyEvents: es.PolicyDenyEvents.Load(), + ScreenDropEvents: es.ScreenDropEvents.Load(), + FilterLogEvents: es.FilterLogEvents.Load(), + PolicyDenyDrops: es.PolicyDenyDrops.Load(), + ScreenDropDrops: es.ScreenDropDrops.Load(), + FilterLogDrops: es.FilterLogDrops.Load(), + UnknownFrameDrops: es.UnknownFrameDrops.Load(), + } +} + // acceptLoop listens for helper connections. Only one is active at a time. func (es *EventStream) acceptLoop(ctx context.Context) { for { @@ -176,6 +254,7 @@ func (es *EventStream) acceptLoop(ctx context.Context) { es.lastRecvSeq.Store(0) es.lastAppliedSeq.Store(0) es.lastAckSeq.Store(0) + es.clearPendingCallbackFrames() es.mu.Unlock() // Run the reader and ack loops for this connection. @@ -281,11 +360,10 @@ func (es *EventStream) readLoop(ctx context.Context) { } prevSeq = seq es.lastRecvSeq.Store(seq) - es.ackBatch.Add(1) - if es.onEvent != nil { - es.onEvent(typ, seq, delta) + if !es.dispatchOrQueueSessionFrame(typ, seq, delta) { + es.backoffCallbackNotReady(ctx) + return } - es.lastAppliedSeq.Store(seq) case EventTypeSessionClose: delta, ok := decodeSessionCloseEvent(payload) @@ -299,11 +377,10 @@ func (es *EventStream) readLoop(ctx context.Context) { } prevSeq = seq es.lastRecvSeq.Store(seq) - es.ackBatch.Add(1) - if es.onEvent != nil { - es.onEvent(typ, seq, delta) + if !es.dispatchOrQueueSessionFrame(typ, seq, delta) { + es.backoffCallbackNotReady(ctx) + return } - es.lastAppliedSeq.Store(seq) case EventTypeDrainComplete: select { @@ -313,20 +390,263 @@ func (es *EventStream) readLoop(ctx context.Context) { case EventTypeFullResync: slog.Warn("event stream: full resync requested by helper") - if es.onFullResync != nil { - es.onFullResync() + if !es.dispatchOrQueueFullResyncFrame(seq) { + es.backoffCallbackNotReady(ctx) + return } case EventTypeKeepalive: // Idle heartbeat from helper — no action needed, just keeps // the connection alive to prevent read-deadline disconnect. + continue + + case EventFrameTypePolicyDeny, EventFrameTypeScreenDrop, EventFrameTypeFilterLog: + if !dataplaneEventPayloadMatchesFrame(typ, payload) { + es.DecodeErrors.Add(1) + es.recordDataplaneEventDrop(typ) + es.markDroppedFrameApplied(seq, &prevSeq) + continue + } + rec, ok := decodeDataplaneEventPayload(payload) + if !ok { + es.DecodeErrors.Add(1) + es.recordDataplaneEventDrop(typ) + es.markDroppedFrameApplied(seq, &prevSeq) + continue + } + onRawDataplaneEvent, onDataplaneEvent := es.dataplaneCallbacks() + if seq > prevSeq+1 && prevSeq > 0 { + es.SeqGaps.Add(1) + slog.Debug("event stream: sequence gap", "expected", prevSeq+1, "got", seq) + } + prevSeq = seq + es.lastRecvSeq.Store(seq) + if !es.dispatchOrQueueDataplaneFrame(typ, seq, payload, rec, onRawDataplaneEvent, onDataplaneEvent) { + es.backoffCallbackNotReady(ctx) + return + } default: - slog.Debug("event stream: unknown frame type", "type", typ) + es.UnknownFrameDrops.Add(1) + es.markDroppedFrameApplied(seq, &prevSeq) + slog.Debug("event stream: dropped unknown frame type", "type", typ, "seq", seq) } } } +func (es *EventStream) markDroppedFrameApplied(seq uint64, prevSeq *uint64) { + if seq > *prevSeq+1 && *prevSeq > 0 { + es.SeqGaps.Add(1) + } + *prevSeq = seq + es.lastRecvSeq.Store(seq) + es.markFrameApplied(seq) +} + +func (es *EventStream) markFrameApplied(seq uint64) { + es.ackBatch.Add(1) + es.lastAppliedSeq.Store(seq) +} + +func (es *EventStream) backoffCallbackNotReady(ctx context.Context) { + timer := time.NewTimer(callbackNotReadyBackoff) + defer timer.Stop() + select { + case <-ctx.Done(): + case <-timer.C: + } +} + +func (es *EventStream) dispatchOrQueueSessionFrame(typ uint8, seq uint64, delta SessionDeltaInfo) bool { + es.callbackMu.RLock() + onEvent := es.onEvent + es.callbackMu.RUnlock() + if onEvent == nil || es.hasPendingCallbackFrames() { + if !es.enqueuePendingCallbackFrame(pendingCallbackFrame{ + typ: typ, + seq: seq, + sessionDelta: delta, + }) { + return false + } + es.flushPendingCallbackFrames() + return true + } + if !onEvent(typ, seq, delta) { + return es.enqueuePendingCallbackFrame(pendingCallbackFrame{ + typ: typ, + seq: seq, + sessionDelta: delta, + }) + } + es.markFrameApplied(seq) + return true +} + +func (es *EventStream) dispatchOrQueueFullResyncFrame(seq uint64) bool { + es.callbackMu.RLock() + onFullResync := es.onFullResync + es.callbackMu.RUnlock() + if onFullResync == nil || es.hasPendingCallbackFrames() { + if !es.enqueuePendingCallbackFrame(pendingCallbackFrame{ + typ: EventTypeFullResync, + seq: seq, + }) { + return false + } + es.flushPendingCallbackFrames() + return true + } + if !onFullResync() { + return es.enqueuePendingCallbackFrame(pendingCallbackFrame{ + typ: EventTypeFullResync, + seq: seq, + }) + } + es.markFrameApplied(seq) + return true +} + +func (es *EventStream) dispatchOrQueueDataplaneFrame( + typ uint8, + seq uint64, + payload []byte, + rec logging.EventRecord, + onRawDataplaneEvent func(uint64, []byte), + onDataplaneEvent func(uint64, logging.EventRecord), +) bool { + if onRawDataplaneEvent == nil && onDataplaneEvent == nil || es.hasPendingCallbackFrames() { + if !es.enqueuePendingCallbackFrame(pendingCallbackFrame{ + typ: typ, + seq: seq, + dataplanePayload: append([]byte(nil), payload...), + dataplaneRecord: rec, + }) { + return false + } + es.flushPendingCallbackFrames() + return true + } + if onRawDataplaneEvent != nil { + onRawDataplaneEvent(seq, payload) + } else { + onDataplaneEvent(seq, rec) + } + es.recordDataplaneEvent(typ) + es.markFrameApplied(seq) + return true +} + +func (es *EventStream) hasPendingCallbackFrames() bool { + es.pendingMu.Lock() + defer es.pendingMu.Unlock() + return len(es.pendingCallbackFrames) > 0 +} + +func (es *EventStream) enqueuePendingCallbackFrame(frame pendingCallbackFrame) bool { + es.pendingMu.Lock() + defer es.pendingMu.Unlock() + if len(es.pendingCallbackFrames) >= pendingCallbackFramesLimit { + slog.Error("event stream: pending callback queue full; closing helper stream to force replay", + "limit", pendingCallbackFramesLimit, "type", frame.typ, "seq", frame.seq) + return false + } + es.pendingCallbackFrames = append(es.pendingCallbackFrames, frame) + return true +} + +func (es *EventStream) clearPendingCallbackFrames() { + es.pendingFlushMu.Lock() + defer es.pendingFlushMu.Unlock() + es.pendingMu.Lock() + es.pendingCallbackFrames = nil + es.pendingMu.Unlock() +} + +func (es *EventStream) flushPendingCallbackFrames() { + es.pendingFlushMu.Lock() + defer es.pendingFlushMu.Unlock() + + for { + es.pendingMu.Lock() + if len(es.pendingCallbackFrames) == 0 { + es.pendingMu.Unlock() + return + } + frame := es.pendingCallbackFrames[0] + es.pendingMu.Unlock() + + switch frame.typ { + case EventTypeSessionOpen, EventTypeSessionUpdate, EventTypeSessionClose: + es.callbackMu.RLock() + onEvent := es.onEvent + es.callbackMu.RUnlock() + if onEvent == nil { + return + } + if !onEvent(frame.typ, frame.seq, frame.sessionDelta) { + return + } + case EventTypeFullResync: + es.callbackMu.RLock() + onFullResync := es.onFullResync + es.callbackMu.RUnlock() + if onFullResync == nil { + return + } + if !onFullResync() { + return + } + case EventFrameTypePolicyDeny, EventFrameTypeScreenDrop, EventFrameTypeFilterLog: + onRawDataplaneEvent, onDataplaneEvent := es.dataplaneCallbacks() + if onRawDataplaneEvent == nil && onDataplaneEvent == nil { + return + } + if onRawDataplaneEvent != nil { + onRawDataplaneEvent(frame.seq, frame.dataplanePayload) + } else { + onDataplaneEvent(frame.seq, frame.dataplaneRecord) + } + es.recordDataplaneEvent(frame.typ) + default: + slog.Warn("event stream: dropping unsupported pending callback frame", + "type", frame.typ, "seq", frame.seq) + } + + es.markFrameApplied(frame.seq) + es.pendingMu.Lock() + if len(es.pendingCallbackFrames) > 0 && es.pendingCallbackFrames[0].seq == frame.seq { + copy(es.pendingCallbackFrames, es.pendingCallbackFrames[1:]) + es.pendingCallbackFrames = es.pendingCallbackFrames[:len(es.pendingCallbackFrames)-1] + } + es.pendingMu.Unlock() + } +} + +func (es *EventStream) recordDataplaneEvent(typ uint8) { + switch typ { + case EventFrameTypePolicyDeny: + es.PolicyDenyEvents.Add(1) + case EventFrameTypeScreenDrop: + es.ScreenDropEvents.Add(1) + case EventFrameTypeFilterLog: + es.FilterLogEvents.Add(1) + } +} + +func (es *EventStream) recordDataplaneEventDrop(typ uint8) { + switch typ { + case EventFrameTypePolicyDeny: + es.PolicyDenyDrops.Add(1) + case EventFrameTypeScreenDrop: + es.ScreenDropDrops.Add(1) + case EventFrameTypeFilterLog: + es.FilterLogDrops.Add(1) + default: + es.UnknownFrameDrops.Add(1) + } +} + // ackLoop periodically sends Ack frames to the helper with the highest // consumed sequence number. func (es *EventStream) ackLoop(ctx context.Context) { @@ -339,6 +659,7 @@ func (es *EventStream) ackLoop(ctx context.Context) { return case <-ticker.C: } + es.flushPendingCallbackFrames() es.sendAckIfNeeded() } } @@ -422,6 +743,7 @@ func (es *EventStream) writeFrame(typ uint8, seq uint64, payload []byte) error { // [N..] NeighborMAC (6 bytes) // [N+6..] SrcMAC (6 bytes) // [N+12..]NextHop (4 or 16 bytes) +// // wireAFToDataplane maps the 1-byte wire encoding (4 = IPv4, 6 = IPv6 // — chosen by the Rust codec to match the protocol number; see // userspace-dp/src/event_stream/codec.rs:88) to the Linux dataplane @@ -588,6 +910,31 @@ func decodeSessionCloseEvent(payload []byte) (SessionDeltaInfo, bool) { return d, true } +// decodeDataplaneEventPayload decodes the canonical dataplane.Event RT_FLOW +// payload. Userspace-dp carries these bytes over event-stream frame types 11-13, +// but the payload itself is the same shape consumed by pkg/logging/ringbuf.go. +func decodeDataplaneEventPayload(payload []byte) (logging.EventRecord, bool) { + return logging.DecodeRawEventRecord(payload) +} + +func dataplaneEventPayloadMatchesFrame(typ uint8, payload []byte) bool { + if len(payload) <= 52 { + return false + } + var want uint8 + switch typ { + case EventFrameTypePolicyDeny: + want = dataplane.EventTypePolicyDeny + case EventFrameTypeScreenDrop: + want = dataplane.EventTypeScreenDrop + case EventFrameTypeFilterLog: + want = dataplane.EventTypeFilterLog + default: + return false + } + return payload[52] == want +} + // formatIP converts raw IP bytes to a string representation. func formatIP(b []byte, af uint8) string { if af == 4 { diff --git a/pkg/dataplane/userspace/eventstream_test.go b/pkg/dataplane/userspace/eventstream_test.go index 59618b254..aa911021e 100644 --- a/pkg/dataplane/userspace/eventstream_test.go +++ b/pkg/dataplane/userspace/eventstream_test.go @@ -1,6 +1,7 @@ package userspace import ( + "bytes" "context" "encoding/binary" "io" @@ -10,8 +11,10 @@ import ( "sync/atomic" "testing" "time" + "unsafe" "github.com/psaab/xpf/pkg/dataplane" + "github.com/psaab/xpf/pkg/logging" ) // buildSessionOpenV4Payload builds a binary SessionOpen payload for an IPv4 session. @@ -81,6 +84,36 @@ func buildSessionCloseV4Payload( return buf } +func buildDataplaneEventV4Payload( + proto uint8, + srcPort, dstPort uint16, + srcIP, dstIP [4]byte, + natSrcIP [4]byte, + natSrcPort uint16, + ingressZone, egressZone uint16, + reason uint16, + policyID uint32, + timestampNS uint64, +) []byte { + _ = reason // RT_FLOW policy-deny records carry policy identity, not the userspace-only reason field. + buf := make([]byte, int(unsafe.Sizeof(dataplane.Event{}))) + binary.LittleEndian.PutUint64(buf[0:8], timestampNS) + copy(buf[8:12], srcIP[:]) + copy(buf[24:28], dstIP[:]) + binary.BigEndian.PutUint16(buf[40:42], srcPort) + binary.BigEndian.PutUint16(buf[42:44], dstPort) + binary.LittleEndian.PutUint32(buf[44:48], policyID) + binary.LittleEndian.PutUint16(buf[48:50], ingressZone) + binary.LittleEndian.PutUint16(buf[50:52], egressZone) + buf[52] = dataplane.EventTypePolicyDeny + buf[53] = proto + buf[54] = dataplane.ActionDeny + buf[55] = dataplane.AFInet + copy(buf[72:76], natSrcIP[:]) + binary.BigEndian.PutUint16(buf[104:106], natSrcPort) + return buf +} + func writeFrame(w io.Writer, typ uint8, seq uint64, payload []byte) error { var hdr [EventFrameHeaderSize]byte binary.LittleEndian.PutUint32(hdr[0:4], uint32(len(payload))) @@ -114,19 +147,19 @@ func readFrame(r io.Reader) (typ uint8, seq uint64, payload []byte, err error) { func TestDecodeSessionEventV4(t *testing.T) { payload := buildSessionOpenV4Payload( - 6, // TCP - 12345, 443, // ports + 6, // TCP + 12345, 443, // ports [4]byte{10, 0, 1, 102}, [4]byte{172, 16, 80, 200}, // src, dst [4]byte{172, 16, 80, 8}, [4]byte{0, 0, 0, 0}, // nat src, nat dst 40000, 0, // nat ports - 1, // ownerRG + 1, // ownerRG 12, 11, // egress/tx ifindex 0, 80, // tunnel, vlan SessionEventFlagFabricRedirect, // flags - 1, 2, 0, // ingress/egress zone, disposition + 1, 2, 0, // ingress/egress zone, disposition [6]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff}, // neighbor MAC [6]byte{0x02, 0xbf, 0x72, 0x00, 0x50, 0x08}, // src MAC - [4]byte{172, 16, 80, 1}, // next hop + [4]byte{172, 16, 80, 1}, // next hop ) d, ok := decodeSessionEvent(payload) @@ -268,6 +301,60 @@ func TestDecodeSessionEventRejectsTruncated(t *testing.T) { } } +func TestDecodeDataplaneEventPolicyDenyRTFlow(t *testing.T) { + payload := buildDataplaneEventV4Payload( + 6, + 12345, 443, + [4]byte{10, 0, 1, 102}, [4]byte{172, 16, 80, 200}, + [4]byte{172, 16, 80, 8}, + 40000, + 1, 2, + 7, + 99, + 1700000000000000000, + ) + if !dataplaneEventPayloadMatchesFrame(EventFrameTypePolicyDeny, payload) { + t.Fatal("RT_FLOW payload did not match policy-deny event-stream frame type") + } + binary.LittleEndian.PutUint32(payload[56:60], 1234) + binary.LittleEndian.PutUint32(payload[60:64], 5678) + binary.LittleEndian.PutUint16(payload[64:66], 3) + payload[134] = dataplane.CloseReasonPolicy + rec, ok := decodeDataplaneEventPayload(payload) + if !ok { + t.Fatal("decodeDataplaneEventPayload returned false") + } + if rec.Type != "POLICY_DENY" || rec.Action != "deny" { + t.Fatalf("event = %s/%s, want POLICY_DENY/deny", rec.Type, rec.Action) + } + if rec.Protocol != "TCP" { + t.Fatalf("Protocol = %q, want TCP", rec.Protocol) + } + if rec.SrcAddr != "10.0.1.102:12345" { + t.Fatalf("SrcAddr = %q, want 10.0.1.102:12345", rec.SrcAddr) + } + if rec.DstAddr != "172.16.80.200:443" { + t.Fatalf("DstAddr = %q, want 172.16.80.200:443", rec.DstAddr) + } + if rec.NATSrcAddr != "172.16.80.8:40000" { + t.Fatalf("NATSrcAddr = %q, want 172.16.80.8:40000", rec.NATSrcAddr) + } + if rec.PolicyID != 99 || rec.InZone != 1 || rec.OutZone != 2 { + t.Fatalf("identity = policy %d zones %d->%d, want policy 99 zones 1->2", rec.PolicyID, rec.InZone, rec.OutZone) + } + if rec.ScreenCheck != "" { + t.Fatalf("ScreenCheck/reason = %q, want empty for policy-deny RT_FLOW", rec.ScreenCheck) + } + if rec.RuleID != 1234 || rec.TermID != 5678 || rec.OwnerRGID != 3 || rec.Reason != "Rejected by policy" { + t.Fatalf("metadata = rule %d term %d owner_rg %d reason %q, want 1234/5678/3/Rejected by policy", + rec.RuleID, rec.TermID, rec.OwnerRGID, rec.Reason) + } + if rec.SessionPkts != 0 || rec.SessionBytes != 0 { + t.Fatalf("security metadata must not leak through session counters: pkts=%d bytes=%d", + rec.SessionPkts, rec.SessionBytes) + } +} + func TestFrameRoundTrip(t *testing.T) { payload := buildSessionOpenV4Payload( 6, 1234, 80, @@ -305,9 +392,10 @@ func TestEventStreamAcceptAndRead(t *testing.T) { es := NewEventStream(sockPath) var received atomic.Int32 var lastSeq atomic.Uint64 - es.SetOnEvent(func(eventType uint8, seq uint64, delta SessionDeltaInfo) { + es.SetOnEvent(func(eventType uint8, seq uint64, delta SessionDeltaInfo) bool { received.Add(1) lastSeq.Store(seq) + return true }) ctx, cancel := context.WithCancel(context.Background()) @@ -362,12 +450,463 @@ func TestEventStreamAcceptAndRead(t *testing.T) { } } +func TestEventStreamSessionEventBeforeCallbackQueuesUntilCallback(t *testing.T) { + dir := t.TempDir() + sockPath := filepath.Join(dir, "test-events.sock") + + es := NewEventStream(sockPath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + es.Start(ctx) + defer es.Close() + + time.Sleep(50 * time.Millisecond) + conn, err := net.Dial("unix", sockPath) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn.Close() + deadline := time.Now().Add(2 * time.Second) + for !es.IsConnected() { + if time.Now().After(deadline) { + t.Fatal("event stream did not become connected") + } + time.Sleep(10 * time.Millisecond) + } + + payload := buildSessionOpenV4Payload( + 6, 1000, 80, + [4]byte{10, 0, 1, 1}, [4]byte{10, 0, 2, 1}, + [4]byte{}, [4]byte{}, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + [6]byte{}, [6]byte{}, [4]byte{}, + ) + if err := writeFrame(conn, EventTypeSessionOpen, 5, payload); err != nil { + t.Fatalf("write session event: %v", err) + } + _ = conn.SetReadDeadline(time.Now().Add(250 * time.Millisecond)) + if typ, seq, _, err := readFrame(conn); err == nil { + t.Fatalf("unexpected ack before callback was wired: type %d seq %d", typ, seq) + } + + got := make(chan uint64, 1) + es.SetOnEvent(func(_ uint8, seq uint64, _ SessionDeltaInfo) bool { + got <- seq + return true + }) + select { + case seq := <-got: + if seq != 5 { + t.Fatalf("callback seq = %d, want 5", seq) + } + case <-time.After(2 * time.Second): + t.Fatal("callback not invoked after late registration") + } + _ = conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + typ, seq, _, err := readFrame(conn) + if err != nil { + t.Fatalf("read ack after callback: %v", err) + } + if typ != EventTypeAck || seq != 5 { + t.Fatalf("ack after callback = type %d seq %d, want type %d seq 5", typ, seq, EventTypeAck) + } +} + +func TestEventStreamSessionCallbackFalseWithholdsAck(t *testing.T) { + dir := t.TempDir() + sockPath := filepath.Join(dir, "test-events.sock") + + es := NewEventStream(sockPath) + es.SetOnEvent(func(_ uint8, _ uint64, _ SessionDeltaInfo) bool { + return false + }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + es.Start(ctx) + defer es.Close() + + time.Sleep(50 * time.Millisecond) + conn, err := net.Dial("unix", sockPath) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn.Close() + deadline := time.Now().Add(2 * time.Second) + for !es.IsConnected() { + if time.Now().After(deadline) { + t.Fatal("event stream did not become connected") + } + time.Sleep(10 * time.Millisecond) + } + + payload := buildSessionOpenV4Payload( + 6, 1000, 80, + [4]byte{10, 0, 1, 1}, [4]byte{10, 0, 2, 1}, + [4]byte{}, [4]byte{}, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + [6]byte{}, [6]byte{}, [4]byte{}, + ) + if err := writeFrame(conn, EventTypeSessionOpen, 9, payload); err != nil { + t.Fatalf("write session event: %v", err) + } + + _ = conn.SetReadDeadline(time.Now().Add(300 * time.Millisecond)) + if typ, seq, _, err := readFrame(conn); err == nil { + t.Fatalf("unexpected ack for unapplied callback: type %d seq %d", typ, seq) + } + if acked := es.LastAckedSequence(); acked != 0 { + t.Fatalf("LastAckedSequence = %d, want 0 after callback returned false", acked) + } + + es.SetOnEvent(func(_ uint8, _ uint64, _ SessionDeltaInfo) bool { + return true + }) + _ = conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + typ, seq, _, err := readFrame(conn) + if err != nil { + t.Fatalf("read ack after callback became ready: %v", err) + } + if typ != EventTypeAck || seq != 9 { + t.Fatalf("ack after callback ready = type %d seq %d, want type %d seq 9", typ, seq, EventTypeAck) + } +} + +func TestEventStreamDataplaneEventAckAndCallback(t *testing.T) { + dir := t.TempDir() + sockPath := filepath.Join(dir, "test-events.sock") + + es := NewEventStream(sockPath) + type dataplaneEventResult struct { + seq uint64 + rec logging.EventRecord + } + got := make(chan dataplaneEventResult, 1) + es.SetOnDataplaneEvent(func(seq uint64, rec logging.EventRecord) { + got <- dataplaneEventResult{seq: seq, rec: rec} + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + es.Start(ctx) + defer es.Close() + + waitCtx, waitCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer waitCancel() + var conn net.Conn + for conn == nil { + var err error + conn, err = net.Dial("unix", sockPath) + if err == nil { + break + } + select { + case <-waitCtx.Done(): + t.Fatalf("dial: %v", err) + case <-time.After(10 * time.Millisecond): + } + } + defer conn.Close() + + connectCtx, connectCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer connectCancel() + for !es.IsConnected() { + select { + case <-connectCtx.Done(): + t.Fatal("event stream did not become connected") + case <-time.After(10 * time.Millisecond): + } + } + + payload := buildDataplaneEventV4Payload( + 6, 1111, 443, + [4]byte{10, 0, 1, 5}, [4]byte{172, 16, 80, 200}, + [4]byte{172, 16, 80, 8}, + 40000, + 1, 2, + 0, 77, + 0, + ) + if err := writeFrame(conn, EventFrameTypePolicyDeny, 7, payload); err != nil { + t.Fatalf("write dataplane event: %v", err) + } + + select { + case result := <-got: + if result.seq != 7 { + t.Fatalf("seq = %d, want 7", result.seq) + } + if result.rec.Type != "POLICY_DENY" || result.rec.PolicyID != 77 { + t.Fatalf("rec = %+v, want policy deny policy 77", result.rec) + } + case <-time.After(2 * time.Second): + t.Fatal("dataplane event callback not called") + } + + _ = conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + typ, seq, _, err := readFrame(conn) + if err != nil { + t.Fatalf("read ack: %v", err) + } + if typ != EventTypeAck || seq != 7 { + t.Fatalf("ack = type %d seq %d, want type %d seq 7", typ, seq, EventTypeAck) + } + if got := es.PolicyDenyEvents.Load(); got != 1 { + t.Fatalf("PolicyDenyEvents = %d, want 1", got) + } +} + +func TestEventStreamDataplaneEventRawCallbackPreferred(t *testing.T) { + dir := t.TempDir() + sockPath := filepath.Join(dir, "test-events.sock") + + es := NewEventStream(sockPath) + rawGot := make(chan []byte, 1) + decodedGot := make(chan struct{}, 1) + es.SetOnRawDataplaneEvent(func(seq uint64, payload []byte) { + if seq != 11 { + t.Fatalf("seq = %d, want 11", seq) + } + rawGot <- append([]byte(nil), payload...) + }) + es.SetOnDataplaneEvent(func(uint64, logging.EventRecord) { + decodedGot <- struct{}{} + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + es.Start(ctx) + defer es.Close() + + waitCtx, waitCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer waitCancel() + var conn net.Conn + for conn == nil { + var err error + conn, err = net.Dial("unix", sockPath) + if err == nil { + break + } + select { + case <-waitCtx.Done(): + t.Fatalf("dial: %v", err) + case <-time.After(10 * time.Millisecond): + } + } + defer conn.Close() + + connectCtx, connectCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer connectCancel() + for !es.IsConnected() { + select { + case <-connectCtx.Done(): + t.Fatal("event stream did not become connected") + case <-time.After(10 * time.Millisecond): + } + } + + payload := buildDataplaneEventV4Payload( + 6, 1111, 443, + [4]byte{10, 0, 1, 5}, [4]byte{172, 16, 80, 200}, + [4]byte{172, 16, 80, 8}, + 40000, + 1, 2, + 0, 77, + 0, + ) + if err := writeFrame(conn, EventFrameTypePolicyDeny, 11, payload); err != nil { + t.Fatalf("write dataplane event: %v", err) + } + + select { + case got := <-rawGot: + if !bytes.Equal(got, payload) { + t.Fatalf("raw payload changed: got %d bytes, want %d", len(got), len(payload)) + } + case <-time.After(2 * time.Second): + t.Fatal("raw dataplane event callback not called") + } + select { + case <-decodedGot: + t.Fatal("decoded callback should not fire when raw callback is installed") + default: + } +} + +func TestEventStreamDataplaneEventBeforeCallbackQueuesUntilCallback(t *testing.T) { + dir := t.TempDir() + sockPath := filepath.Join(dir, "test-events.sock") + + es := NewEventStream(sockPath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + es.Start(ctx) + defer es.Close() + + time.Sleep(50 * time.Millisecond) + + conn, err := net.Dial("unix", sockPath) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn.Close() + + deadline := time.Now().Add(2 * time.Second) + for !es.IsConnected() { + if time.Now().After(deadline) { + t.Fatal("event stream did not become connected") + } + time.Sleep(10 * time.Millisecond) + } + + payload := buildDataplaneEventV4Payload( + 6, 1111, 443, + [4]byte{10, 0, 1, 5}, [4]byte{172, 16, 80, 200}, + [4]byte{172, 16, 80, 8}, + 40000, + 1, 2, + 0, 77, + 0, + ) + if err := writeFrame(conn, EventFrameTypePolicyDeny, 21, payload); err != nil { + t.Fatalf("write first dataplane event: %v", err) + } + _ = conn.SetReadDeadline(time.Now().Add(250 * time.Millisecond)) + if typ, seq, _, err := readFrame(conn); err == nil { + t.Fatalf("unexpected ack before callback was wired: type %d seq %d", typ, seq) + } + if got := es.PolicyDenyDrops.Load(); got != 0 { + t.Fatalf("PolicyDenyDrops = %d, want 0", got) + } + + got := make(chan uint64, 1) + es.SetOnDataplaneEvent(func(seq uint64, _ logging.EventRecord) { + got <- seq + }) + + select { + case gotSeq := <-got: + if gotSeq != 21 { + t.Fatalf("callback seq = %d, want 21", gotSeq) + } + case <-time.After(2 * time.Second): + t.Fatal("callback not invoked after late registration") + } + _ = conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + typ, seq, _, err := readFrame(conn) + if err != nil { + t.Fatalf("read ack after callback: %v", err) + } + if typ != EventTypeAck || seq != 21 { + t.Fatalf("ack after callback = type %d seq %d, want type %d seq 21", typ, seq, EventTypeAck) + } + if got := es.PolicyDenyEvents.Load(); got != 1 { + t.Fatalf("PolicyDenyEvents = %d, want 1", got) + } +} + +func TestEventStreamMalformedDataplaneEventDropsAndAcks(t *testing.T) { + dir := t.TempDir() + sockPath := filepath.Join(dir, "test-events.sock") + + es := NewEventStream(sockPath) + var callbackCalled atomic.Bool + es.SetOnDataplaneEvent(func(uint64, logging.EventRecord) { + callbackCalled.Store(true) + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + es.Start(ctx) + defer es.Close() + + time.Sleep(50 * time.Millisecond) + + conn, err := net.Dial("unix", sockPath) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn.Close() + + deadline := time.Now().Add(2 * time.Second) + for !es.IsConnected() { + if time.Now().After(deadline) { + t.Fatal("event stream did not become connected") + } + time.Sleep(10 * time.Millisecond) + } + + if err := writeFrame(conn, EventFrameTypeScreenDrop, 9, []byte{4, 6}); err != nil { + t.Fatalf("write malformed dataplane event: %v", err) + } + + _ = conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + typ, seq, _, err := readFrame(conn) + if err != nil { + t.Fatalf("read ack: %v", err) + } + if typ != EventTypeAck || seq != 9 { + t.Fatalf("ack = type %d seq %d, want type %d seq 9", typ, seq, EventTypeAck) + } + if got := es.ScreenDropDrops.Load(); got != 1 { + t.Fatalf("ScreenDropDrops = %d, want 1", got) + } + if got := es.DecodeErrors.Load(); got != 1 { + t.Fatalf("DecodeErrors = %d, want 1", got) + } + if callbackCalled.Load() { + t.Fatal("malformed dataplane event must not call callback") + } +} + +func TestEventStreamUnknownFrameDropsAndAcks(t *testing.T) { + dir := t.TempDir() + sockPath := filepath.Join(dir, "test-events.sock") + + es := NewEventStream(sockPath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + es.Start(ctx) + defer es.Close() + + time.Sleep(50 * time.Millisecond) + + conn, err := net.Dial("unix", sockPath) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn.Close() + + deadline := time.Now().Add(2 * time.Second) + for !es.IsConnected() { + if time.Now().After(deadline) { + t.Fatal("event stream did not become connected") + } + time.Sleep(10 * time.Millisecond) + } + + if err := writeFrame(conn, 250, 11, nil); err != nil { + t.Fatalf("write unknown frame: %v", err) + } + + _ = conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + typ, seq, _, err := readFrame(conn) + if err != nil { + t.Fatalf("read ack: %v", err) + } + if typ != EventTypeAck || seq != 11 { + t.Fatalf("ack = type %d seq %d, want type %d seq 11", typ, seq, EventTypeAck) + } + if got := es.UnknownFrameDrops.Load(); got != 1 { + t.Fatalf("UnknownFrameDrops = %d, want 1", got) + } +} + func TestEventStreamAcksSent(t *testing.T) { dir := t.TempDir() sockPath := filepath.Join(dir, "test-events.sock") es := NewEventStream(sockPath) - es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) {}) + es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) bool { return true }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -422,7 +961,10 @@ func TestEventStreamFullResyncCallback(t *testing.T) { es := NewEventStream(sockPath) var resyncCalled atomic.Bool - es.SetOnFullResync(func() { resyncCalled.Store(true) }) + es.SetOnFullResync(func() bool { + resyncCalled.Store(true) + return true + }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -459,14 +1001,69 @@ func TestEventStreamFullResyncCallback(t *testing.T) { } } +func TestEventStreamFullResyncBeforeCallbackQueuesUntilCallback(t *testing.T) { + dir := t.TempDir() + sockPath := filepath.Join(dir, "test-events.sock") + + es := NewEventStream(sockPath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + es.Start(ctx) + defer es.Close() + + time.Sleep(50 * time.Millisecond) + conn, err := net.Dial("unix", sockPath) + if err != nil { + t.Fatalf("dial: %v", err) + } + defer conn.Close() + deadline := time.Now().Add(2 * time.Second) + for !es.IsConnected() { + if time.Now().After(deadline) { + t.Fatal("not connected") + } + time.Sleep(10 * time.Millisecond) + } + + if err := writeFrame(conn, EventTypeFullResync, 7, nil); err != nil { + t.Fatalf("write FullResync: %v", err) + } + _ = conn.SetReadDeadline(time.Now().Add(250 * time.Millisecond)) + if typ, seq, _, err := readFrame(conn); err == nil { + t.Fatalf("unexpected ack before FullResync callback was wired: type %d seq %d", typ, seq) + } + + var resyncCalled atomic.Bool + es.SetOnFullResync(func() bool { + resyncCalled.Store(true) + return true + }) + deadline = time.Now().Add(2 * time.Second) + for !resyncCalled.Load() { + if time.Now().After(deadline) { + t.Fatal("onFullResync not called after late registration") + } + time.Sleep(10 * time.Millisecond) + } + _ = conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + typ, seq, _, err := readFrame(conn) + if err != nil { + t.Fatalf("read ack after FullResync callback: %v", err) + } + if typ != EventTypeAck || seq != 7 { + t.Fatalf("ack after FullResync callback = type %d seq %d, want type %d seq 7", typ, seq, EventTypeAck) + } +} + func TestEventStreamDrainRequestComplete(t *testing.T) { dir := t.TempDir() sockPath := filepath.Join(dir, "test-events.sock") es := NewEventStream(sockPath) var applied atomic.Int32 - es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) { + es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) bool { applied.Add(1) + return true }) ctx, cancel := context.WithCancel(context.Background()) @@ -569,8 +1166,9 @@ func TestEventStreamDisconnectReconnect(t *testing.T) { es := NewEventStream(sockPath) var received atomic.Int32 - es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) { + es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) bool { received.Add(1) + return true }) ctx, cancel := context.WithCancel(context.Background()) @@ -650,7 +1248,7 @@ func TestEventStreamPauseResume(t *testing.T) { sockPath := filepath.Join(dir, "test-events.sock") es := NewEventStream(sockPath) - es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) {}) + es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) bool { return true }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -711,7 +1309,7 @@ func TestEventStreamSequenceGapDetection(t *testing.T) { sockPath := filepath.Join(dir, "test-events.sock") es := NewEventStream(sockPath) - es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) {}) + es.SetOnEvent(func(uint8, uint64, SessionDeltaInfo) bool { return true }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/dataplane/userspace/maps_sync.go b/pkg/dataplane/userspace/maps_sync.go index ae68e774a..a57ca825c 100644 --- a/pkg/dataplane/userspace/maps_sync.go +++ b/pkg/dataplane/userspace/maps_sync.go @@ -609,6 +609,10 @@ ctrlReady: status.ConfiguredMode = m.configuredMode.String() status.EntryPrograms = m.entryProgramsLocked() status.FallbackCounters = m.readFallbackStatsLocked() + if m.eventStream != nil { + es := m.eventStream.Status() + status.EventStream = &es + } m.lastStatus = *status return nil diff --git a/pkg/dataplane/userspace/protocol.go b/pkg/dataplane/userspace/protocol.go index 43e37a192..3024f50f1 100644 --- a/pkg/dataplane/userspace/protocol.go +++ b/pkg/dataplane/userspace/protocol.go @@ -465,6 +465,9 @@ type ProcessStatus struct { CoSActiveFlowCountsTruncated bool `json:"cos_active_flow_counts_truncated,omitempty"` RecentSessionDeltas []SessionDeltaInfo `json:"recent_session_deltas,omitempty"` RecentExceptions []ExceptionStatus `json:"recent_exceptions,omitempty"` + EventStream *EventStreamStatus `json:"event_stream,omitempty"` + EventStreamSent uint64 `json:"event_stream_sent,omitempty"` + EventStreamDropped uint64 `json:"event_stream_dropped,omitempty"` CoSInterfaces []CoSInterfaceStatus `json:"cos_interfaces,omitempty"` FilterTermCounters []FirewallFilterTermCounterStatus `json:"filter_term_counters,omitempty"` LastResolution *PacketResolution `json:"last_resolution,omitempty"` @@ -476,6 +479,20 @@ type ProcessStatus struct { FallbackCounters map[string]uint64 `json:"fallback_counters,omitempty"` // reason_name -> count } +type EventStreamStatus struct { + FramesRead uint64 `json:"frames_read,omitempty"` + FramesWritten uint64 `json:"frames_written,omitempty"` + DecodeErrors uint64 `json:"decode_errors,omitempty"` + SeqGaps uint64 `json:"seq_gaps,omitempty"` + PolicyDenyEvents uint64 `json:"policy_deny_events,omitempty"` + ScreenDropEvents uint64 `json:"screen_drop_events,omitempty"` + FilterLogEvents uint64 `json:"filter_log_events,omitempty"` + PolicyDenyDrops uint64 `json:"policy_deny_drops,omitempty"` + ScreenDropDrops uint64 `json:"screen_drop_drops,omitempty"` + FilterLogDrops uint64 `json:"filter_log_drops,omitempty"` + UnknownFrameDrops uint64 `json:"unknown_frame_drops,omitempty"` +} + type CoSInterfaceStatus struct { Ifindex int `json:"ifindex,omitempty"` InterfaceName string `json:"interface_name,omitempty"` @@ -1073,16 +1090,19 @@ const EventFrameHeaderSize = 16 // Event stream message types. const ( - EventTypeSessionOpen uint8 = 1 - EventTypeSessionClose uint8 = 2 - EventTypeSessionUpdate uint8 = 3 - EventTypeAck uint8 = 4 // daemon → helper - EventTypePause uint8 = 5 // daemon → helper - EventTypeResume uint8 = 6 // daemon → helper - EventTypeDrainRequest uint8 = 7 // daemon → helper (target seq in header) - EventTypeDrainComplete uint8 = 8 // helper → daemon - EventTypeFullResync uint8 = 9 // helper → daemon - EventTypeKeepalive uint8 = 10 // helper → daemon (idle heartbeat) + EventTypeSessionOpen uint8 = 1 + EventTypeSessionClose uint8 = 2 + EventTypeSessionUpdate uint8 = 3 + EventTypeAck uint8 = 4 // daemon → helper + EventTypePause uint8 = 5 // daemon → helper + EventTypeResume uint8 = 6 // daemon → helper + EventTypeDrainRequest uint8 = 7 // daemon → helper (target seq in header) + EventTypeDrainComplete uint8 = 8 // helper → daemon + EventTypeFullResync uint8 = 9 // helper → daemon + EventTypeKeepalive uint8 = 10 // helper → daemon (idle heartbeat) + EventFrameTypePolicyDeny uint8 = 11 // helper → daemon (RT_FLOW policy deny) + EventFrameTypeScreenDrop uint8 = 12 // helper → daemon (RT_FLOW screen drop) + EventFrameTypeFilterLog uint8 = 13 // helper → daemon (RT_FLOW filter log) ) // Session event flag bits in the Flags byte of SessionOpen/Update/Close payloads. diff --git a/pkg/dataplane/userspace/statusfmt.go b/pkg/dataplane/userspace/statusfmt.go index 0d250ff1b..07ebc7117 100644 --- a/pkg/dataplane/userspace/statusfmt.go +++ b/pkg/dataplane/userspace/statusfmt.go @@ -289,6 +289,17 @@ func FormatStatusSummary(status ProcessStatus) string { fmt.Fprintf(&b, " Session delta generated: %d\n", sessionDeltaGenerated) fmt.Fprintf(&b, " Session delta dropped: %d\n", sessionDeltaDropped) fmt.Fprintf(&b, " Session delta drained: %d\n", sessionDeltaDrained) + if status.EventStream != nil { + es := status.EventStream + fmt.Fprintf(&b, " Event stream frames: read=%d written=%d decode_errors=%d seq_gaps=%d\n", + es.FramesRead, es.FramesWritten, es.DecodeErrors, es.SeqGaps) + fmt.Fprintf(&b, " Event stream producer: sent=%d dropped=%d\n", + status.EventStreamSent, status.EventStreamDropped) + fmt.Fprintf(&b, " Event stream events: policy_deny=%d screen_drop=%d filter_log=%d unknown_drops=%d\n", + es.PolicyDenyEvents, es.ScreenDropEvents, es.FilterLogEvents, es.UnknownFrameDrops) + fmt.Fprintf(&b, " Event stream drops: policy_deny=%d screen_drop=%d filter_log=%d\n", + es.PolicyDenyDrops, es.ScreenDropDrops, es.FilterLogDrops) + } fmt.Fprintf(&b, " Policy denied packets: %d\n", policyDeniedPackets) fmt.Fprintf(&b, " SNAT packets: %d\n", snatPackets) fmt.Fprintf(&b, " DNAT packets: %d\n", dnatPackets) diff --git a/pkg/logging/README.md b/pkg/logging/README.md index e79f90306..5a80fd892 100644 --- a/pkg/logging/README.md +++ b/pkg/logging/README.md @@ -40,6 +40,12 @@ reports. - The binary RT_FLOW format used by Junos session logging is custom; it is not human-readable without a parser. Use the local-log facility for human-readable session events. +- Userspace event-stream telemetry enters through + `EventReader.ProcessRawEvent`, not by direct `EventBuffer.Add`, so it + gets the same name resolution, callback fanout, local writers, and + syslog delivery as eBPF ring-buffer events. `DecodeRawEventRecord` is + decode-only and must not be used as a replacement for the full reader + path when audit delivery matters. - The event buffer is bounded. If a subscriber stops draining, new events drop silently — by design. Don't wire a slow consumer to it. - The session aggregator flushes on a 5-minute timer. The flushed diff --git a/pkg/logging/eventbuf.go b/pkg/logging/eventbuf.go index a3371b0d1..0deedbbd7 100644 --- a/pkg/logging/eventbuf.go +++ b/pkg/logging/eventbuf.go @@ -15,6 +15,10 @@ type EventRecord struct { Protocol string // "TCP", "UDP" Action string // "permit", "deny" PolicyID uint32 + RuleID uint32 + TermID uint32 + Reason string + OwnerRGID int16 InZone uint16 OutZone uint16 ScreenCheck string // for SCREEN_DROP @@ -39,8 +43,8 @@ type EventBuffer struct { mu sync.RWMutex buf []EventRecord size int - head int // next write position - count int // number of events stored + head int // next write position + count int // number of events stored seq uint64 // monotonically increasing sequence number subMu sync.RWMutex diff --git a/pkg/logging/ringbuf.go b/pkg/logging/ringbuf.go index e71d002db..bc5b17c7a 100644 --- a/pkg/logging/ringbuf.go +++ b/pkg/logging/ringbuf.go @@ -225,6 +225,18 @@ func (er *EventReader) Run(ctx context.Context) { } } +// ProcessRawEvent feeds an RT_FLOW-format dataplane.Event record through the +// same enrichment, buffering, callback, and syslog/local-log fanout path used +// by the eBPF ring-buffer reader. Userspace transports should call this rather +// than adding decoded records directly to EventBuffer. +func (er *EventReader) ProcessRawEvent(data []byte) bool { + if len(data) < int(unsafe.Sizeof(dataplane.Event{})) { + return false + } + er.logEvent(data) + return true +} + func (er *EventReader) logEvent(data []byte) { var evt dataplane.Event evt.Timestamp = binary.LittleEndian.Uint64(data[0:8]) @@ -305,6 +317,14 @@ func (er *EventReader) logEvent(data []byte) { if evt.EventType == dataplane.EventTypeScreenDrop { rec.ScreenCheck = screenFlagName(evt.PolicyID) } + if evt.EventType != dataplane.EventTypeSessionClose && len(data) >= 136 { + rec.RuleID = binary.LittleEndian.Uint32(data[56:60]) + rec.TermID = binary.LittleEndian.Uint32(data[60:64]) + rec.OwnerRGID = int16(binary.LittleEndian.Uint16(data[64:66])) + if data[134] != dataplane.CloseReasonNone { + rec.Reason = closeReasonName(data[134]) + } + } // Parse extended fields (offset 112+) var closeReasonCode uint8 @@ -458,6 +478,102 @@ func (er *EventReader) logEvent(data []byte) { } } +// DecodeRawEventRecord decodes the fixed dataplane.Event RT_FLOW wire shape +// used by the eBPF ring buffer and by the userspace event-stream adapter. +// It is decode-only; transports that need EventBuffer, callback, local-log, +// and syslog fanout must call EventReader.ProcessRawEvent instead. +func DecodeRawEventRecord(data []byte) (EventRecord, bool) { + if len(data) < int(unsafe.Sizeof(dataplane.Event{})) { + return EventRecord{}, false + } + + var evt dataplane.Event + evt.Timestamp = binary.LittleEndian.Uint64(data[0:8]) + copy(evt.SrcIP[:], data[8:24]) + copy(evt.DstIP[:], data[24:40]) + evt.SrcPort = binary.BigEndian.Uint16(data[40:42]) + evt.DstPort = binary.BigEndian.Uint16(data[42:44]) + evt.PolicyID = binary.LittleEndian.Uint32(data[44:48]) + evt.IngressZone = binary.LittleEndian.Uint16(data[48:50]) + evt.EgressZone = binary.LittleEndian.Uint16(data[50:52]) + evt.EventType = data[52] + evt.Protocol = data[53] + evt.Action = data[54] + evt.AddrFamily = data[55] + evt.SessionPackets = binary.LittleEndian.Uint64(data[56:64]) + evt.SessionBytes = binary.LittleEndian.Uint64(data[64:72]) + copy(evt.NATSrcIP[:], data[72:88]) + copy(evt.NATDstIP[:], data[88:104]) + evt.NATSrcPort = binary.BigEndian.Uint16(data[104:106]) + evt.NATDstPort = binary.BigEndian.Uint16(data[106:108]) + evt.Created = binary.LittleEndian.Uint32(data[108:112]) + evt.RevPackets = binary.LittleEndian.Uint64(data[112:120]) + evt.RevBytes = binary.LittleEndian.Uint64(data[120:128]) + evt.IngressIfindex = binary.LittleEndian.Uint32(data[128:132]) + evt.AppID = binary.LittleEndian.Uint16(data[132:134]) + evt.CloseReason = data[134] + + var srcStr, dstStr, natSrcStr, natDstStr string + switch evt.AddrFamily { + case dataplane.AFInet6: + srcIP := net.IP(evt.SrcIP[:]) + dstIP := net.IP(evt.DstIP[:]) + srcStr = fmt.Sprintf("[%s]:%d", srcIP, evt.SrcPort) + dstStr = fmt.Sprintf("[%s]:%d", dstIP, evt.DstPort) + natSrcIP := net.IP(evt.NATSrcIP[:]) + natDstIP := net.IP(evt.NATDstIP[:]) + natSrcStr = fmt.Sprintf("[%s]:%d", natSrcIP, evt.NATSrcPort) + natDstStr = fmt.Sprintf("[%s]:%d", natDstIP, evt.NATDstPort) + case dataplane.AFInet: + srcIP := net.IP(evt.SrcIP[:4]) + dstIP := net.IP(evt.DstIP[:4]) + srcStr = fmt.Sprintf("%s:%d", srcIP, evt.SrcPort) + dstStr = fmt.Sprintf("%s:%d", dstIP, evt.DstPort) + natSrcIP := net.IP(evt.NATSrcIP[:4]) + natDstIP := net.IP(evt.NATDstIP[:4]) + natSrcStr = fmt.Sprintf("%s:%d", natSrcIP, evt.NATSrcPort) + natDstStr = fmt.Sprintf("%s:%d", natDstIP, evt.NATDstPort) + default: + return EventRecord{}, false + } + + rec := EventRecord{ + Time: time.Now(), + Type: eventTypeName(evt.EventType), + SrcAddr: srcStr, + DstAddr: dstStr, + Protocol: protoName(evt.Protocol), + Action: actionName(evt.Action), + PolicyID: evt.PolicyID, + InZone: evt.IngressZone, + OutZone: evt.EgressZone, + NATSrcAddr: natSrcStr, + NATDstAddr: natDstStr, + SessionPkts: evt.SessionPackets, + SessionBytes: evt.SessionBytes, + RevSessionPkts: evt.RevPackets, + RevSessionBytes: evt.RevBytes, + CloseReason: closeReasonName(evt.CloseReason), + } + if evt.EventType != dataplane.EventTypeSessionClose { + rec.SessionPkts = 0 + rec.SessionBytes = 0 + rec.RuleID = binary.LittleEndian.Uint32(data[56:60]) + rec.TermID = binary.LittleEndian.Uint32(data[60:64]) + rec.OwnerRGID = int16(binary.LittleEndian.Uint16(data[64:66])) + if evt.CloseReason != dataplane.CloseReasonNone { + rec.Reason = closeReasonName(evt.CloseReason) + } + } + if evt.Timestamp > 0 && evt.Timestamp <= uint64(1<<63-1) { + rec.Time = time.Unix(0, int64(evt.Timestamp)) + } + if evt.EventType == dataplane.EventTypeScreenDrop { + rec.ScreenCheck = screenFlagName(evt.PolicyID) + } + return rec, true +} + // eventCategory maps event types to category bitmask values. func eventCategory(eventType uint8) uint8 { switch eventType { diff --git a/userspace-dp/src/event_stream/README.md b/userspace-dp/src/event_stream/README.md index 039edacd5..135c5545e 100644 --- a/userspace-dp/src/event_stream/README.md +++ b/userspace-dp/src/event_stream/README.md @@ -14,14 +14,26 @@ periodic ACK from the daemon. payload. Message types: `MSG_SESSION_OPEN`, `MSG_SESSION_CLOSE`, `MSG_SESSION_UPDATE`, `MSG_ACK`, `MSG_PAUSE`, `MSG_RESUME`, `MSG_DRAIN_REQUEST`, `MSG_DRAIN_COMPLETE`, `MSG_FULL_RESYNC`, - `MSG_KEEPALIVE` (1..10). + `MSG_KEEPALIVE` (1..10), plus RT_FLOW dataplane telemetry frames + `MSG_POLICY_DENY`, `MSG_SCREEN_DROP`, and `MSG_FILTER_LOG` (11..13). + The telemetry frame payload is not a userspace-specific schema: it is + the same 136-byte `dataplane.Event` layout consumed by the Go ringbuf + logger, including AF values 2/10 and big-endian L4 ports. Userspace + telemetry may also populate the non-session metadata slots used by + the Go adapter for action, rule ID, term ID, reason, owner RG, + ingress ifindex, and application ID. - `codec_tests.rs`, `tests.rs` — co-located. ## Why push Polled deltas at 1 Hz were missing fast-cycling sessions (open + close between ticks). The push stream sees every transition. The Go listener -buffers and batches before forwarding to syslog / NetFlow. +feeds RT_FLOW dataplane events through the same `logging.EventReader` +path as ringbuf records, so EventBuffer, callbacks, local writers, +syslog, NetFlow/IPFIX consumers, and name resolution stay consistent +between eBPF and userspace transports. The listener is wired in both HA +cluster and standalone userspace modes; only session replication remains +cluster-scoped. ## Gotchas @@ -34,3 +46,25 @@ buffers and batches before forwarding to syslog / NetFlow. (see `protocol.rs`). Use `push_delta_lossless()` only when correctness requires every frame and the producer can tolerate back-pressure. +- The Go daemon must know every helper→daemon frame type that carries a + sequence number. For RT_FLOW-style dataplane telemetry, the daemon + decodes valid frames through the same RT_FLOW adapter used for ringbuf + records into `logging.EventRecord`; malformed or + forward-version unknown frames are explicitly counted, dropped, and + ACKed so the helper replay buffer cannot churn forever on an + unconsumable event. +- Callback-dependent frames are ACKed only after the relevant daemon + callback has consumed them. If the helper connects before session-sync + or RT_FLOW callbacks are wired, the daemon queues a bounded prefix and + withholds the cumulative ACK; overflow closes the stream so the helper + replays instead of silently losing audit or HA session events. If the + replay buffer no longer contains `acked_seq + 1`, the helper sends a + FullResync request even when `acked_seq == 0`; this covers the + boot-time queue-overflow case where seq 1 was trimmed before any ACK. +- Session callbacks and FullResync callbacks are ACK gates. A callback + that returns false means the daemon is not ready or did not complete + the side effect, so ACK remains withheld and the helper must replay. +- Daemon-side transport counters are exported as + `xpf_userspace_event_stream_*` Prometheus metrics from + `ProcessStatus.EventStream`. Helper-side send/drop counters remain in + the helper status fields. diff --git a/userspace-dp/src/event_stream/codec.rs b/userspace-dp/src/event_stream/codec.rs index dd991eae1..32ca181be 100644 --- a/userspace-dp/src/event_stream/codec.rs +++ b/userspace-dp/src/event_stream/codec.rs @@ -26,6 +26,26 @@ pub(crate) const MSG_DRAIN_COMPLETE: u8 = 8; pub(crate) const MSG_FULL_RESYNC: u8 = 9; pub(crate) const MSG_KEEPALIVE: u8 = 10; +// #1379 codec-only security event foundation. Producer wiring must add +// fixed-size non-blocking emission, rate limiting, and loss accounting. +#[allow(dead_code)] +pub(crate) const MSG_POLICY_DENY: u8 = 11; +#[allow(dead_code)] +pub(crate) const MSG_SCREEN_DROP: u8 = 12; +#[allow(dead_code)] +pub(crate) const MSG_FILTER_LOG: u8 = 13; + +#[allow(dead_code)] +pub(crate) const SECURITY_EVENT_PAYLOAD_SIZE: usize = 136; + +const RT_FLOW_AF_INET: u8 = 2; +const RT_FLOW_AF_INET6: u8 = 10; +const RT_FLOW_EVENT_POLICY_DENY: u8 = 3; +const RT_FLOW_EVENT_SCREEN_DROP: u8 = 4; +const RT_FLOW_EVENT_FILTER_LOG: u8 = 6; +const RT_FLOW_ACTION_DENY: u8 = 0; +const RT_FLOW_ACTION_PERMIT: u8 = 1; + /// Disposition encoding for the wire format. const DISP_FORWARD_CANDIDATE: u8 = 0; const DISP_LOCAL_DELIVERY: u8 = 1; @@ -42,6 +62,80 @@ pub(crate) const FLAG_FABRIC_REDIRECT: u8 = 1 << 0; pub(crate) const FLAG_FABRIC_INGRESS: u8 = 1 << 1; pub(crate) const FLAG_IS_REVERSE: u8 = 1 << 2; +#[allow(dead_code)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum DataplaneEventKind { + PolicyDeny, + ScreenDrop, + FilterLog, +} + +#[allow(dead_code)] +impl DataplaneEventKind { + fn msg_type(self) -> u8 { + match self { + Self::PolicyDeny => MSG_POLICY_DENY, + Self::ScreenDrop => MSG_SCREEN_DROP, + Self::FilterLog => MSG_FILTER_LOG, + } + } + + fn from_msg_type(msg_type: u8) -> Option { + match msg_type { + MSG_POLICY_DENY => Some(Self::PolicyDeny), + MSG_SCREEN_DROP => Some(Self::ScreenDrop), + MSG_FILTER_LOG => Some(Self::FilterLog), + _ => None, + } + } + + fn rt_flow_event_type(self) -> u8 { + match self { + Self::PolicyDeny => RT_FLOW_EVENT_POLICY_DENY, + Self::ScreenDrop => RT_FLOW_EVENT_SCREEN_DROP, + Self::FilterLog => RT_FLOW_EVENT_FILTER_LOG, + } + } + + fn from_rt_flow_event_type(event_type: u8) -> Option { + match event_type { + RT_FLOW_EVENT_POLICY_DENY => Some(Self::PolicyDeny), + RT_FLOW_EVENT_SCREEN_DROP => Some(Self::ScreenDrop), + RT_FLOW_EVENT_FILTER_LOG => Some(Self::FilterLog), + _ => None, + } + } +} + +#[allow(dead_code)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct DataplaneEventPayload { + pub(crate) kind: DataplaneEventKind, + pub(crate) addr_family: u8, + pub(crate) protocol: u8, + pub(crate) action: u8, + pub(crate) src_ip: IpAddr, + pub(crate) dst_ip: IpAddr, + pub(crate) src_port: u16, + pub(crate) dst_port: u16, + pub(crate) nat_src_ip: Option, + pub(crate) nat_dst_ip: Option, + pub(crate) nat_src_port: u16, + pub(crate) nat_dst_port: u16, + pub(crate) ingress_zone_id: u16, + pub(crate) egress_zone_id: u16, + pub(crate) ingress_ifindex: i32, + pub(crate) policy_id: u32, + pub(crate) rule_id: u32, + pub(crate) term_id: u32, + pub(crate) reason: u8, + pub(crate) owner_rg_id: i16, + pub(crate) application_id: u16, + pub(crate) filter_id: u32, + pub(crate) screen_id: u32, + pub(crate) timestamp_ns: u64, +} + // --------------------------------------------------------------------------- // EventFrame -- zero-allocation stack-buffered wire frame // --------------------------------------------------------------------------- @@ -277,10 +371,84 @@ impl EventFrame { } } + /// Encode a fixed-size security telemetry event using the existing + /// RT_FLOW dataplane.Event wire shape consumed by the Go ringbuf parser. + /// + /// Payload layout (136 bytes) matches `pkg/dataplane.Event`: timestamp, + /// 16-byte src/dst IP slots, big-endian ports, little-endian identities + /// and zones, event/protocol/action/address-family bytes, NAT slots, and + /// extended ingress-ifindex/application fields. + #[allow(dead_code)] + pub(crate) fn encode_dataplane_event(seq: u64, event: &DataplaneEventPayload) -> Self { + let mut buf = [0u8; 256]; + let base = FRAME_HEADER_SIZE; + let wire_af = rt_flow_addr_family(event.addr_family, event.src_ip); + let policy_or_reason_id = match event.kind { + DataplaneEventKind::PolicyDeny => event.policy_id, + DataplaneEventKind::ScreenDrop => event.screen_id, + DataplaneEventKind::FilterLog => event.filter_id, + }; + + buf[base..base + 8].copy_from_slice(&event.timestamp_ns.to_le_bytes()); + write_ip_16(&mut buf, base + 8, event.src_ip); + write_ip_16(&mut buf, base + 24, event.dst_ip); + buf[base + 40..base + 42].copy_from_slice(&event.src_port.to_be_bytes()); + buf[base + 42..base + 44].copy_from_slice(&event.dst_port.to_be_bytes()); + buf[base + 44..base + 48].copy_from_slice(&policy_or_reason_id.to_le_bytes()); + buf[base + 48..base + 50].copy_from_slice(&event.ingress_zone_id.to_le_bytes()); + buf[base + 50..base + 52].copy_from_slice(&event.egress_zone_id.to_le_bytes()); + buf[base + 52] = event.kind.rt_flow_event_type(); + buf[base + 53] = event.protocol; + buf[base + 54] = event.action; + buf[base + 55] = wire_af; + buf[base + 56..base + 60].copy_from_slice(&event.rule_id.to_le_bytes()); + buf[base + 60..base + 64].copy_from_slice(&event.term_id.to_le_bytes()); + buf[base + 64..base + 66].copy_from_slice(&event.owner_rg_id.to_le_bytes()); + write_ip_opt_16(&mut buf, base + 72, event.nat_src_ip); + write_ip_opt_16(&mut buf, base + 88, event.nat_dst_ip); + buf[base + 104..base + 106].copy_from_slice(&event.nat_src_port.to_be_bytes()); + buf[base + 106..base + 108].copy_from_slice(&event.nat_dst_port.to_be_bytes()); + buf[base + 128..base + 132].copy_from_slice(&event.ingress_ifindex.to_le_bytes()); + buf[base + 132..base + 134].copy_from_slice(&event.application_id.to_le_bytes()); + buf[base + 134] = event.reason; + + write_header( + &mut buf, + SECURITY_EVENT_PAYLOAD_SIZE as u32, + event.kind.msg_type(), + seq, + ); + + EventFrame { + data: buf, + len: (FRAME_HEADER_SIZE + SECURITY_EVENT_PAYLOAD_SIZE) as u16, + seq, + } + } + /// The raw bytes of this frame (header + payload). pub(crate) fn as_bytes(&self) -> &[u8] { &self.data[..self.len as usize] } + + #[allow(dead_code)] + pub(crate) fn dataplane_event_payload(&self) -> Option<&[u8]> { + DataplaneEventKind::from_msg_type(self.data[4])?; + let payload_len = u32::from_le_bytes(self.data[0..4].try_into().ok()?) as usize; + if payload_len != SECURITY_EVENT_PAYLOAD_SIZE { + return None; + } + let end = FRAME_HEADER_SIZE + payload_len; + if (self.len as usize) < end { + return None; + } + Some(&self.data[FRAME_HEADER_SIZE..end]) + } + + #[allow(dead_code)] + pub(crate) fn decode_dataplane_event(&self) -> Option { + decode_dataplane_event(self.data[4], self.dataplane_event_payload()?) + } } // --------------------------------------------------------------------------- @@ -324,6 +492,114 @@ fn write_ip_opt(buf: &mut [u8; 256], pos: usize, ip: Option, is_v6: bool } } +#[allow(dead_code)] +fn write_ip_16(buf: &mut [u8; 256], pos: usize, ip: IpAddr) -> usize { + match ip { + IpAddr::V4(v4) => buf[pos..pos + 4].copy_from_slice(&v4.octets()), + IpAddr::V6(v6) => buf[pos..pos + 16].copy_from_slice(&v6.octets()), + } + pos + 16 +} + +#[allow(dead_code)] +fn write_ip_opt_16(buf: &mut [u8; 256], pos: usize, ip: Option) -> usize { + if let Some(addr) = ip { + write_ip_16(buf, pos, addr) + } else { + pos + 16 + } +} + +#[allow(dead_code)] +pub(crate) fn decode_dataplane_event( + msg_type: u8, + payload: &[u8], +) -> Option { + let frame_kind = DataplaneEventKind::from_msg_type(msg_type)?; + if payload.len() != SECURITY_EVENT_PAYLOAD_SIZE { + return None; + } + + let event_kind = DataplaneEventKind::from_rt_flow_event_type(payload[52])?; + if event_kind != frame_kind { + return None; + } + let wire_af = payload[55]; + if wire_af != RT_FLOW_AF_INET && wire_af != RT_FLOW_AF_INET6 { + return None; + } + let policy_or_reason_id = u32::from_le_bytes(payload[44..48].try_into().ok()?); + + Some(DataplaneEventPayload { + kind: event_kind, + addr_family: if wire_af == RT_FLOW_AF_INET6 { + libc::AF_INET6 as u8 + } else { + libc::AF_INET as u8 + }, + protocol: payload[53], + action: payload[54], + src_port: u16::from_be_bytes(payload[40..42].try_into().ok()?), + dst_port: u16::from_be_bytes(payload[42..44].try_into().ok()?), + nat_src_port: u16::from_be_bytes(payload[104..106].try_into().ok()?), + nat_dst_port: u16::from_be_bytes(payload[106..108].try_into().ok()?), + ingress_zone_id: u16::from_le_bytes(payload[48..50].try_into().ok()?), + egress_zone_id: u16::from_le_bytes(payload[50..52].try_into().ok()?), + ingress_ifindex: i32::from_le_bytes(payload[128..132].try_into().ok()?), + rule_id: u32::from_le_bytes(payload[56..60].try_into().ok()?), + term_id: u32::from_le_bytes(payload[60..64].try_into().ok()?), + owner_rg_id: i16::from_le_bytes(payload[64..66].try_into().ok()?), + policy_id: if event_kind == DataplaneEventKind::PolicyDeny { + policy_or_reason_id + } else { + 0 + }, + reason: payload[134], + application_id: u16::from_le_bytes(payload[132..134].try_into().ok()?), + filter_id: if event_kind == DataplaneEventKind::FilterLog { + policy_or_reason_id + } else { + 0 + }, + screen_id: if event_kind == DataplaneEventKind::ScreenDrop { + policy_or_reason_id + } else { + 0 + }, + timestamp_ns: u64::from_le_bytes(payload[0..8].try_into().ok()?), + src_ip: read_ip_16(&payload[8..24], wire_af)?, + dst_ip: read_ip_16(&payload[24..40], wire_af)?, + nat_src_ip: read_nonzero_ip_16(&payload[72..88], wire_af), + nat_dst_ip: read_nonzero_ip_16(&payload[88..104], wire_af), + }) +} + +#[allow(dead_code)] +fn read_ip_16(bytes: &[u8], wire_af: u8) -> Option { + match wire_af { + RT_FLOW_AF_INET => Some(IpAddr::from(<[u8; 4]>::try_from(&bytes[..4]).ok()?)), + RT_FLOW_AF_INET6 => Some(IpAddr::from(<[u8; 16]>::try_from(&bytes[..16]).ok()?)), + _ => None, + } +} + +#[allow(dead_code)] +fn read_nonzero_ip_16(bytes: &[u8], wire_af: u8) -> Option { + if bytes.iter().all(|b| *b == 0) { + return None; + } + read_ip_16(bytes, wire_af) +} + +#[allow(dead_code)] +fn rt_flow_addr_family(addr_family: u8, src_ip: IpAddr) -> u8 { + if addr_family == libc::AF_INET6 as u8 || matches!(src_ip, IpAddr::V6(_)) { + RT_FLOW_AF_INET6 + } else { + RT_FLOW_AF_INET + } +} + fn encode_disposition(d: ForwardingDisposition) -> u8 { match d { ForwardingDisposition::ForwardCandidate => DISP_FORWARD_CANDIDATE, @@ -359,4 +635,3 @@ pub(crate) fn close_flags(delta: &SessionDelta) -> u8 { #[cfg(test)] #[path = "codec_tests.rs"] mod tests; - diff --git a/userspace-dp/src/event_stream/codec_tests.rs b/userspace-dp/src/event_stream/codec_tests.rs index 74f4596ad..ad40375d1 100644 --- a/userspace-dp/src/event_stream/codec_tests.rs +++ b/userspace-dp/src/event_stream/codec_tests.rs @@ -74,6 +74,215 @@ fn test_metadata() -> SessionMetadata { } } +fn test_dataplane_event_v4(kind: DataplaneEventKind) -> DataplaneEventPayload { + DataplaneEventPayload { + kind, + addr_family: libc::AF_INET as u8, + protocol: 6, + action: if kind == DataplaneEventKind::FilterLog { + RT_FLOW_ACTION_PERMIT + } else { + RT_FLOW_ACTION_DENY + }, + src_ip: IpAddr::V4(Ipv4Addr::new(192, 0, 2, 10)), + dst_ip: IpAddr::V4(Ipv4Addr::new(198, 51, 100, 20)), + src_port: 49152, + dst_port: 443, + nat_src_ip: Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 30))), + nat_dst_ip: None, + nat_src_port: 40000, + nat_dst_port: 8443, + ingress_zone_id: 7, + egress_zone_id: 9, + ingress_ifindex: 42, + policy_id: 101, + rule_id: 202, + term_id: 505, + reason: 5, + owner_rg_id: 2, + application_id: 303, + filter_id: 404, + screen_id: 606, + timestamp_ns: 123_456_789, + } +} + +fn test_dataplane_event_v6(kind: DataplaneEventKind) -> DataplaneEventPayload { + DataplaneEventPayload { + kind, + addr_family: libc::AF_INET6 as u8, + protocol: 17, + action: if kind == DataplaneEventKind::FilterLog { + RT_FLOW_ACTION_PERMIT + } else { + RT_FLOW_ACTION_DENY + }, + src_ip: IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 1, 2, 3, 4, 5, 6)), + dst_ip: IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 6, 5, 4, 3, 2, 1)), + src_port: 5353, + dst_port: 53, + nat_src_ip: None, + nat_dst_ip: None, + nat_src_port: 0, + nat_dst_port: 0, + ingress_zone_id: 11, + egress_zone_id: 12, + ingress_ifindex: 77, + policy_id: 0, + rule_id: 0, + term_id: 3030, + reason: 4, + owner_rg_id: 1, + application_id: 0, + filter_id: 909, + screen_id: 1102, + timestamp_ns: 987_654_321, + } +} + +fn assert_dataplane_event_round_trip(event: DataplaneEventPayload, msg_type: u8) { + let frame = EventFrame::encode_dataplane_event(321, &event); + + assert_eq!(frame.data[4], msg_type); + assert_eq!(frame.seq, 321); + assert_eq!( + u32::from_le_bytes(frame.data[0..4].try_into().unwrap()), + SECURITY_EVENT_PAYLOAD_SIZE as u32 + ); + assert_eq!( + frame.len as usize, + FRAME_HEADER_SIZE + SECURITY_EVENT_PAYLOAD_SIZE + ); + + let payload = frame + .dataplane_event_payload() + .expect("security event payload"); + assert_eq!(payload.len(), SECURITY_EVENT_PAYLOAD_SIZE); + assert_eq!( + payload[55], + if event.addr_family == libc::AF_INET6 as u8 { + RT_FLOW_AF_INET6 + } else { + RT_FLOW_AF_INET + } + ); + assert_eq!( + u16::from_be_bytes(payload[40..42].try_into().unwrap()), + event.src_port + ); + assert_eq!( + u16::from_be_bytes(payload[42..44].try_into().unwrap()), + event.dst_port + ); + assert_eq!(payload[52], event.kind.rt_flow_event_type()); + assert_eq!(payload[54], event.action); + assert_eq!( + u32::from_le_bytes(payload[56..60].try_into().unwrap()), + event.rule_id + ); + assert_eq!( + u32::from_le_bytes(payload[60..64].try_into().unwrap()), + event.term_id + ); + assert_eq!( + i16::from_le_bytes(payload[64..66].try_into().unwrap()), + event.owner_rg_id + ); + assert_eq!(payload[134], event.reason); + let decoded = decode_dataplane_event(msg_type, payload).expect("decoded security event"); + assert_eq!(decoded.kind, event.kind); + assert_eq!(decoded.addr_family, event.addr_family); + assert_eq!(decoded.protocol, event.protocol); + assert_eq!(decoded.action, event.action); + assert_eq!(decoded.src_ip, event.src_ip); + assert_eq!(decoded.dst_ip, event.dst_ip); + assert_eq!(decoded.src_port, event.src_port); + assert_eq!(decoded.dst_port, event.dst_port); + assert_eq!(decoded.nat_src_ip, event.nat_src_ip); + assert_eq!(decoded.nat_dst_ip, event.nat_dst_ip); + assert_eq!(decoded.nat_src_port, event.nat_src_port); + assert_eq!(decoded.nat_dst_port, event.nat_dst_port); + assert_eq!(decoded.ingress_zone_id, event.ingress_zone_id); + assert_eq!(decoded.egress_zone_id, event.egress_zone_id); + assert_eq!(decoded.ingress_ifindex, event.ingress_ifindex); + assert_eq!(decoded.rule_id, event.rule_id); + assert_eq!(decoded.term_id, event.term_id); + assert_eq!(decoded.reason, event.reason); + assert_eq!(decoded.owner_rg_id, event.owner_rg_id); + assert_eq!(decoded.application_id, event.application_id); + assert_eq!(decoded.timestamp_ns, event.timestamp_ns); + match event.kind { + DataplaneEventKind::PolicyDeny => assert_eq!(decoded.policy_id, event.policy_id), + DataplaneEventKind::ScreenDrop => assert_eq!(decoded.screen_id, event.screen_id), + DataplaneEventKind::FilterLog => assert_eq!(decoded.filter_id, event.filter_id), + } + assert_eq!( + frame + .decode_dataplane_event() + .expect("decoded security event frame"), + decoded + ); +} + +#[test] +fn test_event_frame_type_values_are_stable() { + assert_eq!(MSG_SESSION_OPEN, 1); + assert_eq!(MSG_SESSION_CLOSE, 2); + assert_eq!(MSG_SESSION_UPDATE, 3); + assert_eq!(MSG_ACK, 4); + assert_eq!(MSG_PAUSE, 5); + assert_eq!(MSG_RESUME, 6); + assert_eq!(MSG_DRAIN_REQUEST, 7); + assert_eq!(MSG_DRAIN_COMPLETE, 8); + assert_eq!(MSG_FULL_RESYNC, 9); + assert_eq!(MSG_KEEPALIVE, 10); + assert_eq!(MSG_POLICY_DENY, 11); + assert_eq!(MSG_SCREEN_DROP, 12); + assert_eq!(MSG_FILTER_LOG, 13); +} + +#[test] +fn test_policy_deny_dataplane_event_round_trip() { + assert_dataplane_event_round_trip( + test_dataplane_event_v4(DataplaneEventKind::PolicyDeny), + MSG_POLICY_DENY, + ); +} + +#[test] +fn test_screen_drop_dataplane_event_round_trip() { + assert_dataplane_event_round_trip( + test_dataplane_event_v6(DataplaneEventKind::ScreenDrop), + MSG_SCREEN_DROP, + ); +} + +#[test] +fn test_filter_log_dataplane_event_round_trip() { + assert_dataplane_event_round_trip( + test_dataplane_event_v4(DataplaneEventKind::FilterLog), + MSG_FILTER_LOG, + ); +} + +#[test] +fn test_filter_log_can_encode_discard_action() { + let mut event = test_dataplane_event_v4(DataplaneEventKind::FilterLog); + event.action = RT_FLOW_ACTION_DENY; + let frame = EventFrame::encode_dataplane_event(321, &event); + let payload = frame + .dataplane_event_payload() + .expect("security event payload"); + assert_eq!(payload[54], RT_FLOW_ACTION_DENY); + assert_eq!( + frame + .decode_dataplane_event() + .expect("decoded security event frame") + .action, + RT_FLOW_ACTION_DENY + ); +} + #[test] fn test_encode_session_open_v4() { let zones = test_zone_map(); @@ -154,8 +363,8 @@ fn test_encode_session_close_v4() { assert_eq!(p[1], 6); // Protocol assert_eq!(u16::from_le_bytes([p[2], p[3]]), 12345); // SrcPort assert_eq!(u16::from_le_bytes([p[4], p[5]]), 80); // DstPort - // p[6..10] SrcIP, p[10..14] DstIP - // p[14..16] OwnerRGID + // p[6..10] SrcIP, p[10..14] DstIP + // p[14..16] OwnerRGID assert_eq!(i16::from_le_bytes([p[14], p[15]]), 1); // p[16] Flags assert_eq!(p[16], FLAG_FABRIC_REDIRECT); diff --git a/userspace-dp/src/event_stream/mod.rs b/userspace-dp/src/event_stream/mod.rs index 1ca058b5a..91ff3f503 100644 --- a/userspace-dp/src/event_stream/mod.rs +++ b/userspace-dp/src/event_stream/mod.rs @@ -11,7 +11,7 @@ pub(crate) mod codec; -pub(crate) use codec::{EventFrame, close_flags}; +pub(crate) use codec::{close_flags, EventFrame}; use crate::session::{SessionDelta, SessionDeltaKind}; use codec::{FRAME_HEADER_SIZE, MSG_ACK, MSG_DRAIN_REQUEST, MSG_KEEPALIVE, MSG_PAUSE, MSG_RESUME}; @@ -19,9 +19,9 @@ use rustc_hash::FxHashMap; use std::collections::VecDeque; use std::io; use std::os::unix::net::UnixStream; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::{self, SyncSender, TryRecvError}; +use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant}; @@ -352,13 +352,13 @@ fn replay_buffered( acked_seq: u64, shared: &Arc, ) -> io::Result<()> { - // Check if replay buffer covers what we need. - // Only send FullResync if the daemon previously acked real events - // (acked_seq > 0) AND our replay buffer has a gap (can't replay - // from acked+1). On fresh start with no events, just start clean. + // Check if replay buffer covers what we need. On a true fresh start + // (acked_seq == 0 and no buffered frames), start clean. Otherwise any gap + // at acked+1 requires FullResync, including the acked_seq==0 case where an + // overrun replay buffer has already trimmed seq 1. let oldest_buffered = replay_buf.front().map(|f| f.seq).unwrap_or(0); - let has_gap = replay_buf.is_empty() || oldest_buffered > acked_seq + 1; - if acked_seq > 0 && has_gap { + let has_gap = (replay_buf.is_empty() && acked_seq > 0) || oldest_buffered > acked_seq + 1; + if has_gap { let seq = shared.next_seq.fetch_add(1, Ordering::Relaxed) + 1; let frame = EventFrame::encode_full_resync(seq); write_frame_blocking(stream, &frame)?; @@ -367,7 +367,9 @@ fn replay_buffered( "xpf-event-stream: sent FullResync (buffer gap: acked={}, oldest_buffered={})", acked_seq, oldest_buffered ); - replay_buf.clear(); + // Keep the stale replay window until the daemon ACKs the FullResync. + // Clearing here can make an acked_seq=0 reconnect look like a clean + // fresh start and permanently suppress the required bulk export. return Ok(()); } diff --git a/userspace-dp/src/event_stream/tests.rs b/userspace-dp/src/event_stream/tests.rs index b937c59ac..c5a6db943 100644 --- a/userspace-dp/src/event_stream/tests.rs +++ b/userspace-dp/src/event_stream/tests.rs @@ -3,7 +3,9 @@ // LOC threshold. Loaded as a sibling submodule via // `#[path = "tests.rs"]` from mod.rs. +use super::codec::MSG_FULL_RESYNC; use super::*; +use std::io::Read; fn build_raw_ack_frame(seq: u64) -> [u8; FRAME_HEADER_SIZE] { let mut buf = [0u8; FRAME_HEADER_SIZE]; @@ -68,6 +70,35 @@ fn test_replay_buffer_trim() { assert_eq!(replay_buf.front().unwrap().seq, 6); } +#[test] +fn test_replay_gap_at_zero_ack_sends_full_resync() { + let (mut daemon_side, helper_side) = std::os::unix::net::UnixStream::pair().unwrap(); + daemon_side + .set_read_timeout(Some(Duration::from_secs(1))) + .unwrap(); + let shared = Arc::new(EventStreamShared::new()); + let mut replay_buf: VecDeque = VecDeque::new(); + + // Simulate a replay buffer overrun before the daemon ever ACKed anything: + // seq 1 has been trimmed, so replaying seq 2.. would silently lose the + // first audit/session event unless the helper requests FullResync. + for seq in 2..=REPLAY_BUFFER_CAPACITY as u64 + 1 { + replay_buf.push_back(EventFrame::encode_drain_complete(seq)); + } + + replay_buffered(&helper_side, &mut replay_buf, 0, &shared).expect("replay gap"); + + let mut hdr = [0u8; FRAME_HEADER_SIZE]; + daemon_side.read_exact(&mut hdr).expect("full resync frame"); + assert_eq!(hdr[4], MSG_FULL_RESYNC); + assert_eq!(shared.frames_sent.load(Ordering::Relaxed), 1); + assert_eq!( + replay_buf.front().map(|f| f.seq), + Some(2), + "full resync keeps stale replay window until the daemon ACKs" + ); +} + #[test] fn test_channel_backpressure() { let (tx, _rx) = mpsc::sync_channel::(2);