userspace-dp: add dataplane event codec foundation #1394
Conversation
Pull request overview
Foundation slice for the dataplane event stream codec that adds wire-format support for PolicyDeny/ScreenDrop/FilterLog telemetry frames in userspace-dp, in preparation for retiring the eBPF dataplane (#1373) without losing per-flow security/syslog visibility (#1379). Producer wiring, rate limiting, loss accounting, and the Go-side EventSource adapter are explicitly deferred.
Changes:
- Adds stable frame type constants `MSG_POLICY_DENY=11`, `MSG_SCREEN_DROP=12`, `MSG_FILTER_LOG=13` (no renumbering of existing frames).
- Introduces a 120-byte fixed-size `DataplaneEventPayload` with encode/decode helpers covering tuple, NAT tuple, zones, ifindex, identity (policy/rule/app/filter/term/screen), reason, and timestamp.
- Adds golden type-value test and round-trip tests for the three new frame kinds (V4 with NAT src; V6 plain).
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| userspace-dp/src/event_stream/codec.rs | New event kind enum, payload struct, fixed-size encoder/decoder, and IP helpers behind #[allow(dead_code)]. |
| userspace-dp/src/event_stream/codec_tests.rs | Golden frame-type assertions and three round-trip tests; minor comment reflow on existing close-frame test. |
```rust
nat_src_ip: Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 30))),
nat_dst_ip: None,
nat_src_port: 40000,
nat_dst_port: 8443,
```
```rust
fn assert_dataplane_event_round_trip(event: DataplaneEventPayload, msg_type: u8) {
    let frame = EventFrame::encode_dataplane_event(321, &event);

    assert_eq!(frame.data[4], msg_type);
    assert_eq!(frame.seq, 321);
    assert_eq!(
        u32::from_le_bytes(frame.data[0..4].try_into().unwrap()),
        SECURITY_EVENT_PAYLOAD_SIZE as u32
    );
    assert_eq!(
        frame.len as usize,
        FRAME_HEADER_SIZE + SECURITY_EVENT_PAYLOAD_SIZE
    );

    let payload = frame
        .dataplane_event_payload()
        .expect("security event payload");
    assert_eq!(payload.len(), SECURITY_EVENT_PAYLOAD_SIZE);
    assert_eq!(
        decode_dataplane_event(msg_type, payload).expect("decoded security event"),
        event
    );
    assert_eq!(
        frame
            .decode_dataplane_event()
            .expect("decoded security event frame"),
        event
    );
}
```
```rust
fn write_ip_16(buf: &mut [u8; 256], pos: usize, ip: IpAddr) -> usize {
    match ip {
        IpAddr::V4(v4) => buf[pos..pos + 4].copy_from_slice(&v4.octets()),
        IpAddr::V6(v6) => buf[pos..pos + 16].copy_from_slice(&v6.octets()),
    }
    pos + 16
}
```
```rust
if addr_family == libc::AF_INET6 as u8 || matches!(src_ip, IpAddr::V6(_)) {
    6
} else {
    4
}
```
```rust
#[allow(dead_code)]
pub(crate) fn dataplane_event_payload(&self) -> Option<&[u8]> {
    DataplaneEventKind::from_msg_type(self.data[4])?;
    let payload_len = u32::from_le_bytes(self.data[0..4].try_into().ok()?) as usize;
    if payload_len != SECURITY_EVENT_PAYLOAD_SIZE {
        return None;
    }
    let end = FRAME_HEADER_SIZE + payload_len;
    if (self.len as usize) < end {
        return None;
    }
    Some(&self.data[FRAME_HEADER_SIZE..end])
}
```
Claude round-1 review on
Claude round-1 self-correction on
Round-1 review consolidated on
| Reviewer | Verdict |
|---|---|
| Claude | MERGE-READY → MERGE-NEEDS-MAJOR (self-corrected) |
| Codex | MERGE-NEEDS-MAJOR |
| Copilot | (no review yet on this HEAD) |
Codex MAJORs
- Type IDs 11-13 added in Rust; the Go daemon only knows 1-10. Unknown frames are skipped without ACK → replay loop on reconnect. (codec.rs:29, eventstream.go:265)
- Per-event-type loss accounting missing. Only aggregate `frames_dropped`. Required by #1379 refined contract ("event-loss accounting per event-type").
- Wire format not RT_FLOW shaped. AF `4/6` vs eBPF's `2/10`; LE ports vs BE; different offsets; missing action/session-id/name fields. Required by "Feature gap: dataplane events (PolicyDeny, ScreenDrop, FilterLog) not emitted by userspace-dp" #1379 ("identity stability — same RT_FLOW-shaped fields"). xpf_common.h:521 is the contract.
Codex MINOR
- Decoder rejects non-120-byte payloads but no test pins the contract.
Self-correction
Same pattern as #1395/#1396: I labeled "foundation only" without verifying compatibility with the consumer side (Go daemon ACK loop) and the existing RT_FLOW format. Codex correctly framed: "codec sketch, not merge-ready implementation of #1379's contract."
Recommendation
Block on:
- Type ID coordination — Go daemon must register types 11-13 before Rust emits them, OR move new event types into the 1-10 subtype space.
- Per-event-type drop counters at the encode site.
- Wire format compatibility with RT_FLOW (AF encoding, port endianness, field offsets, action/session-id/name fields) OR commit to an adapter plan.
Strongly consider: truncation/malformed-payload test.
Codex task: task-mp955ays-vo2nst. Not merging — author's decision.
Round-2 update pushed in 71e0f8a. Closed the review blockers by adding the daemon-side half of the event codec:
Validation:
Claude round-2 review on
Claude round-2 self-correction on
Round-2 quad-review consolidated synthesis on
| Reviewer | Verdict |
|---|---|
| Claude | MERGE-READY-pending → MERGE-NEEDS-MAJOR (self-corrected) |
| Codex | MERGE-NEEDS-MAJOR |
| Gemini Pro 3 | MERGE-NEEDS-MAJOR |
| Copilot | (no fresh review on this HEAD) |
What's fixed (round-1 → round-2)
- ✓ Type IDs 11-13 now decoded + ACKed (replay loop closed). Both Codex and Gemini verified at eventstream.go:340-359.
- ✓ Per-event-type counter fields added on `EventStream` struct at eventstream.go:54-62.
What's still broken — convergent across both reviewers
RT_FLOW wire shape STILL wrong. PR claim "RT_FLOW-shaped decode" is factually false. Both reviewers verified independently:
- Codex: "AF as `4/6`, ports as little-endian at offsets `4/6/8/10`, IPs at `56/72/88/104`."
- Gemini: "Address Families are encoded as `4/6` instead of standard `2/10`. Ports are decoded as Little-Endian instead of Big-Endian."
- eBPF spec at xpf_common.h:521-549: "AF is `2/10` at offset `55`, ports are BE at `40/42/104/106`, timestamp is offset `0`, src/dst IP at `8/24`."
The custom 120-byte layout introduced in this PR cannot be consumed by the existing RT_FLOW formatter. Existing syslog consumers will see nothing.
No adapter to pkg/logging/ringbuf.go (Codex). Events buffered via eventBuf.Add(rec) at daemon_ha_userspace.go:510-513 bypass the existing ringbuf decoder, name resolution, session-id assignment, syslog/local-log fanout, and callbacks. The EventRecord name/session fields are empty.
Drop counters not exposed (Codex). Counters at eventstream.go:54-62 are private fields, NOT in userspace status JSON, daemon status, or Prometheus.
Tests pin the wrong format. Both reviewers caught this: TestDecodeDataplaneEventPolicyDenyRTFlow at eventstream_test.go:210 asserts the LittleEndian custom layout. A test that pinned the RT_FLOW shape would have caught the bug.
Self-correction
My round-2 review said "MERGE-READY-pending" deferring verification to Codex/Gemini. That's not the methodology — Claude is the 4th hostile reviewer, not a "wait and see" reviewer. I should have grepped xpf_common.h:521-549 to confirm the RT_FLOW shape. Same miss pattern.
Recommendation
Block on:
- RT_FLOW wire format compatibility — either match `xpf_common.h:521-549` (AF 2/10, BE ports at 40/42/104/106, timestamp offset 0, src/dst IP at 8/24, action/session-id/name fields), OR add explicit adapter at `pkg/logging/ringbuf.go` to convert.
- Event routing through `pkg/logging/ringbuf.go` — events must reach syslog/local/callbacks fanout, not just `eventBuf.Add`.
- Expose per-event-type drop counters via userspace status JSON + Prometheus.
- Replace test fixtures with RT_FLOW byte-layout tests.
Codex task: task-mp96mm3s-58v1p8. Gemini Pro 3 task: task-mp96nabo-19jowo. Not merging — author's decision.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (2)
userspace-dp/src/event_stream/codec.rs:444
`u16::try_from(event.application_id).unwrap_or(0)` silently zeroes out any `application_id` greater than `u16::MAX`. A high application ID would go on the wire as 0 rather than being truncated (`as u16`) or causing an explicit error. The round-trip test masks with `& 0xffff`, so it does not catch this — for `application_id = 0x10001` the encoder would emit 0 instead of the truncated 1. Either use `as u16` to truncate consistently with the decoder's `... as u32` widening, or return an error / clamp visibly. The current silent-to-zero behavior loses information without signaling it.
```rust
let app_id = u16::try_from(event.application_id).unwrap_or(0);
buf[base + 132..base + 134].copy_from_slice(&app_id.to_le_bytes());
```
userspace-dp/src/event_stream/codec.rs:136
`DataplaneEventPayload` exposes `owner_rg_id`, `reason`, `rule_id`, `term_id`, the high 16 bits of `application_id`, and the per-kind ID fields that aren't selected by `kind` (e.g. `screen_id` for a PolicyDeny payload). None of these are written by `encode_dataplane_event`, and `decode_dataplane_event` returns 0/None for them. Callers who populate these fields will see them silently disappear on round-trip — the struct API promises more than the wire format carries.
Two options worth considering: (a) drop the fields from the struct (or move them behind a kind-specific variant) so the API reflects what actually travels on the wire, or (b) extend the wire format / dataplane.Event shape to carry them. As-is, this is a future foot-gun for the producer wiring slice mentioned in the PR description, and the round-trip tests don't catch it because they only assert the encoded subset.
```rust
pub(crate) struct DataplaneEventPayload {
    pub(crate) kind: DataplaneEventKind,
    pub(crate) addr_family: u8,
    pub(crate) protocol: u8,
    pub(crate) src_ip: IpAddr,
    pub(crate) dst_ip: IpAddr,
    pub(crate) src_port: u16,
    pub(crate) dst_port: u16,
    pub(crate) nat_src_ip: Option<IpAddr>,
    pub(crate) nat_dst_ip: Option<IpAddr>,
    pub(crate) nat_src_port: u16,
    pub(crate) nat_dst_port: u16,
    pub(crate) ingress_zone_id: u16,
    pub(crate) egress_zone_id: u16,
    pub(crate) ingress_ifindex: i32,
    pub(crate) owner_rg_id: i16,
    pub(crate) reason: u16,
    pub(crate) policy_id: u32,
    pub(crate) rule_id: u32,
    pub(crate) application_id: u32,
    pub(crate) filter_id: u32,
    pub(crate) term_id: u32,
    pub(crate) screen_id: u32,
    pub(crate) timestamp_ns: u64,
}
```
```rust
pub(crate) const MSG_FILTER_LOG: u8 = 13;

#[allow(dead_code)]
pub(crate) const SECURITY_EVENT_PAYLOAD_SIZE: usize = 136;
```
```go
	reason uint16,
	policyID uint32,
	timestampNS uint64,
) []byte {
	_ = reason // RT_FLOW policy-deny records carry policy identity, not the userspace-only reason field.
```
```go
func DecodeRawEventRecord(data []byte) (EventRecord, bool) {
	if len(data) < int(unsafe.Sizeof(dataplane.Event{})) {
		return EventRecord{}, false
	}
```
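The length gate above can be pinned with a small table-driven check, per the MINOR Codex raised about the size contract having no test. This is an illustrative sketch only: `eventSize` and `decode` are hypothetical stand-ins for `unsafe.Sizeof(dataplane.Event{})` and `DecodeRawEventRecord`, not the repo's actual API.

```go
package main

import "fmt"

// eventSize stands in for the fixed RT_FLOW event record size; the real code
// derives it from unsafe.Sizeof(dataplane.Event{}).
const eventSize = 136

// decode mimics only the length contract: truncated payloads are rejected.
func decode(data []byte) bool {
	return len(data) >= eventSize
}

func main() {
	for _, n := range []int{0, eventSize - 1, eventSize, eventSize + 8} {
		fmt.Println(n, decode(make([]byte, n)))
	}
}
```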
Round-3 fix update pushed. What changed:
Validation:
The cargo run still emits the repo's existing warning noise; the targeted event-stream codec tests are 11/11 passing.
Claude round-3 review on
| Field | Offset | Endianness | Match xpf_common.h |
|---|---|---|---|
| Timestamp | 0-8 | LE | ✓ |
| SrcIP | 8-24 | bytes | ✓ |
| DstIP | 24-40 | bytes | ✓ |
| SrcPort | 40-42 | BE | ✓ |
| DstPort | 42-44 | BE | ✓ |
| PolicyID | 44-48 | LE | ✓ |
| EventType | 52 | u8 | ✓ |
| Protocol | 53 | u8 | ✓ |
| Action | 54 | u8 | ✓ |
| AddrFamily | 55 | u8 (AF_INET=2, AF_INET6=10) | ✓ |
| NATSrcPort | 104-106 | BE | ✓ |
| NATDstPort | 106-108 | BE | ✓ |
This decoder produces a proper EventRecord that flows through the existing ringbuf consumer (syslog/local/callbacks) — eliminates the round-2 "custom 120-byte layout that no one can consume" gap.
Round-2 MAJOR 2 (adapter to ringbuf) — CLOSED ✓
DecodeRawEventRecord is the adapter Codex demanded. Same event shape, same EventRecord output as the BPF ringbuf path.
Round-2 MAJOR 3 (counter exposure) — CLOSED ✓
statusfmt.go +9 exposes per-event-type counters. Codex/Gemini will verify these flow to status JSON.
Round-2 MAJOR 4 (tests pin wrong format) — needs verification
Will verify tests now check the RT_FLOW byte layout, not the custom 120-byte layout from round-2.
Awaiting Codex (task-mp99djah-o6qm3i) and Gemini Pro 3 (task-mp99e5zu-fmc38b).
Claude round-3 self-correction on
Round-3 quad-review consolidated synthesis on
| Reviewer | Verdict |
|---|---|
| Claude | MERGE-READY → MERGE-NEEDS-MAJOR (self-corrected) |
| Codex | MERGE-NEEDS-MAJOR |
| Gemini Pro 3 | MERGE-NEEDS-MAJOR |
Round-2 MAJORs — partially closed
| Round-2 MAJOR | Status |
|---|---|
| 1. RT_FLOW wire shape | CLOSED ✓ — Codex+Gemini both verified byte layout matches xpf_common.h:521-549 |
| 2. No adapter to ringbuf | STILL OPEN — DecodeRawEventRecord is decode-only, daemon still calls eventBuf.Add not EventReader.logEvent |
| 3. Drop counters not exposed | PARTIAL — status JSON ✓, status text ✓, Prometheus missing |
| 4. Tests pin wrong format | CLOSED ✓ |
Converged MAJOR — fanout still bypassed (Codex+Gemini)
Both reviewers caught the same gap independently. The new DecodeRawEventRecord adapter at ringbuf.go:460-465 explicitly says: "intentionally skips name resolution and syslog fanout."
Daemon path: eventstream.go:355-380 → DecodeRawEventRecord → onDataplaneEvent → daemon_ha_userspace.go:510-513 d.eventBuf.Add(rec). Never reaches EventReader.logEvent()'s syslog clients, local writers, name resolution, session-id assignment (ringbuf.go:331-457).
Codex new MAJOR — HA-only event delivery scope
"
runUserspaceEventStream()is only started from the cluster session-sync path (daemon_ha_sync.go:700-713) and returns to polling ifd.clusterord.sessionSyncis nil (daemon_ha_userspace.go:485-489)."
Worked failure: standalone (non-clustered) userspace dataplane consumes/ACKs helper frames but never wires the dataplane event callback. Non-HA deployments see NO policy/screen/filter events.
What's closed ✓
- Wire format: byte-exact match to xpf_common.h (timestamp@0, IPs@8/24, BE ports@40-44 + 104-108, AF@55)
- Type IDs 11-13 consumed and ACKed (replay loop closed)
- Tests now pin the RT_FLOW layout
Recommendation
Block on:
- Route decoded events through `EventReader.logEvent()` so syslog/local/callbacks/name-resolution/session-id fanout fires.
- Wire `runUserspaceEventStream()` for standalone userspace dataplane (non-cluster, no session-sync). Currently only HA-clustered nodes emit events.
- Prometheus metrics for per-event-type counters + producer-side `event_stream_dropped`.
Strongly consider: session-id assignment for the userspace event path (wire shape doesn't carry it; existing ringbuf.go:318-329 adds it after decode).
Codex task: task-mp99djah-o6qm3i. Gemini Pro 3 task: task-mp99e5zu-fmc38b. Not merging — author's decision.
Round-4 blocker fixes pushed:
Claude round-4 review on
Claude round-4 self-correction on
Round-4 quad-review consolidated synthesis on
| Reviewer | Verdict |
|---|---|
| Claude | MERGE-NEEDS-MAJOR (self-corrected from READY-pending after Codex MAJOR) |
| Codex | MERGE-NEEDS-MAJOR |
| Gemini Pro 3 | MERGE-NEEDS-MAJOR |
| Copilot | Re-review requested; still on stale commit bc854c0c |
Codex + Gemini converge on identical 3 MAJORs with quote-grounded evidence. Block.
MAJOR 1 — HA startup race bypasses fanout (Codex + Gemini)
daemon_run.go:277-296 starts cluster comms BEFORE `d.eventReader` is created (initialization at `er = logging.NewEventReader(evSrc, eventBuf); d.eventReader = er` happens later in Run). startClusterComms spawns a goroutine calling runUserspaceEventStream, which hits daemon_ha_userspace.go:512-523:
```go
if d.eventReader != nil {
	es.SetOnRawDataplaneEvent(...)
} else {
	es.SetOnDataplaneEvent(func(seq uint64, rec logging.EventRecord) {
		if d.eventBuf != nil { d.eventBuf.Add(rec) }
	}) // legacy fallback installed FOREVER
}
```
Impact: On HA clusters, EventReader.logEvent fanout (local file, syslog, NetFlow, IPFIX, trace) is silently bypassed because the callback is wired before the reader is constructed.
MAJOR 2 — Prometheus producer drops invisible (Codex + Gemini)
Rust exports the counters at userspace-dp/src/protocol.rs:801-804:
```rust
#[serde(rename = "event_stream_sent", default)] pub event_stream_sent: u64,
#[serde(rename = "event_stream_dropped", default)] pub event_stream_dropped: u64,
```
Go pkg/dataplane/userspace/protocol.go:412-480 `ProcessStatus` struct omits both fields. pkg/api/metrics.go:494-520 + 1178-1215 exposes daemon-side frames_written/read, but the helper's `event_stream_dropped` is black-holed.
MAJOR 3 — Callback-before-Start contract violation (Codex + Gemini)
pkg/dataplane/userspace/eventstream.go:74-88 documents: "Must be called before Start()."
pkg/dataplane/userspace/process.go:55-58:
```go
es := NewEventStream(evtPath)
esCtx, esCancel := context.WithCancel(context.Background())
es.Start(esCtx) // <-- Start() called
m.eventStream = es
```
Callbacks are wired later from daemon_ha_userspace.go:507-525. The readLoop goroutine can accept/decode/ack/mark-applied frames with nil callbacks — silently loses early events, plus an unsynchronized function-pointer write/read hazard.
Test coverage gap (both reviewers)
New tests in eventstream_test.go and metrics_test.go cover decode/format/buffer in isolation. ZERO integration tests for:
- (a) Daemon fanout from `EventReader.logEvent`
- (b) HA vs standalone startup timing
- (c) Prometheus scrape exposure of producer drops
Block on
- Move `eventBuf` + `d.eventReader` creation BEFORE `startClusterComms()` in `daemon_run.go`.
- Wire callbacks BEFORE `Start()` per the stream's documented contract.
- Add Go-side `EventStreamProducerStats { sent, dropped }` mirroring Rust, surface via Prometheus collector.
- Add HA-startup-race integration test that fails the current code.
- Add Prometheus-scrape test that verifies helper `event_stream_dropped` reaches scrape output.
Defer to follow-up
None — all findings tied to PR scope.
Round-4 claim trace
- Codex `task-mp9a4vzs-3p9nxo` — 3 MAJORs verified
- Gemini `task-mp9adbpn-2fvaak` — independently caught same 3 MAJORs with `git show` quotes
- Copilot `bc854c0c` — stale; re-review @copilot requested
Not merging — author's decision.
Claude round-7 review on
Re-reviewed current HEAD
@copilot review Round-7 commit 485baf4 addresses round-6 findings:
Re-review please.
Re-reviewed on
Claude round-7 self-correction on
Round-7 quad-review consolidated synthesis on
| Reviewer | Verdict |
|---|---|
| Claude | MERGE-NEEDS-MAJOR (self-corrected from READY after Codex+Gemini findings) |
| Codex | MERGE-NEEDS-MAJOR (3 MAJORs) |
| Gemini Pro 3 | MERGE-NEEDS-MAJOR (1 MAJOR: FullResync queue bypass) |
| Copilot | Still stale at 2ec682c4; no r7 review yet |
Triple converge MERGE-NEEDS-MAJOR. Substantive r6 fixes ARE in (bounded queue, codec contract restored, hardcoded action byte removed, producer counters wired). But the queue + reconnect + downstream-readiness interaction has 4 distinct unaddressed issues.
Round-6 wins (all reviewers agree)
- Audit-loss: drop-and-ack replaced with bounded 4096-frame queue
- Codec contract: `action`/`rule_id`/`term_id`/`reason`/`owner_rg_id` restored AND wired
- `encode_dataplane_event` hardcoded action byte → `event.action`
- `EventStream{Sent,Dropped}` populated from `helpers.rs:91-96` (Codex r6 was right; Copilot r6 was wrong on this point)
- `SetOnFullResync` no longer wired unconditionally (Gemini r6 finding addressed)
MAJOR 1 — Queue overflow is NOT lossless (Codex)
Go queue 4096 (eventstream.go:19) + Rust producer replay buffer 4096 (userspace-dp/src/event_stream/mod.rs:35-36) + `FullResync` suppressed when `acked_seq == 0` (mod.rs:431-435, :356-371).
Worked failure:
- 5000 frames arrive before callbacks wire
- Go queues 1..4096; at 4097, `enqueuePendingCallbackFrame` returns false → readLoop exits
- `lastAckSeq == 0` (no callback flush, no ACK yet)
- Stream reconnects via acceptLoop; `clearPendingCallbackFrames` wipes Go queue
- Rust replay covers seq 2..4097 (evicted seq 1)
- `FullResync` suppressed because `acked_seq == 0`
- Seq 1 permanently lost
Fix paths: producer back-pressure (pause writes when Go approaches limit) OR FullResync contract that fires on overflow regardless of acked_seq == 0.
MAJOR 2 — clearPendingCallbackFrames races active flush (Codex)
flushPendingCallbackFrames serializes via pendingFlushMu at :519-563. clearPendingCallbackFrames from acceptLoop at :249-252 bypasses the flush mutex.
Worked race:
- Connection A queues frames; `SetOnEvent` fires → flush starts, acquires `pendingFlushMu`, mid-dispatch
- Connection A drops; acceptLoop spins to next iter
- `clearPendingCallbackFrames` (no mutex) wipes the queue mid-flush
- New connection B starts; seq counter resets to 0
- Mid-flight flush calls `markFrameApplied(stale_seq)` against connection B → bogus ACK position
Fix: `clearPendingCallbackFrames` must take `pendingFlushMu`. Add a reconnect-generation token so flushes from a prior connection no-op.
Note: Gemini r7 disagreed, calling the mutex layout "clean/correct". Codex's worked race is line-level concrete; Claude's read agrees with Codex.
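The reconnect-generation token can be sketched minimally: each accepted connection bumps a generation, the flush snapshots it when it starts, and a stale flush refuses to ACK. All names below (`stream`, `reconnect`, `flushMayAck`) are illustrative, not the repo's actual API.

```go
package main

import (
	"fmt"
	"sync"
)

// stream carries only the generation counter relevant to the race.
type stream struct {
	mu  sync.Mutex
	gen uint64
}

// reconnect is called by the accept loop for every new connection; it returns
// the generation the new connection runs under.
func (s *stream) reconnect() uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.gen++
	return s.gen
}

// flushMayAck reports whether a flush that started under flushGen may still
// ACK; a flush from a prior connection must no-op.
func (s *stream) flushMayAck(flushGen uint64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return flushGen == s.gen
}

func main() {
	s := &stream{}
	flushGen := s.reconnect() // connection A; flush snapshots gen
	s.reconnect()             // connection B arrives mid-flush
	fmt.Println(s.flushMayAck(flushGen))
}
```

With the token, even if the flush mutex fix regresses, a mid-flight flush cannot report a stale seq against the new connection.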
MAJOR 3 — Downstream callback "not ready" still ACKs (Codex)
Queue only covers callbacks being nil. Once onEvent is wired, handleEventStreamDelta at daemon_ha_userspace.go:523-524, :547-559 can DROP because:
- `d.cluster == nil`
- `d.sessionSync == nil`
- Not connected
cfg == nil
EventStream still calls markFrameApplied and ACKs because callback returned (without error). Acknowledged-before-applied invariant violated.
Fix: push the queue ABOVE handleEventStreamDelta — queue until cluster/sessionSync ready AND primary check passes. Or: have callback return a "drop/not-ready" signal that suppresses ACK.
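The second fix path (callback signals "not ready", ACK suppressed) can be sketched as a tiny dispatch loop; `dispatch`, `handle`, and `ack` are illustrative stand-ins for the handler/`markFrameApplied` pair, not repo code.

```go
package main

import "fmt"

// dispatch ACKs only when the handler reports the frame was durably applied;
// a false return withholds the ACK so the helper replays the frame later.
func dispatch(seq uint64, handle func(uint64) bool, ack func(uint64)) {
	if handle(seq) {
		ack(seq)
	}
}

func main() {
	var acked []uint64
	ready := false
	handle := func(seq uint64) bool { return ready } // downstream readiness gate
	ack := func(seq uint64) { acked = append(acked, seq) }

	dispatch(1, handle, ack) // not ready: no ACK, helper will replay
	ready = true
	dispatch(1, handle, ack) // replayed frame now ACKs
	fmt.Println(acked)
}
```

This preserves the acknowledged-before-applied invariant the MAJOR describes: the ACK position only advances past frames the downstream actually accepted.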
MAJOR 4 — FullResync not queued (Gemini)
`EventTypeFullResync` at eventstream.go is dispatched INLINE, NOT through the queue:
```go
case EventTypeFullResync:
	slog.Warn("event stream: full resync requested by helper")
	es.callbackMu.RLock()
	onFullResync := es.onFullResync
	es.callbackMu.RUnlock()
	if onFullResync != nil { onFullResync() }
```
Worked race (Gemini):
- Helper connects, requests `MSG_FULL_RESYNC`
- `wireUserspaceEventStreamCallbacks` is still in its 500ms wait loop OR between `SetOnEvent` and `SetOnFullResync`
- Frame arrives → `onFullResync == nil` → silently dropped
- Helper permanently stuck without startup state until secondary disruption triggers reconnect
Fix: route EventTypeFullResync through the same pendingCallbackFrames queue.
MINOR — frames_sent semantics still wrong (Codex, carried)
server/helpers.rs:91-96 assigns the counter, but event_stream/mod.rs:446-458 increments per socket write, not per frame in `write_buf`. Steady-state undercount. Defer to follow-up.
Codex+Gemini disagreement notes
- Queue overflow lossy vs acceptable: Codex traced the Rust replay buffer + FullResync suppression; verified lossy. Gemini called it acceptable but did NOT examine the replay buffer interaction. Codex's analysis stands.
- Clear-vs-flush race: Codex demonstrated; Gemini called the mutex layout clean. Codex's worked race shows the bypass; the mutex layout is moot if `clearPendingCallbackFrames` bypasses `pendingFlushMu`.
Recommendation
Block on:
- Queue overflow contract: producer back-pressure or `FullResync` on overflow regardless of `acked_seq == 0` (Codex)
- `clearPendingCallbackFrames` mutex coverage + reconnect-generation token (Codex)
- Downstream "not ready" callback signaling that suppresses ACK (Codex)
- Route `FullResync` through the queue (Gemini)
MINOR follow-up:
- `frames_sent` semantics (write-call vs frame count)
- Verify `handleEventStreamFullResync` standalone safety
Round-7 claim trace
- Codex `task-mp9eflxu-tlcpp1` — 3 MAJORs (overflow, clear race, downstream ACK)
- Gemini Pro 3 `task-mp9egfb3-ov5e8i` — 1 MAJOR (FullResync queue bypass)
- Copilot — stale at `2ec682c4`; re-trigger if needed
Not merging — author's decision.
@copilot review Round-7 commit 485baf4 addresses round-6 findings. Major changes:
Re-review please. Note Codex r7 flagged 3 new MAJORs (overflow lossy + Rust replay buffer interaction, clear-vs-flush mutex race, downstream callback 'not ready' still ACKs) and Gemini r7 flagged 1 MAJOR (FullResync not queued).
Re-reviewed on
Round-8 fix update on
@copilot review Round-8 head
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (4)
pkg/dataplane/userspace/eventstream.go:497
- Operator precedence here is correct (`&&` binds tighter than `||`) but the intent is non-obvious. The condition reads as "queue if (both callbacks nil) OR (pending frames exist)". Adding explicit parentheses around `(onRawDataplaneEvent == nil && onDataplaneEvent == nil)` would make the readiness gate unambiguous and avoid a future regression if the conditions are extended.
```go
if onRawDataplaneEvent == nil && onDataplaneEvent == nil || es.hasPendingCallbackFrames() {
```
pkg/dataplane/userspace/eventstream.go:595
- In `flushPendingCallbackFrames`, the `default` branch logs a warning and then falls through to `markFrameApplied(frame.seq)` (ACKing the frame) and popping it. Today no frame type can reach this branch (only the three explicitly-handled type groups are ever enqueued), but if a future code path enqueues a new frame type and forgets to handle it here, the queued frame would be silently ACKed without invoking any callback. Consider either making this branch panic in debug builds or asserting that all enqueued types must match a `case` arm, to avoid silently dropping audit-grade frames on a future contract change.
```go
default:
	slog.Warn("event stream: dropping unsupported pending callback frame",
		"type", frame.typ, "seq", frame.seq)
}
es.markFrameApplied(frame.seq)
```
pkg/dataplane/userspace/eventstream.go:601
`flushPendingCallbackFrames` removes the head element by `copy(es.pendingCallbackFrames, es.pendingCallbackFrames[1:])` and shrinking; this is O(n) per pop, so flushing a full 4096-entry pending queue is O(n²) in slice copies, plus it retains the underlying backing array (which holds 4096 `[]byte` payloads via `dataplanePayload`) until the slice is fully drained and capped — preventing GC of any already-copied payload bytes. Consider tracking a head index and reslicing, or `pending = pending[1:]` (and periodically compacting), so callback-pause recovery scales linearly and frees payload memory as frames are drained.
```go
es.markFrameApplied(frame.seq)
es.pendingMu.Lock()
if len(es.pendingCallbackFrames) > 0 && es.pendingCallbackFrames[0].seq == frame.seq {
	copy(es.pendingCallbackFrames, es.pendingCallbackFrames[1:])
	es.pendingCallbackFrames = es.pendingCallbackFrames[:len(es.pendingCallbackFrames)-1]
}
es.pendingMu.Unlock()
```
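The head-index variant Copilot suggests can be sketched as follows — pop becomes O(1), drained payload slots are nilled so their backing arrays can be collected, and the dead prefix is compacted periodically. `frameQueue`, `push`, and `pop` are hypothetical names for illustration, not the repo's types.

```go
package main

import "fmt"

// frameQueue is a FIFO with an explicit head index instead of per-pop copies.
type frameQueue struct {
	frames [][]byte
	head   int
}

func (q *frameQueue) push(p []byte) { q.frames = append(q.frames, p) }

// pop returns the oldest payload in O(1), freeing its slot for GC, and
// compacts once the dead prefix dominates so memory stays bounded.
func (q *frameQueue) pop() ([]byte, bool) {
	if q.head >= len(q.frames) {
		return nil, false
	}
	p := q.frames[q.head]
	q.frames[q.head] = nil // let the payload's backing array be GC'd now
	q.head++
	if q.head > 64 && q.head*2 >= len(q.frames) {
		q.frames = append([][]byte(nil), q.frames[q.head:]...)
		q.head = 0
	}
	return p, true
}

func main() {
	q := &frameQueue{}
	for i := 0; i < 3; i++ {
		q.push([]byte{byte(i)})
	}
	for {
		p, ok := q.pop()
		if !ok {
			break
		}
		fmt.Println(p[0])
	}
}
```

Draining a full 4096-entry queue this way is linear in total work rather than quadratic in slice copies.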
pkg/dataplane/userspace/eventstream_test.go:657
`bytes` is imported but only used in `TestEventStreamDataplaneEventRawCallbackPreferred`. Verify it is required (it is — `bytes.Equal`), but note this test also re-imports `logging` even though the package already uses it elsewhere; no action needed for the import itself. However, the assertion `if seq != 11 { t.Fatalf(...) }` is performed inside a goroutine callback — calling `t.Fatalf` from a non-test goroutine in Go logs the failure but does not stop the test, which means the test may proceed and produce a confusing secondary failure. Prefer pushing the seq value to the channel and asserting on the main goroutine.
```go
es.SetOnRawDataplaneEvent(func(seq uint64, payload []byte) {
	if seq != 11 {
		t.Fatalf("seq = %d, want 11", seq)
	}
	rawGot <- append([]byte(nil), payload...)
})
es.SetOnDataplaneEvent(func(uint64, logging.EventRecord) {
	decodedGot <- struct{}{}
})
```
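The channel-handoff pattern that comment suggests looks like this in miniature: the callback only reports values, and all assertions run on the receiving goroutine. The `result` type and `onRaw` callback are illustrative, not the test's actual names.

```go
package main

import "fmt"

// result carries everything the callback observed back to the main goroutine,
// where it is safe to fail the test.
type result struct {
	seq     uint64
	payload []byte
}

func main() {
	got := make(chan result, 1)
	onRaw := func(seq uint64, payload []byte) {
		// No assertions here — just hand the data over the channel.
		got <- result{seq, append([]byte(nil), payload...)}
	}
	go onRaw(11, []byte{0xaa})
	r := <-got // assert here (in a real test: t.Fatalf on this goroutine)
	fmt.Println(r.seq, len(r.payload))
}
```

In a real test the `if r.seq != 11 { t.Fatalf(...) }` check moves after the channel receive, where `FailNow` actually stops the test.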
```go
const pendingCallbackFramesLimit = 4096

type pendingCallbackFrame struct {
	typ              uint8
	seq              uint64
	sessionDelta     SessionDeltaInfo
	dataplanePayload []byte
	dataplaneRecord  logging.EventRecord
}
```
```rust
pub(crate) const MSG_FILTER_LOG: u8 = 13;

#[allow(dead_code)]
pub(crate) const SECURITY_EVENT_PAYLOAD_SIZE: usize = 136;
```
```go
	SessionPkts:     evt.SessionPackets,
	SessionBytes:    evt.SessionBytes,
	RevSessionPkts:  evt.RevPackets,
	RevSessionBytes: evt.RevBytes,
	CloseReason:     closeReasonName(evt.CloseReason),
}
if evt.EventType != dataplane.EventTypeSessionClose {
```
```go
EventTypeSessionOpen     uint8 = 1
EventTypeSessionClose    uint8 = 2
EventTypeSessionUpdate   uint8 = 3
EventTypeAck             uint8 = 4  // daemon → helper
EventTypePause           uint8 = 5  // daemon → helper
EventTypeResume          uint8 = 6  // daemon → helper
EventTypeDrainRequest    uint8 = 7  // daemon → helper (target seq in header)
EventTypeDrainComplete   uint8 = 8  // helper → daemon
EventTypeFullResync      uint8 = 9  // helper → daemon
EventTypeKeepalive       uint8 = 10 // helper → daemon (idle heartbeat)
EventFrameTypePolicyDeny uint8 = 11 // helper → daemon (RT_FLOW policy deny)
EventFrameTypeScreenDrop uint8 = 12 // helper → daemon (RT_FLOW screen drop)
EventFrameTypeFilterLog  uint8 = 13 // helper → daemon (RT_FLOW filter log)
```
```go
	reason uint16,
	policyID uint32,
	timestampNS uint64,
) []byte {
	_ = reason // RT_FLOW policy-deny records carry policy identity, not the userspace-only reason field.
```
```go
// SetOnEvent sets the callback for session events. The callback returns true
// only after the delta is durably handled; false withholds ACK so the helper can
// replay instead of losing an event during readiness transitions.
func (es *EventStream) SetOnEvent(fn func(eventType uint8, seq uint64, delta SessionDeltaInfo) bool) {
	es.callbackMu.Lock()
	es.onEvent = fn
	es.callbackMu.Unlock()
	es.flushPendingCallbackFrames()
}

// SetOnDataplaneEvent sets the callback for RT_FLOW-style dataplane events.
func (es *EventStream) SetOnDataplaneEvent(fn func(seq uint64, rec logging.EventRecord)) {
	es.callbackMu.Lock()
	es.onDataplaneEvent = fn
	es.callbackMu.Unlock()
	es.flushPendingCallbackFrames()
}

// SetOnRawDataplaneEvent sets the callback for raw RT_FLOW dataplane events.
// It is preferred when the receiver can process the canonical dataplane.Event
// payload itself, because it preserves name resolution and syslog fanout.
func (es *EventStream) SetOnRawDataplaneEvent(fn func(seq uint64, payload []byte)) {
	es.callbackMu.Lock()
	es.onRawDataplaneEvent = fn
	es.callbackMu.Unlock()
	es.flushPendingCallbackFrames()
}

// SetOnFullResync sets the callback for full resync requests. The callback
// returns true only after the resync request has been acted on.
func (es *EventStream) SetOnFullResync(fn func() bool) {
	es.callbackMu.Lock()
	es.onFullResync = fn
	es.callbackMu.Unlock()
	es.flushPendingCallbackFrames()
}
```
```go
// ProcessRawEvent feeds an RT_FLOW-format dataplane.Event record through the
// same enrichment, buffering, callback, and syslog/local-log fanout path used
// by the eBPF ring-buffer reader. Userspace transports should call this rather
// than adding decoded records directly to EventBuffer.
func (er *EventReader) ProcessRawEvent(data []byte) bool {
	if len(data) < int(unsafe.Sizeof(dataplane.Event{})) {
		return false
	}
	er.logEvent(data)
	return true
}
```
```go
if evt.EventType != dataplane.EventTypeSessionClose && len(data) >= 136 {
	rec.RuleID = binary.LittleEndian.Uint32(data[56:60])
	rec.TermID = binary.LittleEndian.Uint32(data[60:64])
	rec.OwnerRGID = int16(binary.LittleEndian.Uint16(data[64:66]))
	if data[134] != dataplane.CloseReasonNone {
		rec.Reason = closeReasonName(data[134])
	}
}
```
Re-reviewed on
Claude round-8 review on
@copilot review Round-8 commit 450a60b addresses all round-7 findings:
Re-review please.
Re-reviewed on
Claude round-8 self-correction on
Round-8 quad-review consolidated synthesis on
| Reviewer | Verdict |
|---|---|
| Claude | MERGE-NEEDS-MAJOR (self-corrected from READY after Codex+Gemini findings) |
| Codex | MERGE-NEEDS-MAJOR (2 MAJORs) |
| Gemini Pro 3 | MERGE-NEEDS-MAJOR (1 new MAJOR: CPU spin) |
| Copilot | MERGE-NEEDS-MAJOR (1 fresh MAJOR + 7 MINORs on 450a60bc) |
Quad-converged MERGE-NEEDS-MAJOR with 4 distinct issues across reviewers.
Round-7 wins (all reviewers agree)
- ✓ FullResync queueing mechanism (Gemini r7 MAJOR closed)
- ✓ `clearPendingCallbackFrames` mutex lock-order fix (Codex r7 MAJOR closed)
- ✓ Downstream callback not-ready no longer ACKs (Codex r7 MAJOR closed — for the direct callback path)
- ✓ Rust replay buffer overflow trigger for FullResync when `acked_seq == 0` (Codex r7 MAJOR partially closed — see below)
MAJOR 1 — FullResync not lossless if not ACKed (Codex)
`userspace-dp/src/event_stream/mod.rs:370` clears `replay_buf` AFTER writing FullResync but BEFORE the daemon ACKs it.
Worked failure (Codex):
- 5000 frames arrive, Go overflows at 4096 → readLoop returns
- Stream reconnects; Rust detects gap, sends FullResync, clears `replay_buf`
- Daemon closes BEFORE ACKing FullResync (helper crash, network blip, restart)
- Reconnect: `acked_seq` still 0, `replay_buf` empty
- New condition `(replay_buf.is_empty() && acked_seq > 0) || oldest_buffered > acked_seq + 1`:
  - `replay_buf.is_empty() && acked_seq > 0` = `true && false` = false
  - `replay_buf` empty → `oldest_buffered = 0` → `0 > 1` = false
  - has_gap = false → no FullResync sent
- Treated as fresh start. Seq 1 still permanently lost.
The new test only proves the FIRST FullResync is sent, not that it's retried until ACK.
Fix paths: "resync-pending" flag in shared state (clear only on ACK), OR retain replay buffer entries until ACK confirms, OR send FullResync on every zero-ACK reconnect.
MAJOR 2 — handleEventStreamFullResync ACKs without verifying side-effects (Codex)
`pkg/daemon/daemon_ha_userspace.go:583` checks that the exporter exists and cfg exists, but does NOT check:
- `d.cluster == nil`
- `d.sessionSync == nil`
- `IsLocalPrimaryAny()` (no primary RGs → empty rgIDs loop → returns true with no work)
- `d.sessionSync.IsConnected()`

If sync is disconnected, `QueueSessionV4` at `sync_conn.go:416` silently drops. Caller returns true → ACK → helper's replay buffer cleared / acked_seq advanced. No peer received the bulk export.
Fix: add the same readiness gates as `handleEventStreamDelta`.
MAJOR 3 — Callback-returns-false → CPU spin loop on HA transition (Gemini)
The return false → disconnect → reconnect sequence lacks backoff on successful connections.
Worked failure (Gemini):
- Daemon transitions to Backup; `IsLocalPrimaryAny()` returns false
- Helper sends seq 1
- `handleEventStreamDelta` returns false
- readLoop returns → stream closes via `connCancel()`
- Helper reads EOF → returns true → reconnects (no backoff on the success path; only `Err(_)` triggers the 100ms backoff in `try_connect`)
- `UnixStream::connect` succeeds immediately
- Helper replays seq 1
- Daemon accepts; readLoop reads seq 1; returns false again
- Cycle repeats at IPC speeds → 100% CPU + log saturation on both processes
I flagged this as MINOR in my Claude r8; Gemini correctly elevated with worked trace.
Remediation paths (Gemini):
- If backup nodes don't need events: return true (consume + discard) rather than false
- If false is required: implement backoff (`EventStream.SendPause()` before disconnect, OR acceptLoop sleep on brief sessions, OR delayed post-EOF reconnect in helper)
MAJOR 4 — Codec wire-position overlap (Copilot)
pkg/logging/ringbuf.go: DecodeRawEventRecord unconditionally reads evt.SessionPackets from data[56:64] and evt.SessionBytes from data[64:72] for every event type.
But encode_dataplane_event for security event frames (PolicyDeny/ScreenDrop/FilterLog) overloads those same byte ranges with rule_id (56..60), term_id (60..64), and owner_rg_id (64..66).
Decoder reads garbage values into rec.SessionPkts/rec.SessionBytes for security events. The PR description's "120-byte" payload claim is also inconsistent with code's 136 bytes (carried doc mismatch).
Fix: gate the SessionPackets/SessionBytes read to non-security event types, OR define a separate wire layout for security frames vs session events.
MINOR (Copilot r8)
- PR description says "codec foundation" but PR adds substantial Go consumer wiring (false advertising)
- Naming inconsistency: `EventType*` vs `EventFrameType*` on the same wire enum
- `reason` parameter discarded in test helper (carried)
- SetOn* doc comments still imply "before Start()" semantics; should document late-registration replay
- `ProcessRawEvent` doesn't guard against nil `er.source`
- Duplicate decode block in `ProcessRawEvent` vs `DecodeRawEventRecord` (factor into helper)
Recommendation
Block on:
- FullResync persistence: pending-flag OR retain replay buf until ACK OR resend on every zero-ACK reconnect (Codex MAJOR 1)
- `handleEventStreamFullResync` readiness gates matching `handleEventStreamDelta` (Codex MAJOR 2)
- Callback-false backoff to prevent CPU-spin reconnect loop (Gemini MAJOR 3)
- Gate `SessionPackets`/`SessionBytes` decode by event type, OR define separate security-frame wire layout (Copilot MAJOR 4)
Strongly consider:
- Update PR description (120 → 136 bytes; "codec foundation" scope mismatch)
- Rename for consistency: `EventType*` for all wire-enum members
- Document late-registration replay semantics on SetOn* setters
Round-8 claim trace
- Codex `task-mp9g055f-rhng52` — 2 MAJORs (FullResync persistence, ACK-without-resync)
- Gemini Pro 3 `task-mp9g0o6o-yk2a26` — 1 MAJOR (CPU spin loop) + verified r7 wins
- Copilot `450a60bc` review — 8 inline findings, 1 MAJOR (codec wire-position overlap)
Not merging — author's decision.
Refs #1379
Adds the userspace-dp event-stream codec foundation for policy deny, screen drop, and filter log telemetry frames.
Changes:
Validation:
Remaining gaps: