Merged
20 changes: 20 additions & 0 deletions _Log.md
@@ -2,6 +2,11 @@

## 2026-05-17

- **Timestamp**: 2026-05-17T08:30:20Z
- **Action**: PR #1394 round-10 follow-up — fixed standalone userspace event-stream callback wiring by always registering session/full-resync callbacks, and added a regression test that verifies standalone SessionOpen and FullResync frames are ACKed instead of stalling behind an unwired callback queue.
- **File(s)**: `pkg/daemon/daemon_ha_userspace.go`, `pkg/daemon/userspace_sync_test.go`, `_Log.md`
- **Validation**: `gofmt -w pkg/daemon/daemon_ha_userspace.go pkg/daemon/userspace_sync_test.go`; `go test ./pkg/daemon ./pkg/dataplane/userspace ./pkg/logging`; `git diff --check`

- **Timestamp**: 2026-05-17T05:06:00Z
- **Action**: PR #1395 cleanup — reverted an unintended `go.mod` direct/indirect dependency classification change introduced by automated tooling so the round-4 fix stays scoped to three-color policer compiler logic/tests/docs.
- **File(s)**: `go.mod`, `_Log.md`
@@ -14,6 +19,21 @@
- **Action**: PR #1395 round-4 follow-up — fixed three-color policer compiler handling for duplicate same-mode sibling blocks by iterating all `single-rate`/`two-rate` children, added hierarchical ambiguity regression coverage in parser/configstore tests, and updated filter module docs to reflect same-mode sibling merge semantics.
- **File(s)**: `pkg/config/compiler_firewall.go`, `pkg/config/parser_ast_test.go`, `pkg/configstore/store_test.go`, `userspace-dp/src/filter/README.md`, `_Log.md`

- **Timestamp**: 2026-05-17T04:58:00Z
- **Action**: PR #1379 round-4 blocker fix follow-up — removed HA startup ordering race by starting cluster comms only after event fanout wiring, made userspace event-stream callback registration concurrency-safe for post-start wiring, and exposed helper producer sent/dropped counters through status text and Prometheus.
- **File(s)**: `pkg/daemon/daemon_run.go`, `pkg/dataplane/userspace/eventstream.go`, `pkg/dataplane/userspace/eventstream_test.go`, `pkg/dataplane/userspace/protocol.go`, `pkg/dataplane/userspace/statusfmt.go`, `pkg/api/metrics.go`, `pkg/api/metrics_test.go`, `_Log.md`
- **Validation**: `gofmt -w pkg/daemon/daemon_run.go pkg/dataplane/userspace/eventstream.go pkg/dataplane/userspace/eventstream_test.go pkg/dataplane/userspace/protocol.go pkg/dataplane/userspace/statusfmt.go pkg/api/metrics.go pkg/api/metrics_test.go`; `go test ./pkg/dataplane/userspace ./pkg/logging ./pkg/api ./pkg/daemon`; `git diff --check`

- **Timestamp**: 2026-05-17T05:06:00Z
- **Action**: Addressed automated review nit on the new event-stream lifecycle regression test by replacing fixed sleep + wall-clock polling with timeout contexts and retry loops for listener readiness/connection checks.
- **File(s)**: `pkg/dataplane/userspace/eventstream_test.go`, `_Log.md`
- **Validation**: `gofmt -w pkg/dataplane/userspace/eventstream_test.go`; `go test ./pkg/dataplane/userspace ./pkg/logging ./pkg/api ./pkg/daemon`; `git diff --check`

- **Timestamp**: 2026-05-17T05:13:00Z
- **Action**: Completed the same timeout-context synchronization pattern in `TestEventStreamDataplaneEventCallbackCanBeSetAfterStart` so the new lifecycle regression test no longer relies on fixed sleeps or wall-clock deadline polling.
- **File(s)**: `pkg/dataplane/userspace/eventstream_test.go`, `_Log.md`
- **Validation**: `gofmt -w pkg/dataplane/userspace/eventstream_test.go`; `go test ./pkg/dataplane/userspace ./pkg/logging ./pkg/api ./pkg/daemon`; `git diff --check`

- **Timestamp**: 2026-05-17T01:12:00Z
- **Action**: PR #1391 post-smoke follow-up — live q10(24G)+q0(best-effort) contention on `7e7eb07e` showed serviceable-only exact suppression still let q0 drain ~15.6 GB of surplus while exact was backlogged. Reworked the gate from binary serviceability to residual-rate budgeting: non-exact surplus can consume only `root_rate - backlogged_exact_guarantee_rates`, shared exact queues publish queue masks so one queue's reservation is counted once across workers, and shared interfaces use an interface-global residual token bucket rather than per-worker residual buckets.
- **File(s)**: `userspace-dp/src/afxdp/cos/queue_service/mod.rs`, `userspace-dp/src/afxdp/cos/queue_service/tests.rs`, `userspace-dp/src/afxdp/cos/tx_completion.rs`, `userspace-dp/src/afxdp/types/shared_cos_lease.rs`, `userspace-dp/src/afxdp/types/cos.rs`, `userspace-dp/src/afxdp/cos/builders.rs`, `userspace-dp/src/afxdp/worker/cos_tests.rs`, `userspace-dp/src/afxdp/cos/README.md`, `userspace-dp/src/afxdp/types/README.md`, `_Log.md`
6 changes: 5 additions & 1 deletion docs/session-sync-design.md
@@ -560,7 +560,11 @@ On disconnect, the helper retains its replay buffer (bounded, ~4096 events per
binding). On reconnect, it replays from the last acked sequence. If the buffer
has been trimmed past the last acked sequence (long disconnect), it sends a
special `FullResync` frame (type 9) that tells the daemon to treat this as a
fresh start and request a bulk export.
fresh start and request a bulk export. The helper retains the stale replay
window until the daemon ACKs the `FullResync`; otherwise an unacked resync could
be lost across a second reconnect. HA backup nodes ACK and ignore session
events because they are permanent non-owners, while transient primary readiness
gaps withhold ACK for replay.

### Integration with Existing Code

95 changes: 95 additions & 0 deletions pkg/api/metrics.go
@@ -116,6 +116,14 @@ type xpfCollector struct {
	workerIdleLoops *prometheus.Desc
	workerCoSQueueLeaseAcquireV8Calls *prometheus.Desc
	workerCoSQueueLeaseAcquireV8GrantedBytes *prometheus.Desc
	// #1379: daemon-side userspace event-stream transport counters.
	userspaceEventStreamFramesTotal *prometheus.Desc
	userspaceEventStreamProducerFramesTotal *prometheus.Desc
	userspaceEventStreamDecodeErrorsTotal *prometheus.Desc
	userspaceEventStreamSequenceGapsTotal *prometheus.Desc
	userspaceEventStreamDataplaneEventsTotal *prometheus.Desc
	userspaceEventStreamDataplaneDropsTotal *prometheus.Desc
	userspaceEventStreamUnknownDropsTotal *prometheus.Desc
	// #925 Phase 2: liveness gauge for the supervisor's catch_unwind
	// state. 1 = worker has panicked and the supervisor has caught it;
	// 0 = healthy. Set-only in Phase 1 (cleared by daemon restart).
@@ -484,6 +492,41 @@ func newCollector(srv *Server) *xpfCollector {
			"Bytes granted by v8 CoS queue-lease acquire calls for this worker (#1240).",
			[]string{"worker_id"}, nil,
		),
		userspaceEventStreamFramesTotal: prometheus.NewDesc(
			"xpf_userspace_event_stream_frames_total",
			"Daemon-side userspace event-stream frames by direction.",
			[]string{"direction"}, nil,
		),
		userspaceEventStreamProducerFramesTotal: prometheus.NewDesc(
			"xpf_userspace_event_stream_producer_frames_total",
			"Userspace helper event-stream producer counters by outcome.",
			[]string{"outcome"}, nil,
		),
		userspaceEventStreamDecodeErrorsTotal: prometheus.NewDesc(
			"xpf_userspace_event_stream_decode_errors_total",
			"Daemon-side userspace event-stream decode errors.",
			nil, nil,
		),
		userspaceEventStreamSequenceGapsTotal: prometheus.NewDesc(
			"xpf_userspace_event_stream_sequence_gaps_total",
			"Daemon-side userspace event-stream sequence gaps.",
			nil, nil,
		),
		userspaceEventStreamDataplaneEventsTotal: prometheus.NewDesc(
			"xpf_userspace_event_stream_dataplane_events_total",
			"Decoded RT_FLOW dataplane events received over the userspace event stream.",
			[]string{"type"}, nil,
		),
		userspaceEventStreamDataplaneDropsTotal: prometheus.NewDesc(
			"xpf_userspace_event_stream_dataplane_event_drops_total",
			"RT_FLOW dataplane events dropped by the userspace event-stream decoder.",
			[]string{"type"}, nil,
		),
		userspaceEventStreamUnknownDropsTotal: prometheus.NewDesc(
			"xpf_userspace_event_stream_unknown_frame_drops_total",
			"Userspace event-stream frames dropped because their frame type is unknown.",
			nil, nil,
		),
		workerDead: prometheus.NewDesc(
			"xpf_userspace_worker_dead",
			"1 if the userspace-dp worker thread has panicked and been "+
@@ -703,6 +746,13 @@ func (c *xpfCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.workerIdleLoops
	ch <- c.workerCoSQueueLeaseAcquireV8Calls
	ch <- c.workerCoSQueueLeaseAcquireV8GrantedBytes
	ch <- c.userspaceEventStreamFramesTotal
	ch <- c.userspaceEventStreamProducerFramesTotal
	ch <- c.userspaceEventStreamDecodeErrorsTotal
	ch <- c.userspaceEventStreamSequenceGapsTotal
	ch <- c.userspaceEventStreamDataplaneEventsTotal
	ch <- c.userspaceEventStreamDataplaneDropsTotal
	ch <- c.userspaceEventStreamUnknownDropsTotal
	ch <- c.workerDead
	ch <- c.bindingActiveFlowCount
	ch <- c.bindingTXCompletions
@@ -771,6 +821,7 @@ func (c *xpfCollector) collectUserspaceStatus(ch chan<- prometheus.Metric, dp da
	c.emitCoSDrainPhaseTelemetry(ch, status)
	c.emitCoSEqualFlowEnforcement(ch, status)
	c.emitWorkerRuntime(ch, status)
	c.emitUserspaceEventStream(ch, status)
	c.emitBindingActiveFlowCount(ch, status)
	c.emitBindingTXCompletionTelemetry(ch, status)
	c.emitCoSActiveFlowCount(ch, status)
@@ -1131,6 +1182,50 @@ func (c *xpfCollector) emitWorkerRuntime(ch chan<- prometheus.Metric, status dpu
	}
}

func (c *xpfCollector) emitUserspaceEventStream(ch chan<- prometheus.Metric, status dpuserspace.ProcessStatus) {
	if status.EventStream == nil {
		return
	}
	es := status.EventStream
	ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamFramesTotal,
		prometheus.CounterValue, float64(es.FramesRead), "read")
	ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamFramesTotal,
		prometheus.CounterValue, float64(es.FramesWritten), "written")
	ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamProducerFramesTotal,
		prometheus.CounterValue, float64(status.EventStreamSent), "sent")
	ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamProducerFramesTotal,
		prometheus.CounterValue, float64(status.EventStreamDropped), "dropped")
	ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamDecodeErrorsTotal,
		prometheus.CounterValue, float64(es.DecodeErrors))
	ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamSequenceGapsTotal,
		prometheus.CounterValue, float64(es.SeqGaps))

	for _, item := range []struct {
		label string
		count uint64
	}{
		{"policy_deny", es.PolicyDenyEvents},
		{"screen_drop", es.ScreenDropEvents},
		{"filter_log", es.FilterLogEvents},
	} {
		ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamDataplaneEventsTotal,
			prometheus.CounterValue, float64(item.count), item.label)
	}
	for _, item := range []struct {
		label string
		count uint64
	}{
		{"policy_deny", es.PolicyDenyDrops},
		{"screen_drop", es.ScreenDropDrops},
		{"filter_log", es.FilterLogDrops},
	} {
		ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamDataplaneDropsTotal,
			prometheus.CounterValue, float64(item.count), item.label)
	}
	ch <- prometheus.MustNewConstMetric(c.userspaceEventStreamUnknownDropsTotal,
		prometheus.CounterValue, float64(es.UnknownFrameDrops))
}

// #709: export owner-profile telemetry when the dataplane is the
// userspace-dp helper. The eBPF-only build path doesn't have this
// shape (it has no CoS scheduler), so we type-assert on the optional
57 changes: 57 additions & 0 deletions pkg/api/metrics_test.go
@@ -217,6 +217,63 @@ func collectFromEmitWorkerRuntime(
	return got
}

func TestEmitUserspaceEventStreamMetrics(t *testing.T) {
	mkNoLabel := func(name string) *prometheus.Desc {
		return prometheus.NewDesc(name, name, nil, nil)
	}
	mkOneLabel := func(name, label string) *prometheus.Desc {
		return prometheus.NewDesc(name, name, []string{label}, nil)
	}
	c := &xpfCollector{
		userspaceEventStreamFramesTotal: mkOneLabel("xpf_userspace_event_stream_frames_total", "direction"),
		userspaceEventStreamProducerFramesTotal: mkOneLabel("xpf_userspace_event_stream_producer_frames_total", "outcome"),
		userspaceEventStreamDecodeErrorsTotal: mkNoLabel("xpf_userspace_event_stream_decode_errors_total"),
		userspaceEventStreamSequenceGapsTotal: mkNoLabel("xpf_userspace_event_stream_sequence_gaps_total"),
		userspaceEventStreamDataplaneEventsTotal: mkOneLabel("xpf_userspace_event_stream_dataplane_events_total", "type"),
		userspaceEventStreamDataplaneDropsTotal: mkOneLabel("xpf_userspace_event_stream_dataplane_event_drops_total", "type"),
		userspaceEventStreamUnknownDropsTotal: mkNoLabel("xpf_userspace_event_stream_unknown_frame_drops_total"),
	}
	status := dpuserspace.ProcessStatus{
		EventStreamSent: 101,
		EventStreamDropped: 7,
		EventStream: &dpuserspace.EventStreamStatus{
			FramesRead: 11,
			FramesWritten: 7,
			DecodeErrors: 2,
			SeqGaps: 3,
			PolicyDenyEvents: 5,
			ScreenDropEvents: 6,
			FilterLogEvents: 8,
			PolicyDenyDrops: 1,
			ScreenDropDrops: 4,
			FilterLogDrops: 9,
			UnknownFrameDrops: 10,
		},
	}
	ch := make(chan prometheus.Metric)
	go func() {
		c.emitUserspaceEventStream(ch, status)
		close(ch)
	}()
	var got []prometheus.Metric
	for m := range ch {
		got = append(got, m)
	}
	assertCounterClose(t, got, c.userspaceEventStreamFramesTotal, map[string]string{"direction": "read"}, 11)
	assertCounterClose(t, got, c.userspaceEventStreamFramesTotal, map[string]string{"direction": "written"}, 7)
	assertCounterClose(t, got, c.userspaceEventStreamProducerFramesTotal, map[string]string{"outcome": "sent"}, 101)
	assertCounterClose(t, got, c.userspaceEventStreamProducerFramesTotal, map[string]string{"outcome": "dropped"}, 7)
	assertCounterClose(t, got, c.userspaceEventStreamDecodeErrorsTotal, nil, 2)
	assertCounterClose(t, got, c.userspaceEventStreamSequenceGapsTotal, nil, 3)
	assertCounterClose(t, got, c.userspaceEventStreamDataplaneEventsTotal, map[string]string{"type": "policy_deny"}, 5)
	assertCounterClose(t, got, c.userspaceEventStreamDataplaneEventsTotal, map[string]string{"type": "screen_drop"}, 6)
	assertCounterClose(t, got, c.userspaceEventStreamDataplaneEventsTotal, map[string]string{"type": "filter_log"}, 8)
	assertCounterClose(t, got, c.userspaceEventStreamDataplaneDropsTotal, map[string]string{"type": "policy_deny"}, 1)
	assertCounterClose(t, got, c.userspaceEventStreamDataplaneDropsTotal, map[string]string{"type": "screen_drop"}, 4)
	assertCounterClose(t, got, c.userspaceEventStreamDataplaneDropsTotal, map[string]string{"type": "filter_log"}, 9)
	assertCounterClose(t, got, c.userspaceEventStreamUnknownDropsTotal, nil, 10)
}

func metricValuesByWorker(
	t *testing.T,
	metrics []prometheus.Metric,
1 change: 1 addition & 0 deletions pkg/daemon/daemon.go
@@ -94,6 +94,7 @@ type Daemon struct {
	syncReadyTimeout time.Duration
	slogHandler *logging.SyslogSlogHandler
	traceWriter *logging.TraceWriter
	eventBuf *logging.EventBuffer
	eventReader *logging.EventReader
	eventEngine *eventengine.Engine
	aggregator *logging.SessionAggregator
20 changes: 19 additions & 1 deletion pkg/daemon/daemon_ha_sync.go
@@ -676,6 +676,20 @@ func (d *Daemon) startClusterComms(ctx context.Context) {
	}

	d.sessionSync.SetVRFDevice(vrfDevice)
	var streamProvider userspaceEventStreamProvider
	streamCallbacksWired := false
	if d.dp != nil {
		if provider, ok := d.dp.(userspaceEventStreamProvider); ok {
			streamProvider = provider
			wireCtx, cancel := context.WithTimeout(commsCtx, 5*time.Second)
			streamCallbacksWired = d.wireUserspaceEventStreamCallbacks(wireCtx, provider)
			cancel()
			if !streamCallbacksWired {
				slog.Warn("userspace: event stream callbacks not ready before session sync start; falling back to polling until stream wires")
			}
		}
	}

	// Retry sync start: the VRF device and address binding may not
	// be ready during daemon startup (networkd race).
	for i := 0; i < 30; i++ {
@@ -709,7 +723,11 @@ func (d *Daemon) startClusterComms(ctx context.Context) {
return d.cluster != nil && d.cluster.IsLocalPrimary(rgID)
}
d.sessionSync.StartSyncSweep(commsCtx)
go d.runUserspaceEventStream(commsCtx)
if streamCallbacksWired {
go d.eventStreamFallbackLoop(commsCtx, streamProvider)
} else {
go d.runUserspaceEventStream(commsCtx)
}
}

break