Skip to content

Commit 4c2629d

Browse files
zzylol and claude committed
fix(precompute): port wall-clock grace fallback from backend to close idle windows
Sync the precompute-engine window-closing fix from ASAPQuery-backend.

Problem: with strict event-time semantics, a tumbling window [T, T+size) only closes when the watermark reaches T+size. For a one-shot batch ingest where every record carries (nearly) the same timestamp, the watermark freezes and flush_all's +1ms advance is a no-op, so the trailing window never closes — emit_batch is never called and the store stays empty even though data was ingested.

Fix (ported from the backend's flush_all "sweep blocker #2" fallback): track each pane's wall-clock birth time and, in flush_all, force the effective watermark past pane_start + window_size_ms for any pane that has been alive longer than window_size_ms + wall_clock_grace_period_ms of wall-clock time, regardless of event-time. Set wall_clock_grace_period_ms <= 0 to opt out and keep strict event-time-only semantics.

Changes:
- config: add wall_clock_grace_period_ms (serde default 5000ms)
- worker: GroupState.pane_wall_clock_starts_ms + prune; injectable now_ms_fn (default SystemTime::now, test override via set_now_ms_fn); record pane birth in process_group_samples; wall-clock fallback in flush_all
- engine/main/CLI: thread the setting through (PrecomputeSettings + --wall-clock-grace-period-ms flag)
- tests: wall_clock_fallback_closes_idle_window and the grace=0 opt-out; update existing config literals

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent b088ae5 commit 4c2629d

10 files changed

Lines changed: 298 additions & 2 deletions

File tree

asap-query-engine/src/bin/bench_precompute_sketch.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ async fn start_engine(
175175
pass_raw_samples: false,
176176
raw_mode_aggregation_id: 0,
177177
late_data_policy: LateDataPolicy::Drop,
178+
wall_clock_grace_period_ms: 5_000,
178179
};
179180
let sources: Vec<Box<dyn IngestSource>> =
180181
vec![Box::new(HttpIngestSource::new(HttpIngestConfig { port }))];

asap-query-engine/src/bin/e2e_quickstart_resource_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
234234
pass_raw_samples: false,
235235
raw_mode_aggregation_id: 0,
236236
late_data_policy: LateDataPolicy::Drop,
237+
wall_clock_grace_period_ms: 5_000,
237238
};
238239
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
239240
let sources: Vec<Box<dyn IngestSource>> =

asap-query-engine/src/bin/precompute_engine.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ struct Args {
6767
#[arg(long, value_enum, default_value_t = LateDataPolicy::Drop)]
6868
late_data_policy: LateDataPolicy,
6969

70+
/// Wall-clock grace period (ms) for the flush fallback that force-closes
71+
/// idle windows when event-time stagnates. Set to <= 0 to disable.
72+
#[arg(long, default_value_t = 5000)]
73+
wall_clock_grace_period_ms: i64,
74+
7075
// --- CSV file ingest (alternative to HTTP) ---
7176
/// Path to a local CSV file to ingest instead of listening for HTTP writes
7277
#[arg(long)]
@@ -170,6 +175,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
170175
pass_raw_samples: args.pass_raw_samples,
171176
raw_mode_aggregation_id: args.raw_mode_aggregation_id,
172177
late_data_policy: args.late_data_policy,
178+
wall_clock_grace_period_ms: args.wall_clock_grace_period_ms,
173179
};
174180

175181
// Create the output sink (writes directly to the store)

asap-query-engine/src/bin/test_e2e_precompute.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
151151
pass_raw_samples: false,
152152
raw_mode_aggregation_id: 0,
153153
late_data_policy: LateDataPolicy::Drop,
154+
wall_clock_grace_period_ms: 5000,
154155
};
155156
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
156157
let sources: Vec<Box<dyn IngestSource>> =
@@ -299,6 +300,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
299300
pass_raw_samples: true,
300301
raw_mode_aggregation_id: raw_agg_id,
301302
late_data_policy: LateDataPolicy::Drop,
303+
wall_clock_grace_period_ms: 5000,
302304
};
303305
let raw_sink = Arc::new(RawPassthroughSink::new(store.clone()));
304306
let raw_sources: Vec<Box<dyn IngestSource>> =
@@ -655,6 +657,7 @@ async fn run_single_bench(
655657
pass_raw_samples: false,
656658
raw_mode_aggregation_id: 0,
657659
late_data_policy: LateDataPolicy::Drop,
660+
wall_clock_grace_period_ms: 5000,
658661
};
659662
let sources: Vec<Box<dyn IngestSource>> =
660663
vec![Box::new(HttpIngestSource::new(HttpIngestConfig { port }))];

asap-query-engine/src/engine_config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,12 @@ pub struct PrecomputeSettings {
310310
pub flush_interval_ms: u64,
311311
pub channel_buffer_size: usize,
312312
pub dump_precomputes: bool,
313+
/// Wall-clock grace period (ms) for the flush fallback that force-closes
314+
/// idle windows when event-time stagnates (e.g. one-shot batch ingest
315+
/// where every record shares a timestamp). Set to <= 0 to disable and
316+
/// keep strict event-time-only semantics. See
317+
/// `PrecomputeEngineConfig::wall_clock_grace_period_ms`.
318+
pub wall_clock_grace_period_ms: i64,
313319
}
314320

315321
impl Default for PrecomputeSettings {
@@ -321,6 +327,7 @@ impl Default for PrecomputeSettings {
321327
flush_interval_ms: 1000,
322328
channel_buffer_size: 10000,
323329
dump_precomputes: false,
330+
wall_clock_grace_period_ms: 5000,
324331
}
325332
}
326333
}

asap-query-engine/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ async fn main() -> Result<()> {
209209
pass_raw_samples: false,
210210
raw_mode_aggregation_id: 0,
211211
late_data_policy: LateDataPolicy::Drop,
212+
wall_clock_grace_period_ms: config.precompute_engine.wall_clock_grace_period_ms,
212213
};
213214
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
214215
let sources: Vec<Box<dyn IngestSource>> = match &config.ingest {

asap-query-engine/src/precompute_engine/config.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,19 @@ pub struct PrecomputeEngineConfig {
3030
pub raw_mode_aggregation_id: u64,
3131
/// Policy for handling late samples that arrive after their window has closed.
3232
pub late_data_policy: LateDataPolicy,
33+
/// Wall-clock grace period (milliseconds) for the watermark fallback in
34+
/// `flush_all`. When event-time stagnates (e.g. a one-shot batch where
35+
/// every record carries the same timestamp), `flush_all`'s `+1ms`
36+
/// watermark advance is a no-op and idle windows never close. The
37+
/// wall-clock fallback closes a pane that has been alive for longer
38+
/// than `window_size_ms + wall_clock_grace_period_ms` of *wall-clock*
39+
/// time, regardless of where event-time is. The grace period tolerates
40+
/// late-arriving events that would otherwise be evicted as "the window
41+
/// already closed". Set to `<= 0` to opt out and keep strict
42+
/// event-time-only semantics. Default: 5000 ms (matches
43+
/// `allowed_lateness_ms` default).
44+
#[serde(default = "default_wall_clock_grace_period_ms")]
45+
pub wall_clock_grace_period_ms: i64,
3346
}
3447

3548
impl Default for PrecomputeEngineConfig {
@@ -43,10 +56,15 @@ impl Default for PrecomputeEngineConfig {
4356
pass_raw_samples: false,
4457
raw_mode_aggregation_id: 0,
4558
late_data_policy: LateDataPolicy::Drop,
59+
wall_clock_grace_period_ms: default_wall_clock_grace_period_ms(),
4660
}
4761
}
4862
}
4963

64+
fn default_wall_clock_grace_period_ms() -> i64 {
65+
5_000
66+
}
67+
5068
#[cfg(test)]
5169
mod tests {
5270
use super::*;
@@ -62,5 +80,6 @@ mod tests {
6280
assert!(!config.pass_raw_samples);
6381
assert_eq!(config.raw_mode_aggregation_id, 0);
6482
assert_eq!(config.late_data_policy, LateDataPolicy::Drop);
83+
assert_eq!(config.wall_clock_grace_period_ms, 5_000);
6584
}
6685
}

asap-query-engine/src/precompute_engine/engine.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ impl PrecomputeEngine {
171171
pass_raw_samples: self.config.pass_raw_samples,
172172
raw_mode_aggregation_id: self.config.raw_mode_aggregation_id,
173173
late_data_policy: self.config.late_data_policy,
174+
wall_clock_grace_period_ms: self.config.wall_clock_grace_period_ms,
174175
},
175176
self.diagnostics.worker_group_counts[id].clone(),
176177
self.diagnostics.worker_watermarks[id].clone(),

0 commit comments

Comments
 (0)