Commit 0977ce6
fix(precompute): close idle/trailing windows for one-shot batch ingest (wall-clock fallback + shutdown force-close) (#400)
* fix(precompute): port wall-clock grace fallback from backend to close idle windows
Sync the precompute-engine window-closing fix from ASAPQuery-backend.
Problem: with strict event-time semantics, a tumbling window [T, T+size)
only closes when the watermark reaches T+size. For a one-shot batch ingest
where every record carries (nearly) the same timestamp, the watermark
freezes and flush_all's +1ms advance is a no-op, so the trailing window
never closes — emit_batch is never called and the store stays empty even
though data was ingested.
Fix (ported from the backend's flush_all "sweep blocker #2" fallback):
track each pane's wall-clock birth time and, in flush_all, force the
effective watermark past pane_start + window_size_ms for any pane that has
been alive longer than window_size_ms + wall_clock_grace_period_ms of
wall-clock time, regardless of event-time. Set wall_clock_grace_period_ms
<= 0 to opt out and keep strict event-time-only semantics.
Changes:
- config: add wall_clock_grace_period_ms (serde default 5000ms)
- worker: GroupState.pane_wall_clock_starts_ms + prune; injectable now_ms_fn
(default SystemTime::now, test override via set_now_ms_fn); record pane
birth in process_group_samples; wall-clock fallback in flush_all
- engine/main/CLI: thread the setting through (PrecomputeSettings +
--wall-clock-grace-period-ms flag)
- tests: wall_clock_fallback_closes_idle_window and the grace=0 opt-out;
update existing config literals
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
* fix(precompute): force-close open windows on worker shutdown
Follow-up to the wall-clock grace fallback. The fallback only fires once a
pane has aged window_size + grace of wall-clock time on a flush tick, so a
one-shot batch that ingests and immediately shuts down still leaves its
trailing window open — flush_all's +1ms advance never reaches the window
end and the data never lands in the store.
Add Worker::force_close_all(), invoked from the Shutdown handler after the
final flush_all(): for every group with open panes, advance to a finite
bound (max_pane + window_size_ms) and emit every remaining window. A finite
bound is used deliberately — WindowManager::closed_windows enumerates window
starts one slide at a time, so passing i64::MAX would loop ~i64::MAX/slide
times and overflow. The pass is idempotent: drained panes are removed and
their wall-clock bookkeeping pruned.
- worker: force_close_all() + Shutdown handler wiring
- tests: shutdown_force_close_emits_trailing_window; update
test_update_agg_configs_enables_new_aggregation_at_runtime to assert the
trailing window [10_000, 20_000) is now emitted (previously lost)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
* test(precompute): e2e regression for one-shot NetFlow single-second ingest
Drives the real JsonFileIngestSource -> PrecomputeEngine -> sink path with a
synthesized NetFlow JSONL whose records all fall in the same second. Sets
wall_clock_grace_period_ms=0 so only the shutdown force-close can close the
trailing window — a non-empty sink proves the force-close rescues the batch.
Verified both directions: with the Shutdown force-close the single-second
window is emitted with the summed bytes; with it disabled the run produces 0
outputs (reproducing the reported empty-store bug).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
* style: cargo fmt for precompute worker tests and netflow e2e
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
* fix(precompute): use finite bound in UpdateAggConfigs removed-agg force-close
Addresses review feedback on #400. The removed-agg-id cleanup in the
UpdateAggConfigs handler advanced the watermark to i64::MAX, the exact hazard
force_close_all's docstring warns about: closed_windows enumerates window
starts one slide at a time, so with a real epoch-ms watermark it loops
~i64::MAX/slide times and overflows `start + window_size_ms` (panics in
debug). Use the same finite bound as force_close_all (max_pane +
window_size_ms).
Add test_update_agg_configs_removed_id_force_closes_at_epoch_ms_timestamps,
which opens a window at a realistic epoch-ms timestamp and removes the
agg_id; it hangs/overflows under the old i64::MAX code (verified: times out)
and passes with the finite bound.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>1 parent 6016eb2 commit 0977ce6
11 files changed
Lines changed: 754 additions & 11 deletions
File tree
- asap-query-engine
- src
- bin
- precompute_engine
- tests
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
175 | 175 | | |
176 | 176 | | |
177 | 177 | | |
| 178 | + | |
178 | 179 | | |
179 | 180 | | |
180 | 181 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
234 | 234 | | |
235 | 235 | | |
236 | 236 | | |
| 237 | + | |
237 | 238 | | |
238 | 239 | | |
239 | 240 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
67 | 67 | | |
68 | 68 | | |
69 | 69 | | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
70 | 75 | | |
71 | 76 | | |
72 | 77 | | |
| |||
170 | 175 | | |
171 | 176 | | |
172 | 177 | | |
| 178 | + | |
173 | 179 | | |
174 | 180 | | |
175 | 181 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
151 | 151 | | |
152 | 152 | | |
153 | 153 | | |
| 154 | + | |
154 | 155 | | |
155 | 156 | | |
156 | 157 | | |
| |||
299 | 300 | | |
300 | 301 | | |
301 | 302 | | |
| 303 | + | |
302 | 304 | | |
303 | 305 | | |
304 | 306 | | |
| |||
655 | 657 | | |
656 | 658 | | |
657 | 659 | | |
| 660 | + | |
658 | 661 | | |
659 | 662 | | |
660 | 663 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
310 | 310 | | |
311 | 311 | | |
312 | 312 | | |
| 313 | + | |
| 314 | + | |
| 315 | + | |
| 316 | + | |
| 317 | + | |
| 318 | + | |
313 | 319 | | |
314 | 320 | | |
315 | 321 | | |
| |||
321 | 327 | | |
322 | 328 | | |
323 | 329 | | |
| 330 | + | |
324 | 331 | | |
325 | 332 | | |
326 | 333 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
209 | 209 | | |
210 | 210 | | |
211 | 211 | | |
| 212 | + | |
212 | 213 | | |
213 | 214 | | |
214 | 215 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
30 | 30 | | |
31 | 31 | | |
32 | 32 | | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
33 | 46 | | |
34 | 47 | | |
35 | 48 | | |
| |||
43 | 56 | | |
44 | 57 | | |
45 | 58 | | |
| 59 | + | |
46 | 60 | | |
47 | 61 | | |
48 | 62 | | |
49 | 63 | | |
| 64 | + | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
50 | 68 | | |
51 | 69 | | |
52 | 70 | | |
| |||
62 | 80 | | |
63 | 81 | | |
64 | 82 | | |
| 83 | + | |
65 | 84 | | |
66 | 85 | | |
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
171 | 171 | | |
172 | 172 | | |
173 | 173 | | |
| 174 | + | |
174 | 175 | | |
175 | 176 | | |
176 | 177 | | |
| |||
0 commit comments