Skip to content

Commit 037247d

Browse files
committed
feat: draft agent sse
1 parent a8ece8b commit 037247d

30 files changed

Lines changed: 2251 additions & 3 deletions
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
# Harden SSE Events Module — Tests, I/O Safety, Error Codes
2+
3+
This ExecPlan is a living document. The sections Progress, Surprises & Discoveries, Decision Log, and Outcomes & Retrospective must be kept up to date as work proceeds.
4+
5+
## Scope
6+
7+
| Repository | Access | Description |
8+
|-----------|--------|-------------|
9+
| `agent/` | read-write | Tests for events module, store I/O redesign, SSE error response improvements |
10+
11+
This plan lives in `agent/.ai/exec-plans/` because all code changes are within the agent repository. It follows the first-pass implementation tracked in `agent/.ai/exec-plans/active/20260310-agent-sse-deployment-events.md`.
12+
13+
## Purpose / Big Picture
14+
15+
The SSE events module was implemented as a first pass with no test coverage and several I/O design shortcuts. This plan hardens the implementation so it is production-ready:
16+
17+
1. A developer can run `./scripts/test.sh` and see comprehensive tests covering the event store, hub, model, and SSE handler — including edge cases like crash recovery, compaction, cursor expiration, and type filtering.
18+
2. The event store no longer blocks the tokio runtime during compaction or repeated file opens.
19+
3. SSE error responses return specific error codes (`cursor_expired`, `malformed_cursor`) so programmatic clients can distinguish error types.
20+
21+
This plan resolves agent tech debt items TD-1 through TD-4 and TD-6.
22+
23+
## Progress
24+
25+
- [ ] M1: Add tests for `events/model.rs` and `events/errors.rs`.
26+
- [ ] M2: Add tests for `events/store.rs` (append, replay, compaction, crash recovery).
27+
- [ ] M3: Add tests for `events/hub.rs` (publish, subscribe, cursor validation, try_publish).
28+
- [ ] M4: Add tests for `server/sse.rs` (cursor resolution, error responses, type filtering).
29+
- [ ] M5: Redesign store I/O — persistent file handle, `spawn_blocking` for compaction.
30+
- [ ] M6: Fix SSE error response codes.
31+
- [ ] M7: Integration test — event emission through sync path.
32+
33+
## Surprises & Discoveries
34+
35+
(Add entries as you go.)
36+
37+
## Decision Log
38+
39+
- Decision: Test the store and hub with real temp-dir-backed files, not mocks.
40+
Rationale: The store's crash recovery and compaction logic depend on real filesystem behavior (partial writes, atomic renames). Mocking would test the wrong thing. Use `tempfile::TempDir` for isolation.
41+
Date/Author: 2026-03-10
42+
43+
- Decision: Address I/O improvements (M5) after test coverage (M1-M4) so refactoring is protected by tests.
44+
Rationale: Tests-first ensures the redesign doesn't introduce regressions. The current I/O works correctly — the issue is performance under edge cases, not correctness.
45+
Date/Author: 2026-03-10
46+
47+
- Decision: Use `RwLock` instead of `Mutex` in EventHub to allow concurrent replays.
48+
Rationale: `replay_after()` only needs immutable access to the in-memory event vec. Multiple SSE clients replaying simultaneously shouldn't block each other. `publish()` takes a write lock. `broadcast_tx.send()` is already thread-safe and doesn't need the lock.
49+
Date/Author: 2026-03-10
50+
51+
## Outcomes & Retrospective
52+
53+
Not started. This section will be filled when milestones are implemented and validated.
54+
55+
## Context and Orientation
56+
57+
This plan operates on code introduced by the SSE implementation plan (`20260310-agent-sse-deployment-events.md`). Read that plan for full architectural context.
58+
59+
Key files being hardened:
60+
61+
- `agent/src/events/mod.rs`: Module root. Exports `EventHub`.
62+
- `agent/src/events/model.rs`: `Envelope` struct with `deployment_deployed()` and `deployment_removed()` constructors. Uses `device_api::models` for event payload types (`DeploymentDeployedBeta1Event`, `DeploymentRemovedBeta1Event`). `status_str()` helper serializes status enums.
63+
- `agent/src/events/store.rs`: `EventStore` with JSONL persistence. `init()` loads from disk tolerating malformed lines. `append()` assigns monotonic ID, opens file, writes, flushes, drops handle. `compact()` rewrites log via temp file + atomic rename when > `max_retained`. `replay_after()` uses `partition_point` for binary search.
64+
- `agent/src/events/hub.rs`: `EventHub` wraps `Mutex<EventStore>` + `broadcast::Sender<Envelope>`. `publish()` locks, appends, broadcasts. `replay_after()` locks, validates cursor, replays. `try_publish()` is fire-and-forget.
65+
- `agent/src/events/errors.rs`: `EventsErr` enum with `IoErr`, `SerializationErr`, `CursorExpiredErr`, `MalformedCursorErr`. Implements `crate::errors::Error` trait.
66+
- `agent/src/server/sse.rs`: SSE handler. Resolves cursor from query/header. Subscribes before replay. Chains replay stream with live broadcast stream. Deduplicates by ID. Filters by type. 30s heartbeat keep-alive.
67+
- `agent/src/sync/deployments.rs`: `apply_deployments()` emits `deployment.deployed.beta1` and `deployment.removed.beta1` events via `hub.try_publish()` for successful outcomes with `Deployed` or `Archived` activity status.
68+
69+
Test infrastructure:
70+
- Tests live in `agent/tests/`. Module structure mirrors `agent/src/`.
71+
- Test runner: `./scripts/test.sh``RUST_LOG=off cargo test --features test -- --test-threads=1`.
72+
- `tempfile` crate is already a workspace dependency.
73+
74+
## Plan of Work
75+
76+
### Milestone M1: Model and Error Tests
77+
78+
Create `agent/tests/events/mod.rs` and `agent/tests/events/model.rs`.
79+
80+
Tests for `model.rs`:
81+
- `envelope_deployment_deployed_has_correct_type`: Construct via `Envelope::deployment_deployed()`, assert `event_type == "deployment.deployed.beta1"`, `object == "event"`, `id == 0` (unassigned), `data` contains expected fields.
82+
- `envelope_deployment_removed_has_correct_type`: Same for `deployment_removed()`.
83+
- `envelope_serde_roundtrip`: Serialize an envelope to JSON and deserialize back, assert equality.
84+
- `status_str_serializes_enum_variants`: Test `status_str()` with known enum values, assert correct string output.
85+
86+
Tests for `errors.rs` (can go in `model.rs` or separate `errors.rs`):
87+
- `cursor_expired_returns_410`: Construct `CursorExpiredErr`, assert `http_status() == GONE`.
88+
- `malformed_cursor_returns_400`: Construct `MalformedCursorErr`, assert `http_status() == BAD_REQUEST`.
89+
90+
Add `pub mod events;` to `agent/tests/mod.rs`.
91+
92+
### Milestone M2: Store Tests
93+
94+
Create `agent/tests/events/store.rs`.
95+
96+
Each test creates a `TempDir` and constructs an `EventStore` pointing at a file inside it.
97+
98+
Tests:
99+
- `empty_store_has_no_events`: Init on non-existent file, assert `earliest_id() == None`, `latest_id() == None`, `replay_after(0)` returns empty vec.
100+
- `append_assigns_monotonic_ids`: Append 3 envelopes, assert IDs are 1, 2, 3.
101+
- `append_persists_to_disk`: Append 2 events, drop store, re-init from same file, assert both events are present with correct IDs.
102+
- `replay_after_returns_events_after_cursor`: Append 5 events, `replay_after(2)` returns events 3, 4, 5.
103+
- `replay_after_zero_returns_all`: Append 3 events, `replay_after(0)` returns all 3.
104+
- `replay_after_latest_returns_empty`: Append 3 events, `replay_after(3)` returns empty.
105+
- `compaction_keeps_90_percent`: Init with `max_retained = 10`, append 11 events. Assert len after compaction is 9 (90% of 10). Assert file on disk has 9 lines.
106+
- `compaction_preserves_ids`: After compaction, `earliest_id()` should be the ID of the 3rd event (first 2 drained), `latest_id()` unchanged.
107+
- `crash_recovery_tolerates_malformed_line`: Write a file with 2 valid JSON lines and 1 malformed line. Init store, assert 2 events loaded.
108+
- `crash_recovery_tolerates_trailing_empty_lines`: Write a file with valid events and trailing newlines. Init store, assert correct count.
109+
- `crash_recovery_continues_id_sequence`: Write a file with events having IDs 1-5. Init store, append new event, assert ID is 6.
110+
111+
### Milestone M3: Hub Tests
112+
113+
Create `agent/tests/events/hub.rs`.
114+
115+
Tests:
116+
- `publish_assigns_id_and_returns_envelope`: Create hub, publish envelope, assert returned envelope has id > 0.
117+
- `subscribe_receives_published_events`: Subscribe, publish 2 events, assert receiver gets both in order.
118+
- `replay_after_returns_historical_events`: Publish 3 events, `replay_after(1)` returns events 2, 3.
119+
- `replay_after_zero_with_empty_store_returns_empty`: New hub, `replay_after(0)` returns empty, no error.
120+
- `replay_after_expired_cursor_returns_error`: Publish events, compact so oldest is ID 5, `replay_after(2)` returns `CursorExpiredErr`.
121+
- `try_publish_does_not_panic_on_error`: This is harder to test directly without filesystem manipulation. At minimum, test the happy path doesn't return errors.
122+
- `multiple_subscribers_all_receive`: Subscribe twice, publish, assert both receivers get the event.
123+
124+
### Milestone M4: SSE Handler Tests
125+
126+
Create `agent/tests/server/sse.rs`.
127+
128+
These tests construct an axum Router with the events route and use `axum::body::to_bytes` or `tower::ServiceExt::oneshot` to test HTTP behavior without a real socket.
129+
130+
Tests:
131+
- `events_returns_200_with_event_stream_content_type`: GET `/v0.2/events`, assert 200 and `Content-Type: text/event-stream`.
132+
- `events_returns_400_for_malformed_cursor`: GET `/v0.2/events?after=abc`, assert 400 with error body containing `"malformed_cursor"` code.
133+
- `events_returns_410_for_expired_cursor`: Pre-populate and compact events so earliest is ID 100, GET `/v0.2/events?after=5`, assert 410 with `"cursor_expired"` code.
134+
- `events_returns_503_when_hub_not_initialized`: Construct state with `event_hub: None`, GET `/v0.2/events`, assert 503.
135+
- `events_replays_historical_events`: Publish 3 events, GET `/v0.2/events?after=0`, assert response body contains all 3 events as SSE frames.
136+
- `events_filters_by_type`: Publish events of different types, GET `/v0.2/events?types=deployment.deployed.beta1`, assert only matching events in response.
137+
- `last_event_id_header_works_as_cursor`: Publish 3 events, GET with `Last-Event-ID: 1`, assert events 2, 3 in response.
138+
- `after_takes_precedence_over_last_event_id`: Publish 5 events, GET with `after=3` and `Last-Event-ID: 1`, assert events 4, 5 in response.
139+
140+
Add `pub mod sse;` to `agent/tests/server/mod.rs`.
141+
142+
### Milestone M5: Store I/O Redesign
143+
144+
Refactor `EventStore` to:
145+
146+
1. **Keep a persistent file handle.** Add `writer: Option<BufWriter<File>>` to `EventStore`. Lazily open on first `append()`. Reuse for subsequent appends. Only reopen after compaction (since the file was replaced).
147+
148+
2. **Use `spawn_blocking` for compaction.** Compaction rewrites the entire log, which can be slow for large retention windows. Move the compaction work to a blocking thread. Since compaction is triggered inside `publish()` which holds the mutex, the simplest approach is:
149+
- After `append()` detects the threshold is exceeded, release the current writer handle.
150+
- Call `tokio::task::spawn_blocking` with the compaction closure.
151+
- Re-open the writer handle after compaction completes.
152+
- Since `EventHub::publish()` is already async and holds a mutex, awaiting the spawn_blocking is safe.
153+
154+
Alternatively: keep compaction synchronous but on the blocking thread by having `EventHub::publish()` check the threshold and spawn compaction after releasing the lock. This avoids holding the lock during compaction at the cost of slightly delayed compaction. Use this simpler approach.
155+
156+
3. **Switch from `Mutex` to `RwLock` in EventHub.** `replay_after()` only reads from the in-memory vec. Multiple SSE clients replaying concurrently should not block each other. `publish()` takes a write lock.
157+
158+
Changes:
159+
- `agent/src/events/store.rs`: Add `writer: Option<BufWriter<File>>` field. `append()` reuses writer. `compact()` closes writer, rewrites, sets writer to None (will be lazily reopened). Add `fn needs_compaction(&self) -> bool`.
160+
- `agent/src/events/hub.rs`: Change `Mutex<EventStore>` to `RwLock<EventStore>`. `publish()` uses `write().await`. `replay_after()` uses `read().await`. After publish, if `needs_compaction()`, spawn compaction on blocking thread outside the lock.
161+
162+
### Milestone M6: Fix SSE Error Response Codes
163+
164+
Change `error_response()` in `agent/src/server/sse.rs` to map error variants to specific codes:
165+
166+
fn error_response(e: EventsErr) -> (StatusCode, Json<serde_json::Value>) {
167+
let status = e.http_status();
168+
let code = match &e {
169+
EventsErr::CursorExpiredErr(_) => "cursor_expired",
170+
EventsErr::MalformedCursorErr(_) => "malformed_cursor",
171+
_ => "internal_error",
172+
};
173+
...
174+
}
175+
176+
This aligns with the agent API's pattern of machine-readable error codes in response bodies.
177+
178+
### Milestone M7: Integration Test — Sync Path Event Emission
179+
180+
Create `agent/tests/events/integration.rs` (or extend `tests/sync/deployments.rs`).
181+
182+
Test that the sync path actually emits events:
183+
- Construct a `SyncArgs` with a real `EventHub` (temp-dir-backed).
184+
- Set up mock HTTP client and storage with a deployment that will transition to `Deployed`.
185+
- Run `deployments::sync()`.
186+
- Assert the hub has a `deployment.deployed.beta1` event with the correct deployment ID.
187+
188+
This closes the gap where existing sync tests pass `None` for `event_hub`.
189+
190+
## Concrete Steps
191+
192+
All commands run from the `agent` submodule root (`/home/ben/miru/miru/agent`).
193+
194+
1. Create test scaffolding.
195+
196+
mkdir -p agent/tests/events
197+
touch agent/tests/events/{mod.rs,model.rs,store.rs,hub.rs}
198+
touch agent/tests/server/sse.rs
199+
200+
Expected: new empty test files.
201+
202+
2. Verify current tests pass before changes.
203+
204+
./scripts/test.sh
205+
206+
Expected: all tests pass.
207+
208+
3. After each milestone, run tests:
209+
210+
./scripts/test.sh
211+
212+
Expected: all tests pass, including new ones.
213+
214+
4. After M5 (I/O redesign), verify event persistence still works:
215+
216+
# in a test: create hub, publish events, drop hub, re-create from same file, replay
217+
# This is covered by store tests from M2.
218+
219+
5. After M6, verify error codes in test output:
220+
221+
# in sse tests: assert error body contains "cursor_expired" or "malformed_cursor"
222+
223+
## Validation and Acceptance
224+
225+
1. `./scripts/test.sh` passes with all new tests (model, store, hub, sse, integration).
226+
2. No regressions in existing tests.
227+
3. Store tests demonstrate: monotonic IDs, disk persistence, compaction, crash recovery.
228+
4. Hub tests demonstrate: publish/subscribe, cursor validation, concurrent access.
229+
5. SSE tests demonstrate: cursor resolution, error codes (400 with `malformed_cursor`, 410 with `cursor_expired`), type filtering, replay.
230+
6. Integration test demonstrates: event emission through the sync path.
231+
7. After M5: `EventStore` no longer opens/closes the file on every append. Compaction does not block the tokio runtime thread.
232+
233+
## Idempotence and Recovery
234+
235+
- All steps are idempotent: rerunning test creation or code edits produces the same result.
236+
- Tests use `TempDir` for isolation — no shared state between tests.
237+
- If M5 (I/O redesign) causes regressions, revert to the current synchronous implementation. The tests from M1-M4 will catch any issues.
238+
- No destructive operations. No database changes. No infrastructure changes.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ sysinfo = "0.38.0"
4141
tempfile = "3.25.0"
4242
thiserror = "2.0.18"
4343
tokio = { version = "1.41.1", features = ["rt-multi-thread", "fs", "signal"] }
44+
tokio-stream = { version = "0.1", features = ["sync"] }
4445
tower = "0.5.2"
4546
tower-http = { version = "0.6.2", features = ["trace"] }
4647
tracing = "0.1.40"

TECH_DEBT.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,37 @@ Items are ordered by ID. Gaps in IDs are expected — never renumber.
44

55
| ID | Title | Category | Scope |
66
|----|-------|----------|-------|
7+
| 1 | Events module has zero test coverage | test-coverage | `agent/src/events/`, `agent/src/server/sse.rs` |
8+
| 2 | EventStore uses blocking synchronous I/O in async runtime | reliability | `agent/src/events/store.rs` |
9+
| 3 | EventHub mutex serializes all event operations behind blocking disk I/O | performance | `agent/src/events/hub.rs` |
10+
| 4 | SSE error_response uses generic error code for all event errors | correctness | `agent/src/server/sse.rs:148` |
11+
| ~~5~~ | ~~sync events not emitted~~ — deliberate scoping decision, see active exec plan Decision Log |||
12+
| 6 | EventStore opens and closes log file on every append | performance | `agent/src/events/store.rs:72-76` |
13+
14+
---
15+
16+
## Details
17+
18+
### TD-1: Events module has zero test coverage
19+
20+
The `events/` module (store, hub, model, errors) and the SSE handler have no unit or integration tests. Existing sync tests pass `None` for the event hub, so event emission is never exercised. Store operations (append, replay, compaction, crash recovery), hub cursor validation, envelope construction, and the SSE handler (cursor resolution, type filtering, replay+live streaming, error responses) are all untested.
21+
22+
### TD-2: EventStore uses blocking synchronous I/O in async runtime
23+
24+
`EventStore::append()` performs `std::fs::OpenOptions::open()`, `writeln!`, and `flush()` — all blocking syscalls — directly inside the tokio async runtime. `compact()` rewrites the entire log synchronously. These run while holding a `tokio::sync::Mutex`, blocking the worker thread and serializing all event operations.
25+
26+
### TD-3: EventHub mutex serializes all operations behind blocking disk I/O
27+
28+
`EventHub::publish()` holds `Mutex<EventStore>` for the entire append+broadcast sequence. `replay_after()` also acquires the same mutex. Because append does blocking I/O, replay calls are blocked behind any in-progress write. Under concurrent SSE connections + event publication, this creates unnecessary contention.
29+
30+
### TD-4: SSE error_response uses generic error code
31+
32+
`error_response()` in `sse.rs` always sets `code: "events_error"` regardless of error variant. Clients cannot distinguish cursor_expired from malformed_cursor via the error code field, which is the standard pattern for programmatic error handling in the agent API.
33+
34+
### ~~TD-5~~ — Closed (deliberate decision)
35+
36+
The active exec plan (`agent/.ai/exec-plans/active/20260310-agent-sse-deployment-events.md`) explicitly decided not to emit sync-level events. See Decision Log entry dated 2026-03-10. Not tech debt.
37+
38+
### TD-6: EventStore opens and closes log file on every append
39+
40+
Each `append()` call opens the file, writes one line, flushes, and drops the handle. For the expected low frequency (events per sync cycle), this is functional but wasteful. A persistent file handle would eliminate repeated open/close syscalls.

agent/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ sysinfo = { workspace = true }
3030
tempfile = { workspace = true }
3131
thiserror = { workspace = true }
3232
tokio = { workspace = true }
33+
tokio-stream = { workspace = true }
3334
tracing = { workspace = true }
3435
tracing-appender = { workspace = true }
3536
tracing-subscriber = { workspace = true }
@@ -38,6 +39,7 @@ tower = { workspace = true }
3839
tower-http = { workspace = true }
3940

4041
[dev-dependencies]
42+
http-body-util = "0.1"
4143
reqwest = { workspace = true, features = ["json"] }
4244
rumqttd = { workspace = true }
4345
serial_test = { workspace = true }

agent/src/app/run.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ async fn init_socket_server(
303303
app_state.syncer.clone(),
304304
app_state.token_mngr.clone(),
305305
app_state.activity_tracker.clone(),
306+
app_state.event_hub.clone(),
306307
);
307308
let server_handle = serve(&options.server, Arc::new(server_state), async move {
308309
let _ = shutdown_rx.recv().await;

0 commit comments

Comments
 (0)